Compare commits
1 Commits
0e5195e64c
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c12dba886e |
372
scripts/cookie_manager.py
Normal file
372
scripts/cookie_manager.py
Normal file
@@ -0,0 +1,372 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""YouTube cookie manager for Shanty.
|
||||||
|
|
||||||
|
Manages a persistent Firefox profile for YouTube authentication.
|
||||||
|
Handles interactive login (via Xvfb + x11vnc + noVNC), headless cookie
|
||||||
|
refresh, and Netscape-format cookie export for yt-dlp.
|
||||||
|
|
||||||
|
Dependencies (runtime):
|
||||||
|
- firefox-esr
|
||||||
|
- xvfb, x11vnc, novnc, websockify (for interactive login only)
|
||||||
|
- sqlite3 (Python stdlib)
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
cookie_manager.py login-start <profile_dir> <vnc_port>
|
||||||
|
cookie_manager.py login-stop
|
||||||
|
cookie_manager.py refresh <profile_dir> <cookies_output>
|
||||||
|
cookie_manager.py export <profile_dir> <cookies_output>
|
||||||
|
cookie_manager.py status <profile_dir>
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import signal
|
||||||
|
import sqlite3
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
def find_firefox() -> str:
|
||||||
|
"""Find the Firefox binary — 'firefox-esr' (Debian) or 'firefox' (Arch/other)."""
|
||||||
|
for name in ("firefox-esr", "firefox"):
|
||||||
|
if shutil.which(name):
|
||||||
|
return name
|
||||||
|
raise FileNotFoundError("firefox not found on PATH")
|
||||||
|
|
||||||
|
# File that tracks PIDs of login session processes
|
||||||
|
PID_FILE = "/tmp/shanty-login-pids.json"
|
||||||
|
|
||||||
|
|
||||||
|
def export_cookies(profile_dir: str, output_path: str) -> dict:
|
||||||
|
"""Read cookies.sqlite from a Firefox profile and write Netscape format."""
|
||||||
|
cookies_db = os.path.join(profile_dir, "cookies.sqlite")
|
||||||
|
if not os.path.exists(cookies_db):
|
||||||
|
return {"status": "error", "error": "cookies.sqlite not found"}
|
||||||
|
|
||||||
|
# Copy the database to avoid locking issues
|
||||||
|
tmp_db = os.path.join(tempfile.gettempdir(), "shanty_cookies_tmp.sqlite")
|
||||||
|
try:
|
||||||
|
shutil.copy2(cookies_db, tmp_db)
|
||||||
|
except Exception as e:
|
||||||
|
return {"status": "error", "error": f"failed to copy cookies db: {e}"}
|
||||||
|
|
||||||
|
try:
|
||||||
|
conn = sqlite3.connect(tmp_db)
|
||||||
|
cursor = conn.execute(
|
||||||
|
"SELECT host, path, isSecure, expiry, name, value "
|
||||||
|
"FROM moz_cookies "
|
||||||
|
"WHERE host LIKE '%youtube%' OR host LIKE '%google%'"
|
||||||
|
)
|
||||||
|
rows = cursor.fetchall()
|
||||||
|
conn.close()
|
||||||
|
except Exception as e:
|
||||||
|
return {"status": "error", "error": f"sqlite error: {e}"}
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
os.unlink(tmp_db)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if not rows:
|
||||||
|
return {"status": "error", "error": "no YouTube/Google cookies found"}
|
||||||
|
|
||||||
|
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
|
||||||
|
with open(output_path, "w") as f:
|
||||||
|
f.write("# Netscape HTTP Cookie File\n")
|
||||||
|
f.write("# Exported by Shanty cookie_manager.py\n")
|
||||||
|
for host, path, secure, expiry, name, value in rows:
|
||||||
|
flag = "TRUE" if host.startswith(".") else "FALSE"
|
||||||
|
secure_str = "TRUE" if secure else "FALSE"
|
||||||
|
f.write(f"{host}\t{flag}\t{path}\t{secure_str}\t{expiry}\t{name}\t{value}\n")
|
||||||
|
|
||||||
|
return {"status": "ok", "cookies_count": len(rows)}
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_export(args):
|
||||||
|
"""Export cookies from profile without launching Firefox."""
|
||||||
|
result = export_cookies(args.profile_dir, args.cookies_output)
|
||||||
|
json.dump(result, sys.stdout)
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_status(args):
|
||||||
|
"""Check if profile has valid YouTube cookies."""
|
||||||
|
cookies_db = os.path.join(args.profile_dir, "cookies.sqlite")
|
||||||
|
if not os.path.exists(cookies_db):
|
||||||
|
json.dump({"authenticated": False, "reason": "no profile"}, sys.stdout)
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
tmp_db = os.path.join(tempfile.gettempdir(), "shanty_cookies_status.sqlite")
|
||||||
|
shutil.copy2(cookies_db, tmp_db)
|
||||||
|
conn = sqlite3.connect(tmp_db)
|
||||||
|
cursor = conn.execute(
|
||||||
|
"SELECT MAX(lastAccessed) FROM moz_cookies "
|
||||||
|
"WHERE host LIKE '%youtube%' OR host LIKE '%google%'"
|
||||||
|
)
|
||||||
|
row = cursor.fetchone()
|
||||||
|
count = conn.execute(
|
||||||
|
"SELECT COUNT(*) FROM moz_cookies "
|
||||||
|
"WHERE host LIKE '%youtube%' OR host LIKE '%google%'"
|
||||||
|
).fetchone()[0]
|
||||||
|
conn.close()
|
||||||
|
os.unlink(tmp_db)
|
||||||
|
except Exception as e:
|
||||||
|
json.dump({"authenticated": False, "reason": str(e)}, sys.stdout)
|
||||||
|
return
|
||||||
|
|
||||||
|
if not row[0] or count == 0:
|
||||||
|
json.dump({"authenticated": False, "reason": "no cookies"}, sys.stdout)
|
||||||
|
return
|
||||||
|
|
||||||
|
# lastAccessed is in microseconds since epoch
|
||||||
|
last_accessed = row[0] / 1_000_000
|
||||||
|
age_hours = (time.time() - last_accessed) / 3600
|
||||||
|
|
||||||
|
json.dump({
|
||||||
|
"authenticated": True,
|
||||||
|
"cookie_count": count,
|
||||||
|
"cookie_age_hours": round(age_hours, 1),
|
||||||
|
}, sys.stdout)
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_refresh(args):
|
||||||
|
"""Launch headless Firefox to refresh cookies, then export."""
|
||||||
|
profile_dir = args.profile_dir
|
||||||
|
cookies_output = args.cookies_output
|
||||||
|
|
||||||
|
if not os.path.isdir(profile_dir):
|
||||||
|
json.dump({"status": "error", "error": "profile directory not found"}, sys.stdout)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Launch Firefox headless, visit YouTube, quit
|
||||||
|
try:
|
||||||
|
firefox = find_firefox()
|
||||||
|
proc = subprocess.Popen(
|
||||||
|
[
|
||||||
|
firefox, "--headless",
|
||||||
|
"--profile", profile_dir,
|
||||||
|
"https://www.youtube.com",
|
||||||
|
],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
)
|
||||||
|
# Wait for page to load and cookies to refresh
|
||||||
|
time.sleep(10)
|
||||||
|
proc.terminate()
|
||||||
|
try:
|
||||||
|
proc.wait(timeout=10)
|
||||||
|
except subprocess.TimeoutExpired:
|
||||||
|
proc.kill()
|
||||||
|
proc.wait()
|
||||||
|
except FileNotFoundError:
|
||||||
|
json.dump({"status": "error", "error": "firefox not found on PATH"}, sys.stdout)
|
||||||
|
return
|
||||||
|
except Exception as e:
|
||||||
|
json.dump({"status": "error", "error": f"firefox error: {e}"}, sys.stdout)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Export cookies
|
||||||
|
result = export_cookies(profile_dir, cookies_output)
|
||||||
|
json.dump(result, sys.stdout)
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_login_start(args):
|
||||||
|
"""Start Xvfb + x11vnc + noVNC + Firefox for interactive login."""
|
||||||
|
profile_dir = args.profile_dir
|
||||||
|
vnc_port = int(args.vnc_port)
|
||||||
|
|
||||||
|
# Pick an unused display number
|
||||||
|
import random
|
||||||
|
display_num = random.randint(50, 199)
|
||||||
|
display = f":{display_num}"
|
||||||
|
|
||||||
|
os.makedirs(profile_dir, exist_ok=True)
|
||||||
|
|
||||||
|
pids = {}
|
||||||
|
|
||||||
|
# Build a minimal env — strips Wayland vars and forces DISPLAY to Xvfb.
|
||||||
|
# This prevents x11vnc from detecting Wayland and Firefox from using
|
||||||
|
# the user's real X/Wayland session.
|
||||||
|
clean_env = {
|
||||||
|
"DISPLAY": display,
|
||||||
|
"HOME": os.environ.get("HOME", "/tmp"),
|
||||||
|
"PATH": os.environ.get("PATH", "/usr/bin:/bin"),
|
||||||
|
"XDG_RUNTIME_DIR": os.environ.get("XDG_RUNTIME_DIR", f"/run/user/{os.getuid()}"),
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Start Xvfb (virtual framebuffer)
|
||||||
|
xvfb = subprocess.Popen(
|
||||||
|
["Xvfb", display, "-screen", "0", "1280x720x24"],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
)
|
||||||
|
pids["xvfb"] = xvfb.pid
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Verify Xvfb started
|
||||||
|
if xvfb.poll() is not None:
|
||||||
|
stderr = xvfb.stderr.read().decode() if xvfb.stderr else ""
|
||||||
|
raise RuntimeError(f"Xvfb failed to start on {display}: {stderr}")
|
||||||
|
|
||||||
|
# Start x11vnc (with clean env to avoid Wayland detection)
|
||||||
|
vnc_display_port = 5900 + display_num
|
||||||
|
x11vnc = subprocess.Popen(
|
||||||
|
[
|
||||||
|
"x11vnc", "-display", display,
|
||||||
|
"-rfbport", str(vnc_display_port),
|
||||||
|
"-nopw", "-forever", "-shared",
|
||||||
|
],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
env=clean_env,
|
||||||
|
)
|
||||||
|
pids["x11vnc"] = x11vnc.pid
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Start websockify (noVNC proxy)
|
||||||
|
# noVNC web files location varies by distro
|
||||||
|
novnc_path = "/usr/share/novnc"
|
||||||
|
if not os.path.isdir(novnc_path):
|
||||||
|
novnc_path = "/usr/share/webapps/novnc" # Arch
|
||||||
|
websockify = subprocess.Popen(
|
||||||
|
[
|
||||||
|
"websockify", "--web", novnc_path,
|
||||||
|
str(vnc_port), f"localhost:{vnc_display_port}",
|
||||||
|
],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
)
|
||||||
|
pids["websockify"] = websockify.pid
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Start Firefox on the virtual display
|
||||||
|
firefox_bin = find_firefox()
|
||||||
|
firefox = subprocess.Popen(
|
||||||
|
[
|
||||||
|
firefox_bin,
|
||||||
|
"--profile", profile_dir,
|
||||||
|
"https://accounts.google.com/ServiceLogin?continue=https://music.youtube.com",
|
||||||
|
],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
env=clean_env,
|
||||||
|
)
|
||||||
|
pids["firefox"] = firefox.pid
|
||||||
|
|
||||||
|
except FileNotFoundError as e:
|
||||||
|
# Clean up anything we started
|
||||||
|
for pid in pids.values():
|
||||||
|
try:
|
||||||
|
os.kill(pid, signal.SIGTERM)
|
||||||
|
except ProcessLookupError:
|
||||||
|
pass
|
||||||
|
json.dump({"status": "error", "error": f"missing dependency: {e}"}, sys.stdout)
|
||||||
|
return
|
||||||
|
except Exception as e:
|
||||||
|
for pid in pids.values():
|
||||||
|
try:
|
||||||
|
os.kill(pid, signal.SIGTERM)
|
||||||
|
except ProcessLookupError:
|
||||||
|
pass
|
||||||
|
json.dump({"status": "error", "error": str(e)}, sys.stdout)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Save PIDs for cleanup
|
||||||
|
with open(PID_FILE, "w") as f:
|
||||||
|
json.dump(pids, f)
|
||||||
|
|
||||||
|
vnc_url = f"http://localhost:{vnc_port}/vnc.html?autoconnect=true"
|
||||||
|
json.dump({
|
||||||
|
"status": "running",
|
||||||
|
"vnc_url": vnc_url,
|
||||||
|
"pids": pids,
|
||||||
|
}, sys.stdout)
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_login_stop(args):
|
||||||
|
"""Stop all login session processes and export cookies."""
|
||||||
|
if not os.path.exists(PID_FILE):
|
||||||
|
json.dump({"status": "error", "error": "no active login session"}, sys.stdout)
|
||||||
|
return
|
||||||
|
|
||||||
|
with open(PID_FILE) as f:
|
||||||
|
pids = json.load(f)
|
||||||
|
|
||||||
|
# Kill processes in reverse order
|
||||||
|
for name in ["firefox", "websockify", "x11vnc", "xvfb"]:
|
||||||
|
pid = pids.get(name)
|
||||||
|
if pid:
|
||||||
|
try:
|
||||||
|
os.kill(pid, signal.SIGTERM)
|
||||||
|
except ProcessLookupError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Wait a moment for Firefox to flush cookies
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
# Force kill anything still alive
|
||||||
|
for pid in pids.values():
|
||||||
|
try:
|
||||||
|
os.kill(pid, signal.SIGKILL)
|
||||||
|
except ProcessLookupError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
os.unlink(PID_FILE)
|
||||||
|
|
||||||
|
# Export cookies if profile dir and output provided
|
||||||
|
if args.profile_dir and args.cookies_output:
|
||||||
|
result = export_cookies(args.profile_dir, args.cookies_output)
|
||||||
|
result["status"] = "stopped"
|
||||||
|
json.dump(result, sys.stdout)
|
||||||
|
else:
|
||||||
|
json.dump({"status": "stopped"}, sys.stdout)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(description="Shanty YouTube cookie manager")
|
||||||
|
sub = parser.add_subparsers(dest="command", required=True)
|
||||||
|
|
||||||
|
# login-start
|
||||||
|
p = sub.add_parser("login-start")
|
||||||
|
p.add_argument("profile_dir")
|
||||||
|
p.add_argument("vnc_port")
|
||||||
|
|
||||||
|
# login-stop
|
||||||
|
p = sub.add_parser("login-stop")
|
||||||
|
p.add_argument("--profile-dir", dest="profile_dir", default=None)
|
||||||
|
p.add_argument("--cookies-output", dest="cookies_output", default=None)
|
||||||
|
|
||||||
|
# refresh
|
||||||
|
p = sub.add_parser("refresh")
|
||||||
|
p.add_argument("profile_dir")
|
||||||
|
p.add_argument("cookies_output")
|
||||||
|
|
||||||
|
# export
|
||||||
|
p = sub.add_parser("export")
|
||||||
|
p.add_argument("profile_dir")
|
||||||
|
p.add_argument("cookies_output")
|
||||||
|
|
||||||
|
# status
|
||||||
|
p = sub.add_parser("status")
|
||||||
|
p.add_argument("profile_dir")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
handlers = {
|
||||||
|
"login-start": cmd_login_start,
|
||||||
|
"login-stop": cmd_login_stop,
|
||||||
|
"refresh": cmd_refresh,
|
||||||
|
"export": cmd_export,
|
||||||
|
"status": cmd_status,
|
||||||
|
}
|
||||||
|
handlers[args.command](args)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -69,6 +69,9 @@ impl YtDlpBackend {
|
|||||||
let mut cmd = Command::new("yt-dlp");
|
let mut cmd = Command::new("yt-dlp");
|
||||||
cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped());
|
cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped());
|
||||||
|
|
||||||
|
// Allow yt-dlp to fetch updated JS challenge solver scripts
|
||||||
|
cmd.args(["--remote-components", "ejs:github"]);
|
||||||
|
|
||||||
// Add cookies if configured
|
// Add cookies if configured
|
||||||
if let Some(ref cookies) = self.cookies_path {
|
if let Some(ref cookies) = self.cookies_path {
|
||||||
cmd.arg("--cookies").arg(cookies);
|
cmd.arg("--cookies").arg(cookies);
|
||||||
|
|||||||
Reference in New Issue
Block a user