#!/usr/bin/env python3
"""YouTube cookie manager for Shanty.

Manages a persistent Firefox profile for YouTube authentication.
Handles interactive login (via Xvfb + x11vnc + noVNC), headless cookie
refresh, and Netscape-format cookie export for yt-dlp.

Every command reports its outcome as a single JSON object on stdout.

Dependencies (runtime):
    - firefox-esr
    - xvfb, x11vnc, novnc, websockify (for interactive login only)
    - sqlite3 (Python stdlib)

Usage:
    cookie_manager.py login-start <profile_dir> <vnc_port>
    cookie_manager.py login-stop [--profile-dir DIR] [--cookies-output FILE]
    cookie_manager.py refresh <profile_dir> <cookies_output>
    cookie_manager.py export <profile_dir> <cookies_output>
    cookie_manager.py status <profile_dir>
"""

import argparse
import json
import os
import random
import shutil
import signal
import sqlite3
import subprocess
import sys
import tempfile
import time
from contextlib import closing, suppress
from pathlib import Path


# File that tracks PIDs of login session processes
PID_FILE = "/tmp/shanty-login-pids.json"

# Seconds headless Firefox is given to load YouTube and refresh its cookies.
REFRESH_WAIT_SECONDS = 10

# WHERE clause shared by every query against moz_cookies.
_COOKIE_HOST_FILTER = "host LIKE '%youtube%' OR host LIKE '%google%'"


def find_firefox() -> str:
    """Find the Firefox binary — 'firefox-esr' (Debian) or 'firefox' (Arch/other).

    Returns:
        The first candidate name that resolves on PATH.

    Raises:
        FileNotFoundError: neither candidate is on PATH.
    """
    for name in ("firefox-esr", "firefox"):
        if shutil.which(name):
            return name
    raise FileNotFoundError("firefox not found on PATH")


def _emit(payload: dict) -> None:
    """Write one JSON result object to stdout."""
    json.dump(payload, sys.stdout)


def _send_signal(pid: int, sig: int) -> None:
    """Send *sig* to *pid*, ignoring processes that are gone or not ours.

    PermissionError is swallowed as well as ProcessLookupError: a stale PID
    may have been recycled by another user's process, and that must not
    abort the rest of the cleanup.
    """
    with suppress(ProcessLookupError, PermissionError):
        os.kill(pid, sig)


def _fetch_cookie_rows(cookies_db: str, sql: str) -> list:
    """Run *sql* against a private snapshot of a Firefox cookie database.

    The database is copied into a fresh temporary directory so a running
    Firefox never contends with us for locks and concurrent invocations
    cannot clobber each other's copy. The write-ahead log is copied too
    when present, so cookies not yet checkpointed into the main file are
    still visible. The snapshot is best-effort: it is not atomic with
    respect to a Firefox that is writing at the same moment.

    Raises:
        OSError: the snapshot could not be created.
        sqlite3.Error: the snapshot could not be opened or queried.
    """
    with tempfile.TemporaryDirectory(prefix="shanty_cookies_") as tmp_dir:
        snapshot = os.path.join(tmp_dir, "cookies.sqlite")
        shutil.copy2(cookies_db, snapshot)
        # The WAL may legitimately be absent (clean shutdown, non-WAL db).
        with suppress(FileNotFoundError):
            shutil.copy2(cookies_db + "-wal", snapshot + "-wal")
        with closing(sqlite3.connect(snapshot)) as conn:
            return conn.execute(sql).fetchall()


def export_cookies(profile_dir: str, output_path: str) -> dict:
    """Read cookies.sqlite from a Firefox profile and write Netscape format.

    Only cookies whose host contains "youtube" or "google" are exported.

    Args:
        profile_dir: Firefox profile directory containing cookies.sqlite.
        output_path: destination cookie file; parent directories are created.

    Returns:
        {"status": "ok", "cookies_count": N} on success, otherwise
        {"status": "error", "error": "<reason>"}.
    """
    cookies_db = os.path.join(profile_dir, "cookies.sqlite")
    if not os.path.exists(cookies_db):
        return {"status": "error", "error": "cookies.sqlite not found"}

    try:
        rows = _fetch_cookie_rows(
            cookies_db,
            "SELECT host, path, isSecure, expiry, name, value "
            "FROM moz_cookies "
            f"WHERE {_COOKIE_HOST_FILTER}",
        )
    except sqlite3.Error as e:
        return {"status": "error", "error": f"sqlite error: {e}"}
    except OSError as e:
        return {"status": "error", "error": f"failed to copy cookies db: {e}"}

    if not rows:
        return {"status": "error", "error": "no YouTube/Google cookies found"}

    lines = [
        "# Netscape HTTP Cookie File\n",
        "# Exported by Shanty cookie_manager.py\n",
    ]
    for host, path, secure, expiry, name, value in rows:
        # Second column: TRUE when the cookie applies to subdomains, which
        # Firefox encodes as a leading dot on the host.
        flag = "TRUE" if host.startswith(".") else "FALSE"
        secure_str = "TRUE" if secure else "FALSE"
        lines.append(
            f"{host}\t{flag}\t{path}\t{secure_str}\t{expiry}\t{name}\t{value}\n"
        )

    try:
        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as f:
            f.writelines(lines)
    except OSError as e:
        # Report through the same channel as every other failure instead of
        # letting a traceback replace the JSON result.
        return {"status": "error", "error": f"failed to write cookies file: {e}"}

    return {"status": "ok", "cookies_count": len(rows)}


def cmd_export(args: argparse.Namespace) -> None:
    """Export cookies from profile without launching Firefox."""
    _emit(export_cookies(args.profile_dir, args.cookies_output))


def cmd_status(args: argparse.Namespace) -> None:
    """Check if profile has valid YouTube cookies.

    Emits {"authenticated": false, "reason": ...} when there is no profile,
    the database cannot be read, or no matching cookies exist; otherwise
    emits the cookie count and the age in hours of the most recent access.
    """
    cookies_db = os.path.join(args.profile_dir, "cookies.sqlite")
    if not os.path.exists(cookies_db):
        _emit({"authenticated": False, "reason": "no profile"})
        return

    try:
        rows = _fetch_cookie_rows(
            cookies_db,
            "SELECT MAX(lastAccessed), COUNT(*) FROM moz_cookies "
            f"WHERE {_COOKIE_HOST_FILTER}",
        )
    except (OSError, sqlite3.Error) as e:
        _emit({"authenticated": False, "reason": str(e)})
        return

    # An aggregate query without GROUP BY always yields exactly one row.
    newest_access, count = rows[0]
    if not newest_access or count == 0:
        _emit({"authenticated": False, "reason": "no cookies"})
        return

    # lastAccessed is in microseconds since epoch
    last_accessed = newest_access / 1_000_000
    age_hours = (time.time() - last_accessed) / 3600
    _emit({
        "authenticated": True,
        "cookie_count": count,
        "cookie_age_hours": round(age_hours, 1),
    })


def _stop_process(proc: subprocess.Popen, timeout: float = 10) -> None:
    """Terminate *proc*, escalating to SIGKILL if it ignores SIGTERM."""
    proc.terminate()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


def cmd_refresh(args: argparse.Namespace) -> None:
    """Launch headless Firefox to refresh cookies, then export."""
    profile_dir = args.profile_dir
    cookies_output = args.cookies_output

    if not os.path.isdir(profile_dir):
        _emit({"status": "error", "error": "profile directory not found"})
        return

    # Launch Firefox headless, visit YouTube, quit
    try:
        firefox = find_firefox()
        proc = subprocess.Popen(
            [
                firefox,
                "--headless",
                "--profile", profile_dir,
                "https://www.youtube.com",
            ],
            stdout=subprocess.DEVNULL,
            # Nothing reads Firefox's stderr, so do not hold it in a pipe
            # that could fill up and stall the browser.
            stderr=subprocess.DEVNULL,
        )
        try:
            # Wait for page to load and cookies to refresh
            time.sleep(REFRESH_WAIT_SECONDS)
        finally:
            # Always reap Firefox, even if the wait is interrupted.
            _stop_process(proc)
    except FileNotFoundError:
        _emit({"status": "error", "error": "firefox not found on PATH"})
        return
    except Exception as e:
        _emit({"status": "error", "error": f"firefox error: {e}"})
        return

    # Export cookies
    _emit(export_cookies(profile_dir, cookies_output))


def _display_in_use(number: int) -> bool:
    """Return True if an X server appears to own display *number*."""
    return (
        os.path.exists(f"/tmp/.X{number}-lock")
        or os.path.exists(f"/tmp/.X11-unix/X{number}")
    )


def _pick_display_number(low: int = 50, high: int = 199) -> int:
    """Pick a random X display number in [low, high] that looks unused.

    Falls back to a purely random number if every candidate appears taken;
    Xvfb then reports the conflict itself.
    """
    free = [n for n in range(low, high + 1) if not _display_in_use(n)]
    return random.choice(free) if free else random.randint(low, high)


def _terminate_all(pids: dict) -> None:
    """Send SIGTERM to every recorded PID (best effort)."""
    for pid in pids.values():
        _send_signal(pid, signal.SIGTERM)


def cmd_login_start(args: argparse.Namespace) -> None:
    """Start Xvfb + x11vnc + noVNC + Firefox for interactive login.

    On success the PIDs of the four processes are stored in PID_FILE and a
    {"status": "running", "vnc_url": ..., "pids": ...} object is emitted.
    On failure everything started so far is sent SIGTERM and an error
    object is emitted.
    """
    profile_dir = args.profile_dir
    try:
        vnc_port = int(args.vnc_port)
    except (TypeError, ValueError):
        _emit({"status": "error", "error": f"invalid VNC port: {args.vnc_port!r}"})
        return

    # Pick an unused display number
    display_num = _pick_display_number()
    display = f":{display_num}"

    pids = {}

    # Build a minimal env — strips Wayland vars and forces DISPLAY to Xvfb.
    # This prevents x11vnc from detecting Wayland and Firefox from using
    # the user's real X/Wayland session.
    clean_env = {
        "DISPLAY": display,
        "HOME": os.environ.get("HOME", "/tmp"),
        "PATH": os.environ.get("PATH", "/usr/bin:/bin"),
        "XDG_RUNTIME_DIR": os.environ.get(
            "XDG_RUNTIME_DIR", f"/run/user/{os.getuid()}"
        ),
    }

    try:
        os.makedirs(profile_dir, exist_ok=True)

        # Start Xvfb (virtual framebuffer). Its stderr goes to an anonymous
        # temp file rather than a pipe: Xvfb outlives this script, and a
        # pipe whose reader has exited would deliver SIGPIPE on its next
        # write to stderr.
        with tempfile.TemporaryFile() as xvfb_log:
            xvfb = subprocess.Popen(
                ["Xvfb", display, "-screen", "0", "1280x720x24"],
                stdout=subprocess.DEVNULL,
                stderr=xvfb_log,
            )
            pids["xvfb"] = xvfb.pid
            time.sleep(1)

            # Verify Xvfb started
            if xvfb.poll() is not None:
                xvfb_log.seek(0)
                stderr = xvfb_log.read().decode(errors="replace")
                raise RuntimeError(f"Xvfb failed to start on {display}: {stderr}")

        # Start x11vnc (with clean env to avoid Wayland detection)
        # NOTE(review): -nopw serves the session without a password; confirm
        # the VNC port is not reachable from untrusted hosts.
        vnc_display_port = 5900 + display_num
        x11vnc = subprocess.Popen(
            [
                "x11vnc",
                "-display", display,
                "-rfbport", str(vnc_display_port),
                "-nopw",
                "-forever",
                "-shared",
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            env=clean_env,
        )
        pids["x11vnc"] = x11vnc.pid
        time.sleep(1)

        # Start websockify (noVNC proxy)
        # noVNC web files location varies by distro
        novnc_path = "/usr/share/novnc"
        if not os.path.isdir(novnc_path):
            novnc_path = "/usr/share/webapps/novnc"  # Arch

        websockify = subprocess.Popen(
            [
                "websockify",
                "--web", novnc_path,
                str(vnc_port),
                f"localhost:{vnc_display_port}",
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        pids["websockify"] = websockify.pid
        time.sleep(1)

        # Start Firefox on the virtual display
        firefox_bin = find_firefox()
        firefox = subprocess.Popen(
            [
                firefox_bin,
                "--profile", profile_dir,
                "https://accounts.google.com/ServiceLogin?continue=https://music.youtube.com",
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            env=clean_env,
        )
        pids["firefox"] = firefox.pid

        # Save PIDs for cleanup. Done inside the try so that a failed write
        # does not leave untracked processes behind.
        with open(PID_FILE, "w") as f:
            json.dump(pids, f)

    except FileNotFoundError as e:
        # Clean up anything we started
        _terminate_all(pids)
        _emit({"status": "error", "error": f"missing dependency: {e}"})
        return
    except Exception as e:
        _terminate_all(pids)
        _emit({"status": "error", "error": str(e)})
        return

    vnc_url = f"http://localhost:{vnc_port}/vnc.html?autoconnect=true"
    _emit({
        "status": "running",
        "vnc_url": vnc_url,
        "pids": pids,
    })


def cmd_login_stop(args: argparse.Namespace) -> None:
    """Stop all login session processes and export cookies.

    Cookies are exported only when both --profile-dir and --cookies-output
    were supplied; the emitted status is "stopped" either way, with the
    export result's other keys merged in.
    """
    if not os.path.exists(PID_FILE):
        _emit({"status": "error", "error": "no active login session"})
        return

    try:
        with open(PID_FILE) as f:
            pids = json.load(f)
    except (OSError, ValueError) as e:
        _emit({"status": "error", "error": f"could not read PID file: {e}"})
        return

    # Kill processes in reverse order
    for name in ("firefox", "websockify", "x11vnc", "xvfb"):
        pid = pids.get(name)
        if pid:
            _send_signal(pid, signal.SIGTERM)

    # Wait a moment for Firefox to flush cookies
    time.sleep(2)

    # Force kill anything still alive
    for pid in pids.values():
        _send_signal(pid, signal.SIGKILL)

    with suppress(FileNotFoundError):
        os.unlink(PID_FILE)

    # Export cookies if profile dir and output provided
    if args.profile_dir and args.cookies_output:
        result = export_cookies(args.profile_dir, args.cookies_output)
        result["status"] = "stopped"
        _emit(result)
    else:
        _emit({"status": "stopped"})


def main() -> None:
    """Parse the command line and dispatch to the matching cmd_* handler."""
    parser = argparse.ArgumentParser(description="Shanty YouTube cookie manager")
    sub = parser.add_subparsers(dest="command", required=True)

    # login-start
    p = sub.add_parser("login-start")
    p.add_argument("profile_dir")
    p.add_argument("vnc_port")

    # login-stop
    p = sub.add_parser("login-stop")
    p.add_argument("--profile-dir", dest="profile_dir", default=None)
    p.add_argument("--cookies-output", dest="cookies_output", default=None)

    # refresh
    p = sub.add_parser("refresh")
    p.add_argument("profile_dir")
    p.add_argument("cookies_output")

    # export
    p = sub.add_parser("export")
    p.add_argument("profile_dir")
    p.add_argument("cookies_output")

    # status
    p = sub.add_parser("status")
    p.add_argument("profile_dir")

    args = parser.parse_args()

    handlers = {
        "login-start": cmd_login_start,
        "login-stop": cmd_login_stop,
        "refresh": cmd_refresh,
        "export": cmd_export,
        "status": cmd_status,
    }
    handlers[args.command](args)


if __name__ == "__main__":
    main()