diff --git a/frontend/src/pages/dashboard.rs b/frontend/src/pages/dashboard.rs index 59ab0f6..0bebca7 100644 --- a/frontend/src/pages/dashboard.rs +++ b/frontend/src/pages/dashboard.rs @@ -34,6 +34,8 @@ pub fn dashboard() -> Html { let status = use_state(|| None::); let error = use_state(|| None::); let message = use_state(|| None::); + let pipeline_was_active = use_state(|| false); + let pipeline_complete = use_state(|| false); // Fetch status function let fetch_status = { @@ -280,6 +282,72 @@ pub fn dashboard() -> Html { }) }; + let pipeline_progress_html = if let Some(ref wq) = s.work_queue { + let active = wq.download.pending + + wq.download.running + + wq.index.pending + + wq.index.running + + wq.tag.pending + + wq.tag.running + + wq.organize.pending + + wq.organize.running + + wq.enrich.pending + + wq.enrich.running; + + // Track pipeline active→inactive transition + if active > 0 { + if !*pipeline_was_active { + pipeline_was_active.set(true); + pipeline_complete.set(false); + } + } else if *pipeline_was_active { + pipeline_was_active.set(false); + pipeline_complete.set(true); + } + + if active > 0 { + html! { +
+

{ "Pipeline Progress" }

+ + + + + + { for [("Download", &wq.download), ("Index", &wq.index), ("Tag", &wq.tag), ("Organize", &wq.organize), ("Enrich", &wq.enrich)].iter().map(|(name, c)| { + html! { + + + + + + + + } + })} + +
{ "Step" }{ "Pending" }{ "Running" }{ "Done" }{ "Failed" }
{ name }{ c.pending }{ if c.running > 0 { html! { { c.running } } } else { html! { { "0" } } } }{ c.completed }{ if c.failed > 0 { html! { { c.failed } } } else { html! { { "0" } } } }
+
+ } + } else if *pipeline_complete { + html! { +
+

{ "Pipeline run complete!" }

+
+ } + } else { + html! {} + } + } else if *pipeline_complete { + html! { +
+

{ "Pipeline run complete!" }

+
+ } + } else { + html! {} + }; + let scheduled_jobs_html = { let next_pipeline = s .scheduled @@ -410,33 +478,7 @@ pub fn dashboard() -> Html { // Work Queue Progress - if let Some(ref wq) = s.work_queue { - if wq.download.pending + wq.download.running + wq.tag.pending + wq.tag.running + wq.organize.pending + wq.organize.running > 0 - || wq.download.completed + wq.tag.completed + wq.organize.completed > 0 - { -
-

{ "Pipeline Progress" }

- - - - - - { for [("Download", &wq.download), ("Index", &wq.index), ("Tag", &wq.tag), ("Organize", &wq.organize)].iter().map(|(name, c)| { - html! { - - - - - - - - } - })} - -
{ "Step" }{ "Pending" }{ "Running" }{ "Done" }{ "Failed" }
{ name }{ c.pending }{ if c.running > 0 { html! { { c.running } } } else { html! { { "0" } } } }{ c.completed }{ if c.failed > 0 { html! { { c.failed } } } else { html! { { "0" } } } }
-
- } - } + { pipeline_progress_html } // Scheduled Jobs (always visible) { scheduled_jobs_html } diff --git a/frontend/src/pages/library.rs b/frontend/src/pages/library.rs index 178f8b5..c6c7c59 100644 --- a/frontend/src/pages/library.rs +++ b/frontend/src/pages/library.rs @@ -88,8 +88,8 @@ pub fn library_page() -> Html { { "\u{2713}" } } - - if a.total_items > 0 { + if a.enriched { + = a.total_watched && a.total_watched > 0 { "color: var(--success);" } else if a.total_owned > 0 { "color: var(--warning);" } @@ -97,20 +97,22 @@ pub fn library_page() -> Html { }> { format!("{}/{}", a.total_owned, a.total_watched) } - } - - - if a.total_items > 0 { + + 0 { "color: var(--accent);" } else { "color: var(--text-muted);" } }> { format!("{}/{}", a.total_watched, a.total_items) } - } - + + } else { + + { "Awaiting artist enrichment..." } + + } - if a.total_items > 0 { + if a.enriched && a.total_items > 0 { { a.total_items } } diff --git a/frontend/src/types.rs b/frontend/src/types.rs index 08b22c5..26df87c 100644 --- a/frontend/src/types.rs +++ b/frontend/src/types.rs @@ -30,6 +30,8 @@ pub struct ArtistListItem { pub musicbrainz_id: Option, #[serde(default)] pub monitored: bool, + #[serde(default)] + pub enriched: bool, pub total_watched: usize, pub total_owned: usize, pub total_items: usize, @@ -265,7 +267,7 @@ pub struct ScheduledTasks { pub next_monitor: Option, } -#[derive(Debug, Clone, PartialEq, Deserialize)] +#[derive(Debug, Clone, PartialEq, Default, Deserialize)] pub struct WorkQueueCounts { pub pending: u64, pub running: u64, @@ -279,6 +281,8 @@ pub struct WorkQueueStats { pub index: WorkQueueCounts, pub tag: WorkQueueCounts, pub organize: WorkQueueCounts, + #[serde(default)] + pub enrich: WorkQueueCounts, } #[derive(Debug, Clone, PartialEq, Deserialize)] diff --git a/src/pipeline.rs b/src/pipeline.rs index 0089750..3e0722f 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -16,6 +16,9 @@ pub async fn trigger_pipeline(state: &web::Data) -> Result, monitored: bool, + enriched: bool, total_watched: usize, total_owned: usize, total_items: usize, @@ -111,17 +112,12 @@ async fn list_artists( None }; + let enriched = cached_totals.is_some(); let (total_watched, total_owned, total_items) = if let Some((avail, watched, owned)) = cached_totals { (watched as usize, owned as usize, avail as usize) } else { - // Fall back to wanted item counts - let total_items = artist_wanted.len(); - let total_owned = artist_wanted - .iter() - .filter(|w| w.status == WantedStatus::Owned) - .count(); - (total_items, total_owned, total_items) + (0, 0, 0) }; items.push(ArtistListItem { @@ -129,6 +125,7 @@ async fn list_artists( name: a.name.clone(), musicbrainz_id: a.musicbrainz_id.clone(), monitored: a.monitored, + enriched, total_watched, total_owned, total_items, diff --git a/src/workers.rs b/src/workers.rs index 1da2091..781b1ec 100644 --- a/src/workers.rs +++ b/src/workers.rs @@ -36,6 +36,7 @@ impl WorkerManager { notifiers.insert(WorkTaskType::Index, Arc::new(Notify::new())); notifiers.insert(WorkTaskType::Tag, Arc::new(Notify::new())); notifiers.insert(WorkTaskType::Organize, Arc::new(Notify::new())); + notifiers.insert(WorkTaskType::Enrich, Arc::new(Notify::new())); Self { notifiers } } @@ -83,6 +84,7 @@ impl WorkerManager { cfg.tagging.concurrency, ); spawn_worker(state_clone.clone(), WorkTaskType::Organize, 4); + spawn_worker(state_clone.clone(), WorkTaskType::Enrich, 2); }); } } @@ -141,6 +143,7 @@ fn spawn_worker(state: web::Data, task_type: WorkTaskType, concurrency WorkTaskType::Index => process_index(&state, &item).await, WorkTaskType::Tag => process_tag(&state, &item).await, WorkTaskType::Organize => process_organize(&state, &item).await, + WorkTaskType::Enrich => process_enrich(&state, &item).await, }; match result { @@ -341,43 +344,14 @@ async fn process_index( .await .map_err(|e| e.to_string())?; - // Create Tag work items for tracks that still need processing: - // 1. Tracks without MBIDs (need MB search + tagging) - // 2. Tracks with MBIDs but no wanted_item yet (need wanted_item creation + organize) - let needs_processing = queries::tracks::get_needing_metadata(conn) + // Create Tag work items for tracks not yet tagged by shanty + let untagged = queries::tracks::get_untagged(conn) .await .map_err(|e| e.to_string())?; - for track in &needs_processing { + for track in &untagged { let tag_payload = serde_json::json!({"track_id": track.id}); downstream.push((WorkTaskType::Tag, tag_payload.to_string())); } - - // Also process tracks that have MBIDs (from file tags) but no wanted_item - let all_wanted = queries::wanted::list(conn, None, None) - .await - .map_err(|e| e.to_string())?; - let wanted_mbids: std::collections::HashSet<&str> = all_wanted - .iter() - .filter_map(|w| w.musicbrainz_id.as_deref()) - .collect(); - let mut offset = 0u64; - loop { - let tracks = queries::tracks::list(conn, 500, offset) - .await - .map_err(|e| e.to_string())?; - if tracks.is_empty() { - break; - } - for track in &tracks { - if let Some(ref mbid) = track.musicbrainz_id - && !wanted_mbids.contains(mbid.as_str()) - { - let tag_payload = serde_json::json!({"track_id": track.id}); - downstream.push((WorkTaskType::Tag, tag_payload.to_string())); - } - } - offset += 500; - } } else if let Some(file_path) = payload.get("file_path").and_then(|v| v.as_str()) { // Single file index let path = std::path::PathBuf::from(file_path); @@ -491,9 +465,12 @@ async fn process_organize( // Promote this track's wanted item from Downloaded to Owned let _ = queries::wanted::promote_downloaded_to_owned(conn).await; - // Check if pipeline is complete — run cleanup then enrichment + // Check if pipeline is complete — run cleanup then create enrich work items + // Exclude this item from the check since it's still "running" in the DB + let mut downstream = Vec::new(); if let Some(ref pipeline_id) = item.pipeline_id - && let Ok(true) = queries::work_queue::pipeline_is_complete(conn, pipeline_id).await + && let Ok(true) = + queries::work_queue::pipeline_is_complete(conn, pipeline_id, Some(item.id)).await { tracing::info!(pipeline_id = %pipeline_id, "pipeline complete, running cleanup"); @@ -514,12 +491,45 @@ async fn process_organize( _ => {} } - let state = state.clone(); - tokio::spawn(async move { - if let Err(e) = crate::routes::artists::enrich_all_watched_artists(&state).await { - tracing::error!(error = %e, "post-pipeline enrichment failed"); - } - }); + // Create Enrich work items for each artist that has wanted items + let all_wanted = queries::wanted::list(conn, None, None) + .await + .unwrap_or_default(); + let mut artist_ids: Vec = all_wanted.iter().filter_map(|w| w.artist_id).collect(); + artist_ids.sort(); + artist_ids.dedup(); + + for artist_id in &artist_ids { + let payload = serde_json::json!({"artist_id": artist_id}); + downstream.push((WorkTaskType::Enrich, payload.to_string())); + } + } + + Ok(downstream) +} + +async fn process_enrich( + state: &web::Data, + item: &shanty_db::entities::work_queue::Model, +) -> WorkResult { + let payload: serde_json::Value = + serde_json::from_str(&item.payload_json).map_err(|e| e.to_string())?; + let artist_id = payload + .get("artist_id") + .and_then(|v| v.as_i64()) + .ok_or("missing artist_id in payload")? as i32; + + crate::routes::artists::enrich_artist(state, &artist_id.to_string(), false) + .await + .map_err(|e| e.to_string())?; + + // If this pipeline is fully done, clear its completed items + if let Some(ref pipeline_id) = item.pipeline_id + && let Ok(true) = + queries::work_queue::pipeline_is_complete(state.db.conn(), pipeline_id, Some(item.id)) + .await + { + let _ = queries::work_queue::clear_pipeline(state.db.conn(), pipeline_id).await; } Ok(vec![])