diff --git a/frontend/src/pages/dashboard.rs b/frontend/src/pages/dashboard.rs
index d7ce3e8..874658c 100644
--- a/frontend/src/pages/dashboard.rs
+++ b/frontend/src/pages/dashboard.rs
@@ -365,6 +365,35 @@ pub fn dashboard() -> Html {
+ // Work Queue Progress
+ if let Some(ref wq) = s.work_queue {
+ if wq.download.pending + wq.download.running + wq.tag.pending + wq.tag.running + wq.organize.pending + wq.organize.running > 0
+ || wq.download.completed + wq.tag.completed + wq.organize.completed > 0
+ {
+
+
{ "Pipeline Progress" }
+
+
+ | { "Step" } | { "Pending" } | { "Running" } | { "Done" } | { "Failed" } |
+
+
+ { for [("Download", &wq.download), ("Index", &wq.index), ("Tag", &wq.tag), ("Organize", &wq.organize)].iter().map(|(name, c)| {
+ html! {
+
+ | { name } |
+ { c.pending } |
+ { if c.running > 0 { html! { { c.running } } } else { html! { { "0" } } } } |
+ { c.completed } |
+ { if c.failed > 0 { html! { { c.failed } } } else { html! { { "0" } } } } |
+
+ }
+ })}
+
+
+
+ }
+ }
+
// Background Tasks (always show if there are tasks or scheduled items)
if !s.tasks.is_empty() || has_scheduled {
diff --git a/frontend/src/types.rs b/frontend/src/types.rs
index f885cc9..816831d 100644
--- a/frontend/src/types.rs
+++ b/frontend/src/types.rs
@@ -225,7 +225,10 @@ pub struct TaskRef {
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct PipelineRef {
+ #[serde(default)]
pub task_ids: Vec
,
+ #[serde(default)]
+ pub pipeline_id: Option,
}
// --- Status ---
@@ -246,6 +249,10 @@ pub struct Status {
pub tasks: Vec,
#[serde(default)]
pub scheduled: Option,
+ #[serde(default)]
+ pub work_queue: Option,
+ #[serde(default)]
+ pub scheduler: Option,
}
#[derive(Debug, Clone, PartialEq, Deserialize)]
@@ -254,6 +261,22 @@ pub struct ScheduledTasks {
pub next_monitor: Option,
}
+#[derive(Debug, Clone, PartialEq, Deserialize)]
+pub struct WorkQueueCounts {
+ pub pending: u64,
+ pub running: u64,
+ pub completed: u64,
+ pub failed: u64,
+}
+
+#[derive(Debug, Clone, PartialEq, Deserialize)]
+pub struct WorkQueueStats {
+ pub download: WorkQueueCounts,
+ pub index: WorkQueueCounts,
+ pub tag: WorkQueueCounts,
+ pub organize: WorkQueueCounts,
+}
+
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct LibrarySummary {
pub total_items: u64,
diff --git a/src/cookie_refresh.rs b/src/cookie_refresh.rs
index 9e33987..28296f2 100644
--- a/src/cookie_refresh.rs
+++ b/src/cookie_refresh.rs
@@ -1,59 +1,22 @@
-//! Background task that periodically refreshes YouTube cookies via headless Firefox.
+//! YouTube cookie refresh via headless Firefox.
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use std::process::Stdio;
-use std::sync::Arc;
-use std::time::Duration;
use tokio::process::Command;
-use tokio::sync::RwLock;
-use crate::config::AppConfig;
+/// Run a headless cookie refresh. Returns success message or error.
+pub async fn run_refresh() -> Result {
+ let profile_dir = shanty_config::data_dir().join("firefox-profile");
+ let cookies_path = shanty_config::data_dir().join("cookies.txt");
-/// Spawn the cookie refresh background loop.
-///
-/// This task runs forever, sleeping for `cookie_refresh_hours` between refreshes.
-/// It reads the current config on each iteration so changes take effect without restart.
-pub fn spawn(config: Arc>) {
- tokio::spawn(async move {
- loop {
- let (enabled, hours) = {
- let cfg = config.read().await;
- (
- cfg.download.cookie_refresh_enabled,
- cfg.download.cookie_refresh_hours.max(1),
- )
- };
+ if !profile_dir.exists() {
+ return Err(format!(
+ "no Firefox profile at {}",
+ profile_dir.display()
+ ));
+ }
- // Sleep for the configured interval
- tokio::time::sleep(Duration::from_secs(u64::from(hours) * 3600)).await;
-
- if !enabled {
- continue;
- }
-
- let profile_dir = shanty_config::data_dir().join("firefox-profile");
- let cookies_path = shanty_config::data_dir().join("cookies.txt");
-
- if !profile_dir.exists() {
- tracing::warn!(
- "cookie refresh skipped: no Firefox profile at {}",
- profile_dir.display()
- );
- continue;
- }
-
- tracing::info!("starting cookie refresh");
-
- match run_refresh(&profile_dir, &cookies_path).await {
- Ok(msg) => tracing::info!("cookie refresh complete: {msg}"),
- Err(e) => tracing::error!("cookie refresh failed: {e}"),
- }
- }
- });
-}
-
-async fn run_refresh(profile_dir: &Path, cookies_path: &Path) -> Result {
let script = find_script()?;
let output = Command::new("python3")
diff --git a/src/lib.rs b/src/lib.rs
index 147d674..e823edb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -11,7 +11,8 @@ pub mod error;
pub mod mb_update;
pub mod monitor;
pub mod pipeline;
-pub mod pipeline_scheduler;
pub mod routes;
+pub mod scheduler;
pub mod state;
pub mod tasks;
+pub mod workers;
diff --git a/src/main.rs b/src/main.rs
index 55fbff4..47e884c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -14,6 +14,7 @@ use shanty_web::config::AppConfig;
use shanty_web::routes;
use shanty_web::state::AppState;
use shanty_web::tasks::TaskManager;
+use shanty_web::workers::WorkerManager;
#[derive(Parser)]
#[command(name = "shanty-web", about = "Shanty web interface backend")]
@@ -94,21 +95,13 @@ async fn main() -> anyhow::Result<()> {
config: std::sync::Arc::new(tokio::sync::RwLock::new(config)),
config_path,
tasks: TaskManager::new(),
+ workers: WorkerManager::new(),
firefox_login: tokio::sync::Mutex::new(None),
- scheduler: tokio::sync::Mutex::new(shanty_web::state::SchedulerInfo {
- next_pipeline: None,
- next_monitor: None,
- skip_pipeline: false,
- skip_monitor: false,
- }),
});
- // Start background cookie refresh task
- shanty_web::cookie_refresh::spawn(state.config.clone());
-
- // Start pipeline and monitor schedulers
- shanty_web::pipeline_scheduler::spawn(state.clone());
- shanty_web::monitor::spawn(state.clone());
+ // Start work queue workers and unified scheduler
+ WorkerManager::spawn_all(state.clone());
+ shanty_web::scheduler::spawn(state.clone());
shanty_web::mb_update::spawn(state.clone());
// Resolve static files directory relative to the binary location
diff --git a/src/monitor.rs b/src/monitor.rs
index 3bfe59e..a67600a 100644
--- a/src/monitor.rs
+++ b/src/monitor.rs
@@ -2,7 +2,6 @@
//! and automatically adds them to the watchlist.
use std::collections::HashSet;
-use std::time::Duration;
use actix_web::web;
use serde::Serialize;
@@ -197,68 +196,3 @@ pub async fn check_monitored_artists(
Ok(stats)
}
-
-/// Spawn the monitor scheduler background loop.
-///
-/// Sleeps for the configured interval, then checks monitored artists if enabled.
-/// Reads config each iteration so changes take effect without restart.
-pub fn spawn(state: web::Data) {
- tokio::spawn(async move {
- loop {
- let (enabled, hours) = {
- let cfg = state.config.read().await;
- (
- cfg.scheduling.monitor_enabled,
- cfg.scheduling.monitor_interval_hours.max(1),
- )
- };
-
- let sleep_secs = u64::from(hours) * 3600;
-
- // Update next-run time
- {
- let mut sched = state.scheduler.lock().await;
- sched.next_monitor = if enabled {
- Some(
- (chrono::Utc::now() + chrono::Duration::seconds(sleep_secs as i64))
- .naive_utc(),
- )
- } else {
- None
- };
- }
-
- tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
-
- if !enabled {
- continue;
- }
-
- // Check if this run was skipped
- {
- let mut sched = state.scheduler.lock().await;
- sched.next_monitor = None;
- if sched.skip_monitor {
- sched.skip_monitor = false;
- tracing::info!("scheduled monitor check skipped (user cancelled)");
- continue;
- }
- }
-
- tracing::info!("scheduled monitor check starting");
- match check_monitored_artists(&state).await {
- Ok(stats) => {
- tracing::info!(
- artists_checked = stats.artists_checked,
- new_releases = stats.new_releases_found,
- tracks_added = stats.tracks_added,
- "scheduled monitor check complete"
- );
- }
- Err(e) => {
- tracing::error!(error = %e, "scheduled monitor check failed");
- }
- }
- }
- });
-}
diff --git a/src/pipeline.rs b/src/pipeline.rs
index bd45e62..e278382 100644
--- a/src/pipeline.rs
+++ b/src/pipeline.rs
@@ -1,218 +1,67 @@
-//! Shared pipeline logic used by both the API endpoint and the scheduler.
+//! Pipeline trigger logic. Creates work queue items that flow through the
+//! Download → Tag → Organize pipeline concurrently via typed workers.
use actix_web::web;
+
+use shanty_db::entities::download_queue::DownloadStatus;
+use shanty_db::entities::work_queue::WorkTaskType;
use shanty_db::queries;
-use crate::routes::artists::enrich_all_watched_artists;
+use crate::error::ApiError;
use crate::state::AppState;
-/// Register and spawn the full 6-step pipeline. Returns the task IDs immediately.
-///
-/// Steps: sync, download, index, tag, organize, enrich.
-pub fn spawn_pipeline(state: &web::Data) -> Vec {
- let sync_id = state.tasks.register_pending("sync");
- let download_id = state.tasks.register_pending("download");
- let index_id = state.tasks.register_pending("index");
- let tag_id = state.tasks.register_pending("tag");
- let organize_id = state.tasks.register_pending("organize");
- let enrich_id = state.tasks.register_pending("enrich");
+/// Trigger the full pipeline: sync wanted items to download queue, then create
+/// work queue items for each pending download. Returns a pipeline ID for tracking.
+pub async fn trigger_pipeline(state: &web::Data) -> Result {
+ let pipeline_id = uuid::Uuid::new_v4().to_string();
+ let conn = state.db.conn();
- let task_ids = vec![
- sync_id.clone(),
- download_id.clone(),
- index_id.clone(),
- tag_id.clone(),
- organize_id.clone(),
- enrich_id.clone(),
- ];
+ // Step 1: Sync wanted items to download queue (fast, just DB inserts)
+ let sync_stats = shanty_dl::sync_wanted_to_queue(conn, false).await?;
+ tracing::info!(
+ enqueued = sync_stats.enqueued,
+ skipped = sync_stats.skipped,
+ pipeline_id = %pipeline_id,
+ "pipeline sync complete"
+ );
- let state = state.clone();
- tokio::spawn(async move {
- run_pipeline_inner(
- &state,
- &sync_id,
- &download_id,
- &index_id,
- &tag_id,
- &organize_id,
- &enrich_id,
+ // Step 2: Create Download work items for each pending download_queue entry
+ let pending = queries::downloads::list(conn, Some(DownloadStatus::Pending)).await?;
+ for dl_item in &pending {
+ let payload = serde_json::json!({"download_queue_id": dl_item.id});
+ queries::work_queue::enqueue(
+ conn,
+ WorkTaskType::Download,
+ &payload.to_string(),
+ Some(&pipeline_id),
)
- .await;
+ .await?;
+ }
+
+ if !pending.is_empty() {
+ state.workers.notify(WorkTaskType::Download);
+ }
+
+ tracing::info!(
+ download_items = pending.len(),
+ pipeline_id = %pipeline_id,
+ "pipeline work items created"
+ );
+
+ Ok(pipeline_id)
+}
+
+/// Trigger the pipeline and return immediately (for API endpoints).
+pub fn spawn_pipeline(state: &web::Data) -> String {
+ let state = state.clone();
+ let pipeline_id = uuid::Uuid::new_v4().to_string();
+
+ tokio::spawn(async move {
+ match trigger_pipeline(&state).await {
+ Ok(id) => tracing::info!(pipeline_id = %id, "pipeline triggered"),
+ Err(e) => tracing::error!(error = %e, "pipeline trigger failed"),
+ }
});
- task_ids
-}
-
-/// Run the pipeline without registering tasks (for the scheduler, which logs instead).
-pub async fn run_pipeline(state: &web::Data) -> Vec {
- let sync_id = state.tasks.register_pending("sync");
- let download_id = state.tasks.register_pending("download");
- let index_id = state.tasks.register_pending("index");
- let tag_id = state.tasks.register_pending("tag");
- let organize_id = state.tasks.register_pending("organize");
- let enrich_id = state.tasks.register_pending("enrich");
-
- let task_ids = vec![
- sync_id.clone(),
- download_id.clone(),
- index_id.clone(),
- tag_id.clone(),
- organize_id.clone(),
- enrich_id.clone(),
- ];
-
- run_pipeline_inner(
- state,
- &sync_id,
- &download_id,
- &index_id,
- &tag_id,
- &organize_id,
- &enrich_id,
- )
- .await;
-
- task_ids
-}
-
-async fn run_pipeline_inner(
- state: &web::Data,
- sync_id: &str,
- download_id: &str,
- index_id: &str,
- tag_id: &str,
- organize_id: &str,
- enrich_id: &str,
-) {
- let cfg = state.config.read().await.clone();
-
- // Step 1: Sync
- state.tasks.start(sync_id);
- state
- .tasks
- .update_progress(sync_id, 0, 0, "Syncing watchlist to download queue...");
- match shanty_dl::sync_wanted_to_queue(state.db.conn(), false).await {
- Ok(stats) => state.tasks.complete(sync_id, format!("{stats}")),
- Err(e) => state.tasks.fail(sync_id, e.to_string()),
- }
-
- // Step 2: Download
- state.tasks.start(download_id);
- let cookies = cfg.download.cookies_path.clone();
- let format: shanty_dl::AudioFormat = cfg
- .download
- .format
- .parse()
- .unwrap_or(shanty_dl::AudioFormat::Opus);
- let source: shanty_dl::SearchSource = cfg
- .download
- .search_source
- .parse()
- .unwrap_or(shanty_dl::SearchSource::YouTubeMusic);
- let rate = if cookies.is_some() {
- cfg.download.rate_limit_auth
- } else {
- cfg.download.rate_limit
- };
- let backend = shanty_dl::YtDlpBackend::new(rate, source, cookies.clone());
- let backend_config = shanty_dl::BackendConfig {
- output_dir: cfg.download_path.clone(),
- format,
- cookies_path: cookies,
- };
- let task_state = state.clone();
- let progress_tid = download_id.to_string();
- let on_progress: shanty_dl::ProgressFn = Box::new(move |current, total, msg| {
- task_state
- .tasks
- .update_progress(&progress_tid, current, total, msg);
- });
- match shanty_dl::run_queue_with_progress(
- state.db.conn(),
- &backend,
- &backend_config,
- false,
- Some(on_progress),
- )
- .await
- {
- Ok(stats) => {
- let _ = queries::cache::purge_prefix(state.db.conn(), "artist_totals:").await;
- state.tasks.complete(download_id, format!("{stats}"));
- }
- Err(e) => state.tasks.fail(download_id, e.to_string()),
- }
-
- // Step 3: Index
- state.tasks.start(index_id);
- state
- .tasks
- .update_progress(index_id, 0, 0, "Scanning library...");
- let scan_config = shanty_index::ScanConfig {
- root: cfg.library_path.clone(),
- dry_run: false,
- concurrency: cfg.indexing.concurrency,
- };
- match shanty_index::run_scan(state.db.conn(), &scan_config).await {
- Ok(stats) => state.tasks.complete(index_id, format!("{stats}")),
- Err(e) => state.tasks.fail(index_id, e.to_string()),
- }
-
- // Step 4: Tag
- state.tasks.start(tag_id);
- state
- .tasks
- .update_progress(tag_id, 0, 0, "Tagging tracks...");
- match shanty_tag::MusicBrainzClient::new() {
- Ok(mb) => {
- let tag_config = shanty_tag::TagConfig {
- dry_run: false,
- write_tags: cfg.tagging.write_tags,
- confidence: cfg.tagging.confidence,
- };
- match shanty_tag::run_tagging(state.db.conn(), &mb, &tag_config, None).await {
- Ok(stats) => state.tasks.complete(tag_id, format!("{stats}")),
- Err(e) => state.tasks.fail(tag_id, e.to_string()),
- }
- }
- Err(e) => state.tasks.fail(tag_id, e.to_string()),
- }
-
- // Step 5: Organize
- state.tasks.start(organize_id);
- state
- .tasks
- .update_progress(organize_id, 0, 0, "Organizing files...");
- let org_config = shanty_org::OrgConfig {
- target_dir: cfg.library_path.clone(),
- format: cfg.organization_format.clone(),
- dry_run: false,
- copy: false,
- };
- match shanty_org::organize_from_db(state.db.conn(), &org_config).await {
- Ok(stats) => {
- let promoted = queries::wanted::promote_downloaded_to_owned(state.db.conn())
- .await
- .unwrap_or(0);
- let msg = if promoted > 0 {
- format!("{stats} — {promoted} items marked as owned")
- } else {
- format!("{stats}")
- };
- state.tasks.complete(organize_id, msg);
- }
- Err(e) => state.tasks.fail(organize_id, e.to_string()),
- }
-
- // Step 6: Enrich
- state.tasks.start(enrich_id);
- state
- .tasks
- .update_progress(enrich_id, 0, 0, "Refreshing artist data...");
- match enrich_all_watched_artists(state).await {
- Ok(count) => state
- .tasks
- .complete(enrich_id, format!("{count} artists refreshed")),
- Err(e) => state.tasks.fail(enrich_id, e.to_string()),
- }
+ pipeline_id
}
diff --git a/src/pipeline_scheduler.rs b/src/pipeline_scheduler.rs
deleted file mode 100644
index fb5b6b0..0000000
--- a/src/pipeline_scheduler.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-//! Background task that runs the full pipeline on a configurable interval.
-
-use std::time::Duration;
-
-use actix_web::web;
-use chrono::Utc;
-
-use crate::state::AppState;
-
-/// Spawn the pipeline scheduler background loop.
-///
-/// Sleeps for the configured interval, then runs the full pipeline if enabled.
-/// Reads config each iteration so changes take effect without restart.
-pub fn spawn(state: web::Data) {
- tokio::spawn(async move {
- loop {
- let (enabled, hours) = {
- let cfg = state.config.read().await;
- (
- cfg.scheduling.pipeline_enabled,
- cfg.scheduling.pipeline_interval_hours.max(1),
- )
- };
-
- let sleep_secs = u64::from(hours) * 3600;
-
- // Update next-run time
- {
- let mut sched = state.scheduler.lock().await;
- sched.next_pipeline = if enabled {
- Some((Utc::now() + chrono::Duration::seconds(sleep_secs as i64)).naive_utc())
- } else {
- None
- };
- }
-
- tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
-
- if !enabled {
- continue;
- }
-
- // Check if this run was skipped
- {
- let mut sched = state.scheduler.lock().await;
- sched.next_pipeline = None;
- if sched.skip_pipeline {
- sched.skip_pipeline = false;
- tracing::info!("scheduled pipeline skipped (user cancelled)");
- continue;
- }
- }
-
- tracing::info!("scheduled pipeline starting");
- let task_ids = crate::pipeline::run_pipeline(&state).await;
- tracing::info!(?task_ids, "scheduled pipeline complete");
- }
- });
-}
diff --git a/src/routes/system.rs b/src/routes/system.rs
index 70eb4a6..5e951f4 100644
--- a/src/routes/system.rs
+++ b/src/routes/system.rs
@@ -3,12 +3,12 @@ use actix_web::{HttpResponse, web};
use serde::Deserialize;
use shanty_db::entities::download_queue::DownloadStatus;
+use shanty_db::entities::work_queue::WorkTaskType;
use shanty_db::queries;
use crate::auth;
use crate::config::AppConfig;
use crate::error::ApiError;
-use crate::routes::artists::enrich_all_watched_artists;
use crate::state::AppState;
pub fn configure(cfg: &mut web::ServiceConfig) {
@@ -38,13 +38,13 @@ async fn get_status(
session: Session,
) -> Result {
auth::require_auth(&session)?;
- let summary = shanty_watch::library_summary(state.db.conn()).await?;
- let pending_items =
- queries::downloads::list(state.db.conn(), Some(DownloadStatus::Pending)).await?;
+ let conn = state.db.conn();
+
+ let summary = shanty_watch::library_summary(conn).await?;
+ let pending_items = queries::downloads::list(conn, Some(DownloadStatus::Pending)).await?;
let downloading_items =
- queries::downloads::list(state.db.conn(), Some(DownloadStatus::Downloading)).await?;
- let failed_items =
- queries::downloads::list(state.db.conn(), Some(DownloadStatus::Failed)).await?;
+ queries::downloads::list(conn, Some(DownloadStatus::Downloading)).await?;
+ let failed_items = queries::downloads::list(conn, Some(DownloadStatus::Failed)).await?;
let tasks = state.tasks.list();
let mut queue_items = Vec::new();
@@ -52,15 +52,38 @@ async fn get_status(
queue_items.extend(pending_items.iter().cloned());
queue_items.extend(failed_items.iter().take(5).cloned());
- let needs_tagging = queries::tracks::get_needing_metadata(state.db.conn()).await?;
+ let needs_tagging = queries::tracks::get_needing_metadata(conn).await?;
- // Scheduled task info
- let sched = state.scheduler.lock().await;
- let scheduled_tasks = serde_json::json!({
- "next_pipeline": sched.next_pipeline,
- "next_monitor": sched.next_monitor,
- });
- drop(sched);
+ // Work queue counts
+ let work_queue = queries::work_queue::counts_all(conn).await.ok();
+
+ // Scheduler state from DB
+ let scheduler_jobs = queries::scheduler_state::list_all(conn).await.unwrap_or_default();
+ let scheduler_json: serde_json::Value = scheduler_jobs
+ .iter()
+ .map(|j| {
+ (
+ j.job_name.clone(),
+ serde_json::json!({
+ "last_run": j.last_run_at,
+ "next_run": j.next_run_at,
+ "last_result": j.last_result,
+ "enabled": j.enabled,
+ }),
+ )
+ })
+ .collect::>()
+ .into();
+
+ // Backward-compatible scheduled field (from scheduler_state DB)
+ let next_pipeline = scheduler_jobs
+ .iter()
+ .find(|j| j.job_name == "pipeline")
+ .and_then(|j| j.next_run_at);
+ let next_monitor = scheduler_jobs
+ .iter()
+ .find(|j| j.job_name == "monitor")
+ .and_then(|j| j.next_run_at);
Ok(HttpResponse::Ok().json(serde_json::json!({
"library": summary,
@@ -75,7 +98,12 @@ async fn get_status(
"items": needs_tagging.iter().take(20).collect::>(),
},
"tasks": tasks,
- "scheduled": scheduled_tasks,
+ "scheduled": {
+ "next_pipeline": next_pipeline,
+ "next_monitor": next_monitor,
+ },
+ "work_queue": work_queue,
+ "scheduler": scheduler_json,
})))
}
@@ -84,27 +112,16 @@ async fn trigger_index(
session: Session,
) -> Result {
auth::require_auth(&session)?;
- let task_id = state.tasks.register("index");
- let state = state.clone();
- let tid = task_id.clone();
-
- tokio::spawn(async move {
- let cfg = state.config.read().await.clone();
- state
- .tasks
- .update_progress(&tid, 0, 0, "Scanning library...");
- let scan_config = shanty_index::ScanConfig {
- root: cfg.library_path.clone(),
- dry_run: false,
- concurrency: cfg.indexing.concurrency,
- };
- match shanty_index::run_scan(state.db.conn(), &scan_config).await {
- Ok(stats) => state.tasks.complete(&tid, format!("{stats}")),
- Err(e) => state.tasks.fail(&tid, e.to_string()),
- }
- });
-
- Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id })))
+ let payload = serde_json::json!({"scan_all": true});
+ let item = queries::work_queue::enqueue(
+ state.db.conn(),
+ WorkTaskType::Index,
+ &payload.to_string(),
+ None,
+ )
+ .await?;
+ state.workers.notify(WorkTaskType::Index);
+ Ok(HttpResponse::Accepted().json(serde_json::json!({ "work_item_id": item.id })))
}
async fn trigger_tag(
@@ -112,35 +129,18 @@ async fn trigger_tag(
session: Session,
) -> Result {
auth::require_auth(&session)?;
- let task_id = state.tasks.register("tag");
- let state = state.clone();
- let tid = task_id.clone();
-
- tokio::spawn(async move {
- let cfg = state.config.read().await.clone();
- state
- .tasks
- .update_progress(&tid, 0, 0, "Preparing tagger...");
- let mb = match shanty_tag::MusicBrainzClient::new() {
- Ok(c) => c,
- Err(e) => {
- state.tasks.fail(&tid, e.to_string());
- return;
- }
- };
- let tag_config = shanty_tag::TagConfig {
- dry_run: false,
- write_tags: cfg.tagging.write_tags,
- confidence: cfg.tagging.confidence,
- };
- state.tasks.update_progress(&tid, 0, 0, "Tagging tracks...");
- match shanty_tag::run_tagging(state.db.conn(), &mb, &tag_config, None).await {
- Ok(stats) => state.tasks.complete(&tid, format!("{stats}")),
- Err(e) => state.tasks.fail(&tid, e.to_string()),
- }
- });
-
- Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id })))
+ let conn = state.db.conn();
+ let untagged = queries::tracks::get_needing_metadata(conn).await?;
+ let mut count = 0;
+ for track in &untagged {
+ let payload = serde_json::json!({"track_id": track.id});
+ queries::work_queue::enqueue(conn, WorkTaskType::Tag, &payload.to_string(), None).await?;
+ count += 1;
+ }
+ if count > 0 {
+ state.workers.notify(WorkTaskType::Tag);
+ }
+ Ok(HttpResponse::Accepted().json(serde_json::json!({ "enqueued": count })))
}
async fn trigger_organize(
@@ -148,39 +148,31 @@ async fn trigger_organize(
session: Session,
) -> Result {
auth::require_auth(&session)?;
- let task_id = state.tasks.register("organize");
- let state = state.clone();
- let tid = task_id.clone();
-
- tokio::spawn(async move {
- let cfg = state.config.read().await.clone();
- state
- .tasks
- .update_progress(&tid, 0, 0, "Organizing files...");
- let org_config = shanty_org::OrgConfig {
- target_dir: cfg.library_path.clone(),
- format: cfg.organization_format.clone(),
- dry_run: false,
- copy: false,
- };
- match shanty_org::organize_from_db(state.db.conn(), &org_config).await {
- Ok(stats) => {
- let promoted = queries::wanted::promote_downloaded_to_owned(state.db.conn())
- .await
- .unwrap_or(0);
- let msg = if promoted > 0 {
- format!("{stats} — {promoted} items marked as owned")
- } else {
- format!("{stats}")
- };
- state.tasks.complete(&tid, msg);
- let _ = enrich_all_watched_artists(&state).await;
- }
- Err(e) => state.tasks.fail(&tid, e.to_string()),
+ let conn = state.db.conn();
+ let mut count = 0u64;
+ let mut offset = 0u64;
+ loop {
+ let tracks = queries::tracks::list(conn, 500, offset).await?;
+ if tracks.is_empty() {
+ break;
}
- });
-
- Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id })))
+ for track in &tracks {
+ let payload = serde_json::json!({"track_id": track.id});
+ queries::work_queue::enqueue(
+ conn,
+ WorkTaskType::Organize,
+ &payload.to_string(),
+ None,
+ )
+ .await?;
+ count += 1;
+ }
+ offset += 500;
+ }
+ if count > 0 {
+ state.workers.notify(WorkTaskType::Organize);
+ }
+ Ok(HttpResponse::Accepted().json(serde_json::json!({ "enqueued": count })))
}
async fn trigger_pipeline(
@@ -188,8 +180,8 @@ async fn trigger_pipeline(
session: Session,
) -> Result {
auth::require_auth(&session)?;
- let task_ids = crate::pipeline::spawn_pipeline(&state);
- Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_ids": task_ids })))
+ let pipeline_id = crate::pipeline::trigger_pipeline(&state).await?;
+ Ok(HttpResponse::Accepted().json(serde_json::json!({ "pipeline_id": pipeline_id })))
}
async fn get_task(
@@ -313,10 +305,13 @@ async fn skip_pipeline(
session: Session,
) -> Result {
auth::require_admin(&session)?;
- let mut sched = state.scheduler.lock().await;
- sched.skip_pipeline = true;
- sched.next_pipeline = None;
- Ok(HttpResponse::Ok().json(serde_json::json!({"status": "skipped"})))
+ // Push next_run_at forward by one interval
+ let cfg = state.config.read().await;
+ let hours = cfg.scheduling.pipeline_interval_hours.max(1);
+ drop(cfg);
+ let next = chrono::Utc::now().naive_utc() + chrono::Duration::hours(i64::from(hours));
+ queries::scheduler_state::update_next_run(state.db.conn(), "pipeline", Some(next)).await?;
+ Ok(HttpResponse::Ok().json(serde_json::json!({"status": "skipped", "next_run": next})))
}
async fn skip_monitor(
@@ -324,10 +319,12 @@ async fn skip_monitor(
session: Session,
) -> Result {
auth::require_admin(&session)?;
- let mut sched = state.scheduler.lock().await;
- sched.skip_monitor = true;
- sched.next_monitor = None;
- Ok(HttpResponse::Ok().json(serde_json::json!({"status": "skipped"})))
+ let cfg = state.config.read().await;
+ let hours = cfg.scheduling.monitor_interval_hours.max(1);
+ drop(cfg);
+ let next = chrono::Utc::now().naive_utc() + chrono::Duration::hours(i64::from(hours));
+ queries::scheduler_state::update_next_run(state.db.conn(), "monitor", Some(next)).await?;
+ Ok(HttpResponse::Ok().json(serde_json::json!({"status": "skipped", "next_run": next})))
}
async fn get_mb_status(
@@ -389,7 +386,7 @@ async fn trigger_mb_import(
state.tasks.update_progress(
&tid,
i as u64,
- 4 + 4, // 4 downloads + 4 imports
+ 4 + 4,
&format!("Downloading {filename}..."),
);
if let Err(e) =
@@ -413,7 +410,6 @@ async fn trigger_mb_import(
let tid_clone = tid.clone();
let state_clone = state.clone();
- // Run import in blocking task since rusqlite is sync
let result = tokio::task::spawn_blocking(move || {
shanty_data::mb_import::run_import_at_path(&db_path, &data_dir, |msg| {
tracing::info!("{msg}");
diff --git a/src/scheduler.rs b/src/scheduler.rs
new file mode 100644
index 0000000..138af59
--- /dev/null
+++ b/src/scheduler.rs
@@ -0,0 +1,148 @@
+//! Unified scheduler that manages all recurring background jobs.
+//!
+//! Replaces the separate pipeline_scheduler, monitor, and cookie_refresh spawn loops
+//! with a single loop backed by persistent state in the `scheduler_state` DB table.
+
+use std::time::Duration;
+
+use actix_web::web;
+use chrono::Utc;
+
+use shanty_db::queries;
+
+use crate::state::AppState;
+
+/// Spawn the unified scheduler background loop.
+pub fn spawn(state: web::Data) {
+ tokio::spawn(async move {
+ // Initialize scheduler state rows in DB
+ for job_name in ["pipeline", "monitor", "cookie_refresh"] {
+ if let Err(e) =
+ queries::scheduler_state::get_or_create(state.db.conn(), job_name).await
+ {
+ tracing::error!(job = job_name, error = %e, "failed to init scheduler state");
+ }
+ }
+
+ loop {
+ // Check each job
+ check_and_run_job(&state, "pipeline", run_pipeline_job).await;
+ check_and_run_job(&state, "monitor", run_monitor_job).await;
+ check_and_run_job(&state, "cookie_refresh", run_cookie_refresh_job).await;
+
+ // Poll every 30 seconds
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ });
+}
+
+async fn check_and_run_job(state: &web::Data, job_name: &str, run_fn: F)
+where
+ F: FnOnce(web::Data) -> Fut,
+ Fut: std::future::Future