From 36345b12ee8e862a8b86dd935af48778bc3fc989 Mon Sep 17 00:00:00 2001 From: Connor Johnstone Date: Mon, 23 Mar 2026 18:37:46 -0400 Subject: [PATCH] redux of the worker queue --- frontend/src/pages/dashboard.rs | 29 +++ frontend/src/types.rs | 23 ++ src/cookie_refresh.rs | 61 +---- src/lib.rs | 3 +- src/main.rs | 17 +- src/monitor.rs | 66 ----- src/pipeline.rs | 261 ++++--------------- src/pipeline_scheduler.rs | 59 ----- src/routes/system.rs | 216 ++++++++-------- src/scheduler.rs | 148 +++++++++++ src/state.rs | 15 +- src/workers.rs | 431 ++++++++++++++++++++++++++++++++ 12 files changed, 813 insertions(+), 516 deletions(-) delete mode 100644 src/pipeline_scheduler.rs create mode 100644 src/scheduler.rs create mode 100644 src/workers.rs diff --git a/frontend/src/pages/dashboard.rs b/frontend/src/pages/dashboard.rs index d7ce3e8..874658c 100644 --- a/frontend/src/pages/dashboard.rs +++ b/frontend/src/pages/dashboard.rs @@ -365,6 +365,35 @@ pub fn dashboard() -> Html { + // Work Queue Progress + if let Some(ref wq) = s.work_queue { + if wq.download.pending + wq.download.running + wq.tag.pending + wq.tag.running + wq.organize.pending + wq.organize.running > 0 + || wq.download.completed + wq.tag.completed + wq.organize.completed > 0 + { +
+

{ "Pipeline Progress" }

+ + + + + + { for [("Download", &wq.download), ("Index", &wq.index), ("Tag", &wq.tag), ("Organize", &wq.organize)].iter().map(|(name, c)| { + html! { + + + + + + + + } + })} + +
{ "Step" }{ "Pending" }{ "Running" }{ "Done" }{ "Failed" }
{ name }{ c.pending }{ if c.running > 0 { html! { { c.running } } } else { html! { { "0" } } } }{ c.completed }{ if c.failed > 0 { html! { { c.failed } } } else { html! { { "0" } } } }
+
+ } + } + // Background Tasks (always show if there are tasks or scheduled items) if !s.tasks.is_empty() || has_scheduled {
diff --git a/frontend/src/types.rs b/frontend/src/types.rs index f885cc9..816831d 100644 --- a/frontend/src/types.rs +++ b/frontend/src/types.rs @@ -225,7 +225,10 @@ pub struct TaskRef { #[derive(Debug, Clone, PartialEq, Deserialize)] pub struct PipelineRef { + #[serde(default)] pub task_ids: Vec, + #[serde(default)] + pub pipeline_id: Option, } // --- Status --- @@ -246,6 +249,10 @@ pub struct Status { pub tasks: Vec, #[serde(default)] pub scheduled: Option, + #[serde(default)] + pub work_queue: Option, + #[serde(default)] + pub scheduler: Option, } #[derive(Debug, Clone, PartialEq, Deserialize)] @@ -254,6 +261,22 @@ pub struct ScheduledTasks { pub next_monitor: Option, } +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct WorkQueueCounts { + pub pending: u64, + pub running: u64, + pub completed: u64, + pub failed: u64, +} + +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct WorkQueueStats { + pub download: WorkQueueCounts, + pub index: WorkQueueCounts, + pub tag: WorkQueueCounts, + pub organize: WorkQueueCounts, +} + #[derive(Debug, Clone, PartialEq, Deserialize)] pub struct LibrarySummary { pub total_items: u64, diff --git a/src/cookie_refresh.rs b/src/cookie_refresh.rs index 9e33987..28296f2 100644 --- a/src/cookie_refresh.rs +++ b/src/cookie_refresh.rs @@ -1,59 +1,22 @@ -//! Background task that periodically refreshes YouTube cookies via headless Firefox. +//! YouTube cookie refresh via headless Firefox. -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::process::Stdio; -use std::sync::Arc; -use std::time::Duration; use tokio::process::Command; -use tokio::sync::RwLock; -use crate::config::AppConfig; +/// Run a headless cookie refresh. Returns success message or error. +pub async fn run_refresh() -> Result { + let profile_dir = shanty_config::data_dir().join("firefox-profile"); + let cookies_path = shanty_config::data_dir().join("cookies.txt"); -/// Spawn the cookie refresh background loop. -/// -/// This task runs forever, sleeping for `cookie_refresh_hours` between refreshes. -/// It reads the current config on each iteration so changes take effect without restart. -pub fn spawn(config: Arc>) { - tokio::spawn(async move { - loop { - let (enabled, hours) = { - let cfg = config.read().await; - ( - cfg.download.cookie_refresh_enabled, - cfg.download.cookie_refresh_hours.max(1), - ) - }; + if !profile_dir.exists() { + return Err(format!( + "no Firefox profile at {}", + profile_dir.display() + )); + } - // Sleep for the configured interval - tokio::time::sleep(Duration::from_secs(u64::from(hours) * 3600)).await; - - if !enabled { - continue; - } - - let profile_dir = shanty_config::data_dir().join("firefox-profile"); - let cookies_path = shanty_config::data_dir().join("cookies.txt"); - - if !profile_dir.exists() { - tracing::warn!( - "cookie refresh skipped: no Firefox profile at {}", - profile_dir.display() - ); - continue; - } - - tracing::info!("starting cookie refresh"); - - match run_refresh(&profile_dir, &cookies_path).await { - Ok(msg) => tracing::info!("cookie refresh complete: {msg}"), - Err(e) => tracing::error!("cookie refresh failed: {e}"), - } - } - }); -} - -async fn run_refresh(profile_dir: &Path, cookies_path: &Path) -> Result { let script = find_script()?; let output = Command::new("python3") diff --git a/src/lib.rs b/src/lib.rs index 147d674..e823edb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,8 @@ pub mod error; pub mod mb_update; pub mod monitor; pub mod pipeline; -pub mod pipeline_scheduler; pub mod routes; +pub mod scheduler; pub mod state; pub mod tasks; +pub mod workers; diff --git a/src/main.rs b/src/main.rs index 55fbff4..47e884c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ use shanty_web::config::AppConfig; use shanty_web::routes; use shanty_web::state::AppState; use shanty_web::tasks::TaskManager; +use shanty_web::workers::WorkerManager; #[derive(Parser)] #[command(name = "shanty-web", about = "Shanty web interface backend")] @@ -94,21 +95,13 @@ async fn main() -> anyhow::Result<()> { config: std::sync::Arc::new(tokio::sync::RwLock::new(config)), config_path, tasks: TaskManager::new(), + workers: WorkerManager::new(), firefox_login: tokio::sync::Mutex::new(None), - scheduler: tokio::sync::Mutex::new(shanty_web::state::SchedulerInfo { - next_pipeline: None, - next_monitor: None, - skip_pipeline: false, - skip_monitor: false, - }), }); - // Start background cookie refresh task - shanty_web::cookie_refresh::spawn(state.config.clone()); - - // Start pipeline and monitor schedulers - shanty_web::pipeline_scheduler::spawn(state.clone()); - shanty_web::monitor::spawn(state.clone()); + // Start work queue workers and unified scheduler + WorkerManager::spawn_all(state.clone()); + shanty_web::scheduler::spawn(state.clone()); shanty_web::mb_update::spawn(state.clone()); // Resolve static files directory relative to the binary location diff --git a/src/monitor.rs b/src/monitor.rs index 3bfe59e..a67600a 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -2,7 +2,6 @@ //! and automatically adds them to the watchlist. use std::collections::HashSet; -use std::time::Duration; use actix_web::web; use serde::Serialize; @@ -197,68 +196,3 @@ pub async fn check_monitored_artists( Ok(stats) } - -/// Spawn the monitor scheduler background loop. -/// -/// Sleeps for the configured interval, then checks monitored artists if enabled. -/// Reads config each iteration so changes take effect without restart. -pub fn spawn(state: web::Data) { - tokio::spawn(async move { - loop { - let (enabled, hours) = { - let cfg = state.config.read().await; - ( - cfg.scheduling.monitor_enabled, - cfg.scheduling.monitor_interval_hours.max(1), - ) - }; - - let sleep_secs = u64::from(hours) * 3600; - - // Update next-run time - { - let mut sched = state.scheduler.lock().await; - sched.next_monitor = if enabled { - Some( - (chrono::Utc::now() + chrono::Duration::seconds(sleep_secs as i64)) - .naive_utc(), - ) - } else { - None - }; - } - - tokio::time::sleep(Duration::from_secs(sleep_secs)).await; - - if !enabled { - continue; - } - - // Check if this run was skipped - { - let mut sched = state.scheduler.lock().await; - sched.next_monitor = None; - if sched.skip_monitor { - sched.skip_monitor = false; - tracing::info!("scheduled monitor check skipped (user cancelled)"); - continue; - } - } - - tracing::info!("scheduled monitor check starting"); - match check_monitored_artists(&state).await { - Ok(stats) => { - tracing::info!( - artists_checked = stats.artists_checked, - new_releases = stats.new_releases_found, - tracks_added = stats.tracks_added, - "scheduled monitor check complete" - ); - } - Err(e) => { - tracing::error!(error = %e, "scheduled monitor check failed"); - } - } - } - }); -} diff --git a/src/pipeline.rs b/src/pipeline.rs index bd45e62..e278382 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -1,218 +1,67 @@ -//! Shared pipeline logic used by both the API endpoint and the scheduler. +//! Pipeline trigger logic. Creates work queue items that flow through the +//! Download → Tag → Organize pipeline concurrently via typed workers. use actix_web::web; + +use shanty_db::entities::download_queue::DownloadStatus; +use shanty_db::entities::work_queue::WorkTaskType; use shanty_db::queries; -use crate::routes::artists::enrich_all_watched_artists; +use crate::error::ApiError; use crate::state::AppState; -/// Register and spawn the full 6-step pipeline. Returns the task IDs immediately. -/// -/// Steps: sync, download, index, tag, organize, enrich. -pub fn spawn_pipeline(state: &web::Data) -> Vec { - let sync_id = state.tasks.register_pending("sync"); - let download_id = state.tasks.register_pending("download"); - let index_id = state.tasks.register_pending("index"); - let tag_id = state.tasks.register_pending("tag"); - let organize_id = state.tasks.register_pending("organize"); - let enrich_id = state.tasks.register_pending("enrich"); +/// Trigger the full pipeline: sync wanted items to download queue, then create +/// work queue items for each pending download. Returns a pipeline ID for tracking. +pub async fn trigger_pipeline(state: &web::Data) -> Result { + let pipeline_id = uuid::Uuid::new_v4().to_string(); + let conn = state.db.conn(); - let task_ids = vec![ - sync_id.clone(), - download_id.clone(), - index_id.clone(), - tag_id.clone(), - organize_id.clone(), - enrich_id.clone(), - ]; + // Step 1: Sync wanted items to download queue (fast, just DB inserts) + let sync_stats = shanty_dl::sync_wanted_to_queue(conn, false).await?; + tracing::info!( + enqueued = sync_stats.enqueued, + skipped = sync_stats.skipped, + pipeline_id = %pipeline_id, + "pipeline sync complete" + ); - let state = state.clone(); - tokio::spawn(async move { - run_pipeline_inner( - &state, - &sync_id, - &download_id, - &index_id, - &tag_id, - &organize_id, - &enrich_id, + // Step 2: Create Download work items for each pending download_queue entry + let pending = queries::downloads::list(conn, Some(DownloadStatus::Pending)).await?; + for dl_item in &pending { + let payload = serde_json::json!({"download_queue_id": dl_item.id}); + queries::work_queue::enqueue( + conn, + WorkTaskType::Download, + &payload.to_string(), + Some(&pipeline_id), ) - .await; + .await?; + } + + if !pending.is_empty() { + state.workers.notify(WorkTaskType::Download); + } + + tracing::info!( + download_items = pending.len(), + pipeline_id = %pipeline_id, + "pipeline work items created" + ); + + Ok(pipeline_id) +} + +/// Trigger the pipeline and return immediately (for API endpoints). +pub fn spawn_pipeline(state: &web::Data) -> String { + let state = state.clone(); + let pipeline_id = uuid::Uuid::new_v4().to_string(); + + tokio::spawn(async move { + match trigger_pipeline(&state).await { + Ok(id) => tracing::info!(pipeline_id = %id, "pipeline triggered"), + Err(e) => tracing::error!(error = %e, "pipeline trigger failed"), + } }); - task_ids -} - -/// Run the pipeline without registering tasks (for the scheduler, which logs instead). -pub async fn run_pipeline(state: &web::Data) -> Vec { - let sync_id = state.tasks.register_pending("sync"); - let download_id = state.tasks.register_pending("download"); - let index_id = state.tasks.register_pending("index"); - let tag_id = state.tasks.register_pending("tag"); - let organize_id = state.tasks.register_pending("organize"); - let enrich_id = state.tasks.register_pending("enrich"); - - let task_ids = vec![ - sync_id.clone(), - download_id.clone(), - index_id.clone(), - tag_id.clone(), - organize_id.clone(), - enrich_id.clone(), - ]; - - run_pipeline_inner( - state, - &sync_id, - &download_id, - &index_id, - &tag_id, - &organize_id, - &enrich_id, - ) - .await; - - task_ids -} - -async fn run_pipeline_inner( - state: &web::Data, - sync_id: &str, - download_id: &str, - index_id: &str, - tag_id: &str, - organize_id: &str, - enrich_id: &str, -) { - let cfg = state.config.read().await.clone(); - - // Step 1: Sync - state.tasks.start(sync_id); - state - .tasks - .update_progress(sync_id, 0, 0, "Syncing watchlist to download queue..."); - match shanty_dl::sync_wanted_to_queue(state.db.conn(), false).await { - Ok(stats) => state.tasks.complete(sync_id, format!("{stats}")), - Err(e) => state.tasks.fail(sync_id, e.to_string()), - } - - // Step 2: Download - state.tasks.start(download_id); - let cookies = cfg.download.cookies_path.clone(); - let format: shanty_dl::AudioFormat = cfg - .download - .format - .parse() - .unwrap_or(shanty_dl::AudioFormat::Opus); - let source: shanty_dl::SearchSource = cfg - .download - .search_source - .parse() - .unwrap_or(shanty_dl::SearchSource::YouTubeMusic); - let rate = if cookies.is_some() { - cfg.download.rate_limit_auth - } else { - cfg.download.rate_limit - }; - let backend = shanty_dl::YtDlpBackend::new(rate, source, cookies.clone()); - let backend_config = shanty_dl::BackendConfig { - output_dir: cfg.download_path.clone(), - format, - cookies_path: cookies, - }; - let task_state = state.clone(); - let progress_tid = download_id.to_string(); - let on_progress: shanty_dl::ProgressFn = Box::new(move |current, total, msg| { - task_state - .tasks - .update_progress(&progress_tid, current, total, msg); - }); - match shanty_dl::run_queue_with_progress( - state.db.conn(), - &backend, - &backend_config, - false, - Some(on_progress), - ) - .await - { - Ok(stats) => { - let _ = queries::cache::purge_prefix(state.db.conn(), "artist_totals:").await; - state.tasks.complete(download_id, format!("{stats}")); - } - Err(e) => state.tasks.fail(download_id, e.to_string()), - } - - // Step 3: Index - state.tasks.start(index_id); - state - .tasks - .update_progress(index_id, 0, 0, "Scanning library..."); - let scan_config = shanty_index::ScanConfig { - root: cfg.library_path.clone(), - dry_run: false, - concurrency: cfg.indexing.concurrency, - }; - match shanty_index::run_scan(state.db.conn(), &scan_config).await { - Ok(stats) => state.tasks.complete(index_id, format!("{stats}")), - Err(e) => state.tasks.fail(index_id, e.to_string()), - } - - // Step 4: Tag - state.tasks.start(tag_id); - state - .tasks - .update_progress(tag_id, 0, 0, "Tagging tracks..."); - match shanty_tag::MusicBrainzClient::new() { - Ok(mb) => { - let tag_config = shanty_tag::TagConfig { - dry_run: false, - write_tags: cfg.tagging.write_tags, - confidence: cfg.tagging.confidence, - }; - match shanty_tag::run_tagging(state.db.conn(), &mb, &tag_config, None).await { - Ok(stats) => state.tasks.complete(tag_id, format!("{stats}")), - Err(e) => state.tasks.fail(tag_id, e.to_string()), - } - } - Err(e) => state.tasks.fail(tag_id, e.to_string()), - } - - // Step 5: Organize - state.tasks.start(organize_id); - state - .tasks - .update_progress(organize_id, 0, 0, "Organizing files..."); - let org_config = shanty_org::OrgConfig { - target_dir: cfg.library_path.clone(), - format: cfg.organization_format.clone(), - dry_run: false, - copy: false, - }; - match shanty_org::organize_from_db(state.db.conn(), &org_config).await { - Ok(stats) => { - let promoted = queries::wanted::promote_downloaded_to_owned(state.db.conn()) - .await - .unwrap_or(0); - let msg = if promoted > 0 { - format!("{stats} — {promoted} items marked as owned") - } else { - format!("{stats}") - }; - state.tasks.complete(organize_id, msg); - } - Err(e) => state.tasks.fail(organize_id, e.to_string()), - } - - // Step 6: Enrich - state.tasks.start(enrich_id); - state - .tasks - .update_progress(enrich_id, 0, 0, "Refreshing artist data..."); - match enrich_all_watched_artists(state).await { - Ok(count) => state - .tasks - .complete(enrich_id, format!("{count} artists refreshed")), - Err(e) => state.tasks.fail(enrich_id, e.to_string()), - } + pipeline_id } diff --git a/src/pipeline_scheduler.rs b/src/pipeline_scheduler.rs deleted file mode 100644 index fb5b6b0..0000000 --- a/src/pipeline_scheduler.rs +++ /dev/null @@ -1,59 +0,0 @@ -//! Background task that runs the full pipeline on a configurable interval. - -use std::time::Duration; - -use actix_web::web; -use chrono::Utc; - -use crate::state::AppState; - -/// Spawn the pipeline scheduler background loop. -/// -/// Sleeps for the configured interval, then runs the full pipeline if enabled. -/// Reads config each iteration so changes take effect without restart. -pub fn spawn(state: web::Data) { - tokio::spawn(async move { - loop { - let (enabled, hours) = { - let cfg = state.config.read().await; - ( - cfg.scheduling.pipeline_enabled, - cfg.scheduling.pipeline_interval_hours.max(1), - ) - }; - - let sleep_secs = u64::from(hours) * 3600; - - // Update next-run time - { - let mut sched = state.scheduler.lock().await; - sched.next_pipeline = if enabled { - Some((Utc::now() + chrono::Duration::seconds(sleep_secs as i64)).naive_utc()) - } else { - None - }; - } - - tokio::time::sleep(Duration::from_secs(sleep_secs)).await; - - if !enabled { - continue; - } - - // Check if this run was skipped - { - let mut sched = state.scheduler.lock().await; - sched.next_pipeline = None; - if sched.skip_pipeline { - sched.skip_pipeline = false; - tracing::info!("scheduled pipeline skipped (user cancelled)"); - continue; - } - } - - tracing::info!("scheduled pipeline starting"); - let task_ids = crate::pipeline::run_pipeline(&state).await; - tracing::info!(?task_ids, "scheduled pipeline complete"); - } - }); -} diff --git a/src/routes/system.rs b/src/routes/system.rs index 70eb4a6..5e951f4 100644 --- a/src/routes/system.rs +++ b/src/routes/system.rs @@ -3,12 +3,12 @@ use actix_web::{HttpResponse, web}; use serde::Deserialize; use shanty_db::entities::download_queue::DownloadStatus; +use shanty_db::entities::work_queue::WorkTaskType; use shanty_db::queries; use crate::auth; use crate::config::AppConfig; use crate::error::ApiError; -use crate::routes::artists::enrich_all_watched_artists; use crate::state::AppState; pub fn configure(cfg: &mut web::ServiceConfig) { @@ -38,13 +38,13 @@ async fn get_status( session: Session, ) -> Result { auth::require_auth(&session)?; - let summary = shanty_watch::library_summary(state.db.conn()).await?; - let pending_items = - queries::downloads::list(state.db.conn(), Some(DownloadStatus::Pending)).await?; + let conn = state.db.conn(); + + let summary = shanty_watch::library_summary(conn).await?; + let pending_items = queries::downloads::list(conn, Some(DownloadStatus::Pending)).await?; let downloading_items = - queries::downloads::list(state.db.conn(), Some(DownloadStatus::Downloading)).await?; - let failed_items = - queries::downloads::list(state.db.conn(), Some(DownloadStatus::Failed)).await?; + queries::downloads::list(conn, Some(DownloadStatus::Downloading)).await?; + let failed_items = queries::downloads::list(conn, Some(DownloadStatus::Failed)).await?; let tasks = state.tasks.list(); let mut queue_items = Vec::new(); @@ -52,15 +52,38 @@ async fn get_status( queue_items.extend(pending_items.iter().cloned()); queue_items.extend(failed_items.iter().take(5).cloned()); - let needs_tagging = queries::tracks::get_needing_metadata(state.db.conn()).await?; + let needs_tagging = queries::tracks::get_needing_metadata(conn).await?; - // Scheduled task info - let sched = state.scheduler.lock().await; - let scheduled_tasks = serde_json::json!({ - "next_pipeline": sched.next_pipeline, - "next_monitor": sched.next_monitor, - }); - drop(sched); + // Work queue counts + let work_queue = queries::work_queue::counts_all(conn).await.ok(); + + // Scheduler state from DB + let scheduler_jobs = queries::scheduler_state::list_all(conn).await.unwrap_or_default(); + let scheduler_json: serde_json::Value = scheduler_jobs + .iter() + .map(|j| { + ( + j.job_name.clone(), + serde_json::json!({ + "last_run": j.last_run_at, + "next_run": j.next_run_at, + "last_result": j.last_result, + "enabled": j.enabled, + }), + ) + }) + .collect::>() + .into(); + + // Backward-compatible scheduled field (from scheduler_state DB) + let next_pipeline = scheduler_jobs + .iter() + .find(|j| j.job_name == "pipeline") + .and_then(|j| j.next_run_at); + let next_monitor = scheduler_jobs + .iter() + .find(|j| j.job_name == "monitor") + .and_then(|j| j.next_run_at); Ok(HttpResponse::Ok().json(serde_json::json!({ "library": summary, @@ -75,7 +98,12 @@ async fn get_status( "items": needs_tagging.iter().take(20).collect::>(), }, "tasks": tasks, - "scheduled": scheduled_tasks, + "scheduled": { + "next_pipeline": next_pipeline, + "next_monitor": next_monitor, + }, + "work_queue": work_queue, + "scheduler": scheduler_json, }))) } @@ -84,27 +112,16 @@ async fn trigger_index( session: Session, ) -> Result { auth::require_auth(&session)?; - let task_id = state.tasks.register("index"); - let state = state.clone(); - let tid = task_id.clone(); - - tokio::spawn(async move { - let cfg = state.config.read().await.clone(); - state - .tasks - .update_progress(&tid, 0, 0, "Scanning library..."); - let scan_config = shanty_index::ScanConfig { - root: cfg.library_path.clone(), - dry_run: false, - concurrency: cfg.indexing.concurrency, - }; - match shanty_index::run_scan(state.db.conn(), &scan_config).await { - Ok(stats) => state.tasks.complete(&tid, format!("{stats}")), - Err(e) => state.tasks.fail(&tid, e.to_string()), - } - }); - - Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id }))) + let payload = serde_json::json!({"scan_all": true}); + let item = queries::work_queue::enqueue( + state.db.conn(), + WorkTaskType::Index, + &payload.to_string(), + None, + ) + .await?; + state.workers.notify(WorkTaskType::Index); + Ok(HttpResponse::Accepted().json(serde_json::json!({ "work_item_id": item.id }))) } async fn trigger_tag( @@ -112,35 +129,18 @@ async fn trigger_tag( session: Session, ) -> Result { auth::require_auth(&session)?; - let task_id = state.tasks.register("tag"); - let state = state.clone(); - let tid = task_id.clone(); - - tokio::spawn(async move { - let cfg = state.config.read().await.clone(); - state - .tasks - .update_progress(&tid, 0, 0, "Preparing tagger..."); - let mb = match shanty_tag::MusicBrainzClient::new() { - Ok(c) => c, - Err(e) => { - state.tasks.fail(&tid, e.to_string()); - return; - } - }; - let tag_config = shanty_tag::TagConfig { - dry_run: false, - write_tags: cfg.tagging.write_tags, - confidence: cfg.tagging.confidence, - }; - state.tasks.update_progress(&tid, 0, 0, "Tagging tracks..."); - match shanty_tag::run_tagging(state.db.conn(), &mb, &tag_config, None).await { - Ok(stats) => state.tasks.complete(&tid, format!("{stats}")), - Err(e) => state.tasks.fail(&tid, e.to_string()), - } - }); - - Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id }))) + let conn = state.db.conn(); + let untagged = queries::tracks::get_needing_metadata(conn).await?; + let mut count = 0; + for track in &untagged { + let payload = serde_json::json!({"track_id": track.id}); + queries::work_queue::enqueue(conn, WorkTaskType::Tag, &payload.to_string(), None).await?; + count += 1; + } + if count > 0 { + state.workers.notify(WorkTaskType::Tag); + } + Ok(HttpResponse::Accepted().json(serde_json::json!({ "enqueued": count }))) } async fn trigger_organize( @@ -148,39 +148,31 @@ async fn trigger_organize( session: Session, ) -> Result { auth::require_auth(&session)?; - let task_id = state.tasks.register("organize"); - let state = state.clone(); - let tid = task_id.clone(); - - tokio::spawn(async move { - let cfg = state.config.read().await.clone(); - state - .tasks - .update_progress(&tid, 0, 0, "Organizing files..."); - let org_config = shanty_org::OrgConfig { - target_dir: cfg.library_path.clone(), - format: cfg.organization_format.clone(), - dry_run: false, - copy: false, - }; - match shanty_org::organize_from_db(state.db.conn(), &org_config).await { - Ok(stats) => { - let promoted = queries::wanted::promote_downloaded_to_owned(state.db.conn()) - .await - .unwrap_or(0); - let msg = if promoted > 0 { - format!("{stats} — {promoted} items marked as owned") - } else { - format!("{stats}") - }; - state.tasks.complete(&tid, msg); - let _ = enrich_all_watched_artists(&state).await; - } - Err(e) => state.tasks.fail(&tid, e.to_string()), + let conn = state.db.conn(); + let mut count = 0u64; + let mut offset = 0u64; + loop { + let tracks = queries::tracks::list(conn, 500, offset).await?; + if tracks.is_empty() { + break; } - }); - - Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id }))) + for track in &tracks { + let payload = serde_json::json!({"track_id": track.id}); + queries::work_queue::enqueue( + conn, + WorkTaskType::Organize, + &payload.to_string(), + None, + ) + .await?; + count += 1; + } + offset += 500; + } + if count > 0 { + state.workers.notify(WorkTaskType::Organize); + } + Ok(HttpResponse::Accepted().json(serde_json::json!({ "enqueued": count }))) } async fn trigger_pipeline( @@ -188,8 +180,8 @@ async fn trigger_pipeline( session: Session, ) -> Result { auth::require_auth(&session)?; - let task_ids = crate::pipeline::spawn_pipeline(&state); - Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_ids": task_ids }))) + let pipeline_id = crate::pipeline::trigger_pipeline(&state).await?; + Ok(HttpResponse::Accepted().json(serde_json::json!({ "pipeline_id": pipeline_id }))) } async fn get_task( @@ -313,10 +305,13 @@ async fn skip_pipeline( session: Session, ) -> Result { auth::require_admin(&session)?; - let mut sched = state.scheduler.lock().await; - sched.skip_pipeline = true; - sched.next_pipeline = None; - Ok(HttpResponse::Ok().json(serde_json::json!({"status": "skipped"}))) + // Push next_run_at forward by one interval + let cfg = state.config.read().await; + let hours = cfg.scheduling.pipeline_interval_hours.max(1); + drop(cfg); + let next = chrono::Utc::now().naive_utc() + chrono::Duration::hours(i64::from(hours)); + queries::scheduler_state::update_next_run(state.db.conn(), "pipeline", Some(next)).await?; + Ok(HttpResponse::Ok().json(serde_json::json!({"status": "skipped", "next_run": next}))) } async fn skip_monitor( @@ -324,10 +319,12 @@ async fn skip_monitor( session: Session, ) -> Result { auth::require_admin(&session)?; - let mut sched = state.scheduler.lock().await; - sched.skip_monitor = true; - sched.next_monitor = None; - Ok(HttpResponse::Ok().json(serde_json::json!({"status": "skipped"}))) + let cfg = state.config.read().await; + let hours = cfg.scheduling.monitor_interval_hours.max(1); + drop(cfg); + let next = chrono::Utc::now().naive_utc() + chrono::Duration::hours(i64::from(hours)); + queries::scheduler_state::update_next_run(state.db.conn(), "monitor", Some(next)).await?; + Ok(HttpResponse::Ok().json(serde_json::json!({"status": "skipped", "next_run": next}))) } async fn get_mb_status( @@ -389,7 +386,7 @@ async fn trigger_mb_import( state.tasks.update_progress( &tid, i as u64, - 4 + 4, // 4 downloads + 4 imports + 4 + 4, &format!("Downloading {filename}..."), ); if let Err(e) = @@ -413,7 +410,6 @@ async fn trigger_mb_import( let tid_clone = tid.clone(); let state_clone = state.clone(); - // Run import in blocking task since rusqlite is sync let result = tokio::task::spawn_blocking(move || { shanty_data::mb_import::run_import_at_path(&db_path, &data_dir, |msg| { tracing::info!("{msg}"); diff --git a/src/scheduler.rs b/src/scheduler.rs new file mode 100644 index 0000000..138af59 --- /dev/null +++ b/src/scheduler.rs @@ -0,0 +1,148 @@ +//! Unified scheduler that manages all recurring background jobs. +//! +//! Replaces the separate pipeline_scheduler, monitor, and cookie_refresh spawn loops +//! with a single loop backed by persistent state in the `scheduler_state` DB table. + +use std::time::Duration; + +use actix_web::web; +use chrono::Utc; + +use shanty_db::queries; + +use crate::state::AppState; + +/// Spawn the unified scheduler background loop. +pub fn spawn(state: web::Data) { + tokio::spawn(async move { + // Initialize scheduler state rows in DB + for job_name in ["pipeline", "monitor", "cookie_refresh"] { + if let Err(e) = + queries::scheduler_state::get_or_create(state.db.conn(), job_name).await + { + tracing::error!(job = job_name, error = %e, "failed to init scheduler state"); + } + } + + loop { + // Check each job + check_and_run_job(&state, "pipeline", run_pipeline_job).await; + check_and_run_job(&state, "monitor", run_monitor_job).await; + check_and_run_job(&state, "cookie_refresh", run_cookie_refresh_job).await; + + // Poll every 30 seconds + tokio::time::sleep(Duration::from_secs(30)).await; + } + }); +} + +async fn check_and_run_job(state: &web::Data, job_name: &str, run_fn: F) +where + F: FnOnce(web::Data) -> Fut, + Fut: std::future::Future>, +{ + let job = match queries::scheduler_state::get_or_create(state.db.conn(), job_name).await { + Ok(j) => j, + Err(e) => { + tracing::error!(job = job_name, error = %e, "failed to read scheduler state"); + return; + } + }; + + // Read config to check if enabled and get interval + let (config_enabled, interval_secs) = read_job_config(state, job_name).await; + + // If config says disabled, ensure DB state reflects it + if !config_enabled { + if job.enabled { + let _ = + queries::scheduler_state::set_enabled(state.db.conn(), job_name, false).await; + let _ = + queries::scheduler_state::update_next_run(state.db.conn(), job_name, None).await; + } + return; + } + + // If DB says disabled (e.g. user skipped), re-enable from config + if !job.enabled { + let _ = queries::scheduler_state::set_enabled(state.db.conn(), job_name, true).await; + } + + let now = Utc::now().naive_utc(); + + // If no next_run_at is set, schedule one + if job.next_run_at.is_none() { + let next = now + chrono::Duration::seconds(interval_secs); + let _ = + queries::scheduler_state::update_next_run(state.db.conn(), job_name, Some(next)).await; + return; + } + + // Check if it's time to run + let next_run = job.next_run_at.unwrap(); + if now < next_run { + return; + } + + // Time to run! + tracing::info!(job = job_name, "scheduled job starting"); + + let result = run_fn(state.clone()).await; + + let result_str = match &result { + Ok(msg) => { + tracing::info!(job = job_name, result = %msg, "scheduled job complete"); + format!("ok: {msg}") + } + Err(e) => { + tracing::error!(job = job_name, error = %e, "scheduled job failed"); + format!("error: {e}") + } + }; + + // Update last run and schedule next + let _ = queries::scheduler_state::update_last_run(state.db.conn(), job_name, &result_str).await; + let next = Utc::now().naive_utc() + chrono::Duration::seconds(interval_secs); + let _ = + queries::scheduler_state::update_next_run(state.db.conn(), job_name, Some(next)).await; +} + +async fn read_job_config(state: &web::Data, job_name: &str) -> (bool, i64) { + let cfg = state.config.read().await; + match job_name { + "pipeline" => ( + cfg.scheduling.pipeline_enabled, + i64::from(cfg.scheduling.pipeline_interval_hours.max(1)) * 3600, + ), + "monitor" => ( + cfg.scheduling.monitor_enabled, + i64::from(cfg.scheduling.monitor_interval_hours.max(1)) * 3600, + ), + "cookie_refresh" => ( + cfg.download.cookie_refresh_enabled, + i64::from(cfg.download.cookie_refresh_hours.max(1)) * 3600, + ), + _ => (false, 3600), + } +} + +async fn run_pipeline_job(state: web::Data) -> Result { + let pipeline_id = crate::pipeline::trigger_pipeline(&state) + .await + .map_err(|e| e.to_string())?; + Ok(format!("pipeline triggered: {pipeline_id}")) +} + +async fn run_monitor_job(state: web::Data) -> Result { + let stats = crate::monitor::check_monitored_artists(&state) + .await + .map_err(|e| e.to_string())?; + Ok(format!( + "{} artists checked, {} new releases, {} tracks added", + stats.artists_checked, stats.new_releases_found, stats.tracks_added + )) +} + +async fn run_cookie_refresh_job(_state: web::Data) -> Result { + crate::cookie_refresh::run_refresh().await +} diff --git a/src/state.rs b/src/state.rs index c6e97e2..5bc3f92 100644 --- a/src/state.rs +++ b/src/state.rs @@ -8,24 +8,13 @@ use shanty_search::MusicBrainzSearch; use crate::config::AppConfig; use crate::tasks::TaskManager; +use crate::workers::WorkerManager; /// Tracks an active Firefox login session for YouTube auth. pub struct FirefoxLoginSession { pub vnc_url: String, } -/// Tracks next-run times for scheduled background tasks. -pub struct SchedulerInfo { - /// When the next pipeline run is scheduled (None if disabled). - pub next_pipeline: Option, - /// When the next monitor check is scheduled (None if disabled). - pub next_monitor: Option, - /// Skip the next pipeline run (one-shot, resets after skip). - pub skip_pipeline: bool, - /// Skip the next monitor run (one-shot, resets after skip). - pub skip_monitor: bool, -} - pub struct AppState { pub db: Database, pub mb_client: HybridMusicBrainzFetcher, @@ -34,6 +23,6 @@ pub struct AppState { pub config: Arc>, pub config_path: Option, pub tasks: TaskManager, + pub workers: WorkerManager, pub firefox_login: Mutex>, - pub scheduler: Mutex, } diff --git a/src/workers.rs b/src/workers.rs new file mode 100644 index 0000000..d9d95fb --- /dev/null +++ b/src/workers.rs @@ -0,0 +1,431 @@ +//! Work queue workers that process pipeline items concurrently. +//! +//! Each task type (Download, Index, Tag, Organize) has a dedicated worker loop +//! that polls the work_queue table and processes items with bounded concurrency. + +use std::collections::HashMap; +use std::sync::Arc; + +use actix_web::web; +use sea_orm::ActiveValue::Set; +use tokio::sync::{Notify, Semaphore}; + +use shanty_db::entities::download_queue::DownloadStatus; +use shanty_db::entities::wanted_item::WantedStatus; +use shanty_db::entities::work_queue::WorkTaskType; +use shanty_db::queries; +use shanty_dl::DownloadBackend; + +use crate::state::AppState; + +/// Manages worker notification channels and spawns worker loops. +pub struct WorkerManager { + notifiers: HashMap>, +} + +impl Default for WorkerManager { + fn default() -> Self { + Self::new() + } +} + +impl WorkerManager { + pub fn new() -> Self { + let mut notifiers = HashMap::new(); + notifiers.insert(WorkTaskType::Download, Arc::new(Notify::new())); + notifiers.insert(WorkTaskType::Index, Arc::new(Notify::new())); + notifiers.insert(WorkTaskType::Tag, Arc::new(Notify::new())); + notifiers.insert(WorkTaskType::Organize, Arc::new(Notify::new())); + Self { notifiers } + } + + /// Wake the worker for a specific task type. + pub fn notify(&self, task_type: WorkTaskType) { + if let Some(n) = self.notifiers.get(&task_type) { + n.notify_one(); + } + } + + /// Spawn all worker loops and run startup recovery. + pub fn spawn_all(state: web::Data) { + let state_clone = state.clone(); + tokio::spawn(async move { + // Reset any items stuck in Running from a previous crash + match queries::work_queue::reset_stale_running(state_clone.db.conn()).await { + Ok(count) if count > 0 => { + tracing::info!(count, "reset stale running work queue items"); + } + Err(e) => tracing::error!(error = %e, "failed to reset stale work queue items"), + _ => {} + } + + // Periodic cleanup of old completed items (every 6 hours) + let cleanup_state = state_clone.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_secs(6 * 3600)).await; + let _ = queries::work_queue::cleanup_completed(cleanup_state.db.conn(), 7).await; + } + }); + }); + + // Spawn each worker type + spawn_worker(state.clone(), WorkTaskType::Download, 1); + spawn_worker(state.clone(), WorkTaskType::Index, 4); + spawn_worker(state.clone(), WorkTaskType::Tag, 2); + spawn_worker(state.clone(), WorkTaskType::Organize, 4); + } +} + +fn spawn_worker(state: web::Data, task_type: WorkTaskType, concurrency: usize) { + let notify = state + .workers + .notifiers + .get(&task_type) + .cloned() + .unwrap_or_else(|| Arc::new(Notify::new())); + + tokio::spawn(async move { + loop { + // Wait for notification or poll timeout + tokio::select! { + _ = notify.notified() => {} + _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {} + } + + // Claim pending items + let items = match queries::work_queue::claim_next( + state.db.conn(), + task_type, + concurrency as u64, + ) + .await + { + Ok(items) => items, + Err(e) => { + tracing::error!(task_type = %task_type, error = %e, "failed to claim work items"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + continue; + } + }; + + if items.is_empty() { + continue; + } + + let semaphore = Arc::new(Semaphore::new(concurrency)); + let mut handles = Vec::new(); + + for item in items { + let permit = semaphore.clone().acquire_owned().await.unwrap(); + let state = state.clone(); + let item_id = item.id; + let item_task_type = item.task_type; + let pipeline_id = item.pipeline_id.clone(); + + handles.push(tokio::spawn(async move { + let _permit = permit; + + let result = match item_task_type { + WorkTaskType::Download => process_download(&state, &item).await, + WorkTaskType::Index => process_index(&state, &item).await, + WorkTaskType::Tag => process_tag(&state, &item).await, + WorkTaskType::Organize => process_organize(&state, &item).await, + }; + + match result { + Ok(downstream) => { + if let Err(e) = + queries::work_queue::complete(state.db.conn(), item_id).await + { + tracing::error!(id = item_id, error = %e, "failed to mark work item complete"); + } + // Enqueue downstream items + for (task_type, payload) in downstream { + if let Err(e) = queries::work_queue::enqueue( + state.db.conn(), + task_type, + &payload, + pipeline_id.as_deref(), + ) + .await + { + tracing::error!( + error = %e, + "failed to enqueue downstream work item" + ); + } + state.workers.notify(task_type); + } + } + Err(error) => { + tracing::warn!( + id = item_id, + task_type = %item_task_type, + error = %error, + "work item failed" + ); + if let Err(e) = + queries::work_queue::fail(state.db.conn(), item_id, &error).await + { + tracing::error!(id = item_id, error = %e, "failed to mark work item failed"); + } + } + } + })); + } + + for handle in handles { + let _ = handle.await; + } + } + }); +} + +// --- Worker implementations --- + +type WorkResult = Result, String>; + +async fn process_download( + state: &web::Data, + item: &shanty_db::entities::work_queue::Model, +) -> WorkResult { + let payload: serde_json::Value = + serde_json::from_str(&item.payload_json).map_err(|e| e.to_string())?; + let dl_queue_id = payload + .get("download_queue_id") + .and_then(|v| v.as_i64()) + .map(|v| v as i32); + + let conn = state.db.conn(); + + // Get the download queue item — either specific ID or next pending + let dl_item = if let Some(id) = dl_queue_id { + queries::downloads::list(conn, None) + .await + .map_err(|e| e.to_string())? + .into_iter() + .find(|i| i.id == id) + } else { + queries::downloads::get_next_pending(conn) + .await + .map_err(|e| e.to_string())? + }; + + let dl_item = match dl_item { + Some(item) => item, + None => return Ok(vec![]), // Nothing to download + }; + + // Mark as downloading + queries::downloads::update_status(conn, dl_item.id, DownloadStatus::Downloading, None) + .await + .map_err(|e| e.to_string())?; + + // Build download backend from config + let cfg = state.config.read().await.clone(); + let cookies = cfg.download.cookies_path.clone(); + let format: shanty_dl::AudioFormat = cfg + .download + .format + .parse() + .unwrap_or(shanty_dl::AudioFormat::Opus); + let source: shanty_dl::SearchSource = cfg + .download + .search_source + .parse() + .unwrap_or(shanty_dl::SearchSource::YouTubeMusic); + let rate = if cookies.is_some() { + cfg.download.rate_limit_auth + } else { + cfg.download.rate_limit + }; + + let backend = shanty_dl::YtDlpBackend::new(rate, source, cookies.clone()); + let backend_config = shanty_dl::BackendConfig { + output_dir: cfg.download_path.clone(), + format, + cookies_path: cookies, + }; + + // Determine download target + let target = if let Some(ref url) = dl_item.source_url { + shanty_dl::DownloadTarget::Url(url.clone()) + } else { + shanty_dl::DownloadTarget::Query(dl_item.query.clone()) + }; + + // Execute download + let result = backend + .download(&target, &backend_config) + .await + .map_err(|e| e.to_string())?; + + // Mark download_queue entry as completed + queries::downloads::update_status(conn, dl_item.id, DownloadStatus::Completed, None) + .await + .map_err(|e| e.to_string())?; + + // Create track record if linked to a wanted item (same logic as shanty-dl queue.rs) + let mut downstream = Vec::new(); + if let Some(wanted_item_id) = dl_item.wanted_item_id { + let wanted = queries::wanted::get_by_id(conn, wanted_item_id) + .await + .map_err(|e| e.to_string())?; + + let file_size = std::fs::metadata(&result.file_path) + .map(|m| m.len() as i64) + .unwrap_or(0); + + let now = chrono::Utc::now().naive_utc(); + let track_active = shanty_db::entities::track::ActiveModel { + file_path: Set(result.file_path.to_string_lossy().to_string()), + title: Set(Some(result.title.clone())), + artist: Set(result.artist.clone()), + file_size: Set(file_size), + musicbrainz_id: Set(wanted.musicbrainz_id.clone()), + artist_id: Set(wanted.artist_id), + added_at: Set(now), + updated_at: Set(now), + ..Default::default() + }; + let track = queries::tracks::upsert(conn, track_active) + .await + .map_err(|e| e.to_string())?; + + queries::wanted::update_status(conn, wanted_item_id, WantedStatus::Downloaded) + .await + .map_err(|e| e.to_string())?; + + // Create Tag work item for this track + let tag_payload = serde_json::json!({"track_id": track.id}); + downstream.push((WorkTaskType::Tag, tag_payload.to_string())); + } + + let _ = queries::cache::purge_prefix(conn, "artist_totals:").await; + Ok(downstream) +} + +async fn process_index( + state: &web::Data, + item: &shanty_db::entities::work_queue::Model, +) -> WorkResult { + let payload: serde_json::Value = + serde_json::from_str(&item.payload_json).map_err(|e| e.to_string())?; + let conn = state.db.conn(); + let cfg = state.config.read().await.clone(); + let mut downstream = Vec::new(); + + if payload.get("scan_all").and_then(|v| v.as_bool()).unwrap_or(false) { + // Full library scan + let scan_config = shanty_index::ScanConfig { + root: cfg.library_path.clone(), + dry_run: false, + concurrency: cfg.indexing.concurrency, + }; + shanty_index::run_scan(conn, &scan_config) + .await + .map_err(|e| e.to_string())?; + + // Create Tag work items for all untagged tracks + let untagged = queries::tracks::get_needing_metadata(conn) + .await + .map_err(|e| e.to_string())?; + for track in &untagged { + let tag_payload = serde_json::json!({"track_id": track.id}); + downstream.push((WorkTaskType::Tag, tag_payload.to_string())); + } + } else if let Some(file_path) = payload.get("file_path").and_then(|v| v.as_str()) { + // Single file index + let path = std::path::PathBuf::from(file_path); + let track_id = shanty_index::index_file(conn, &path, false) + .await + .map_err(|e| e.to_string())?; + + if let Some(id) = track_id { + let tag_payload = serde_json::json!({"track_id": id}); + downstream.push((WorkTaskType::Tag, tag_payload.to_string())); + } + } + + Ok(downstream) +} + +async fn process_tag( + state: &web::Data, + item: &shanty_db::entities::work_queue::Model, +) -> WorkResult { + let payload: serde_json::Value = + serde_json::from_str(&item.payload_json).map_err(|e| e.to_string())?; + let track_id = payload + .get("track_id") + .and_then(|v| v.as_i64()) + .ok_or("missing track_id in payload")? + as i32; + + let conn = state.db.conn(); + let cfg = state.config.read().await.clone(); + + let track = queries::tracks::get_by_id(conn, track_id) + .await + .map_err(|e| e.to_string())?; + + let tag_config = shanty_tag::TagConfig { + dry_run: false, + write_tags: cfg.tagging.write_tags, + confidence: cfg.tagging.confidence, + }; + + shanty_tag::tag_track(conn, &state.mb_client, &track, &tag_config) + .await + .map_err(|e| e.to_string())?; + + // Create Organize work item + let org_payload = serde_json::json!({"track_id": track_id}); + Ok(vec![(WorkTaskType::Organize, org_payload.to_string())]) +} + +async fn process_organize( + state: &web::Data, + item: &shanty_db::entities::work_queue::Model, +) -> WorkResult { + let payload: serde_json::Value = + serde_json::from_str(&item.payload_json).map_err(|e| e.to_string())?; + let track_id = payload + .get("track_id") + .and_then(|v| v.as_i64()) + .ok_or("missing track_id in payload")? + as i32; + + let conn = state.db.conn(); + let cfg = state.config.read().await.clone(); + + let org_config = shanty_org::OrgConfig { + target_dir: cfg.library_path.clone(), + format: cfg.organization_format.clone(), + dry_run: false, + copy: false, + }; + + shanty_org::organize_track(conn, track_id, &org_config) + .await + .map_err(|e| e.to_string())?; + + // Promote this track's wanted item from Downloaded to Owned + let _ = queries::wanted::promote_downloaded_to_owned(conn).await; + + // Check if pipeline is complete and trigger enrichment + if let Some(ref pipeline_id) = item.pipeline_id + && let Ok(true) = queries::work_queue::pipeline_is_complete(conn, pipeline_id).await + { + tracing::info!(pipeline_id = %pipeline_id, "pipeline complete, triggering enrichment"); + let state = state.clone(); + tokio::spawn(async move { + if let Err(e) = crate::routes::artists::enrich_all_watched_artists(&state).await { + tracing::error!(error = %e, "post-pipeline enrichment failed"); + } + }); + } + + Ok(vec![]) +}