From b59bf4cc5dde81f55c1108d37ad3fb4b42f5c8bb Mon Sep 17 00:00:00 2001 From: Connor Johnstone Date: Wed, 25 Mar 2026 13:37:29 -0400 Subject: [PATCH] hopefully a fix for the enrich pipeline sometimes not running --- src/workers.rs | 106 ++++++++++++++++++++++++++++--------------------- 1 file changed, 60 insertions(+), 46 deletions(-) diff --git a/src/workers.rs b/src/workers.rs index e34916a..f33092f 100644 --- a/src/workers.rs +++ b/src/workers.rs @@ -153,6 +153,7 @@ fn spawn_worker(state: web::Data, task_type: WorkTaskType, concurrency tracing::error!(id = item_id, error = %e, "failed to mark work item complete"); } // Enqueue downstream items + let has_downstream = !downstream.is_empty(); for (task_type, payload) in downstream { if let Err(e) = queries::work_queue::enqueue( state.db.conn(), @@ -169,6 +170,21 @@ fn spawn_worker(state: web::Data, task_type: WorkTaskType, concurrency } state.workers.notify(task_type); } + + // Check pipeline completion when this item had no downstream + // (terminal step or empty result). Any worker can trigger this. + if !has_downstream + && item_task_type != WorkTaskType::Enrich + && let Some(ref pid) = pipeline_id + && let Ok(true) = queries::work_queue::pipeline_is_complete( + state.db.conn(), + pid, + None, // item already marked complete + ) + .await + { + trigger_pipeline_completion(&state, pid).await; + } } Err(error) => { tracing::warn!( @@ -464,47 +480,54 @@ async fn process_organize( // Promote this track's wanted item from Downloaded to Owned let _ = queries::wanted::promote_downloaded_to_owned(conn).await; - // Check if pipeline is complete — run cleanup then create enrich work items - // Exclude this item from the check since it's still "running" in the DB - let mut downstream = Vec::new(); - if let Some(ref pipeline_id) = item.pipeline_id - && let Ok(true) = - queries::work_queue::pipeline_is_complete(conn, pipeline_id, Some(item.id)).await - { - tracing::info!(pipeline_id = %pipeline_id, "pipeline complete, running cleanup"); + Ok(vec![]) +} - // Cleanup: remove orphaned tracks, empty albums, unused artists - match queries::tracks::delete_orphaned(conn).await { - Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up orphaned tracks"), - Err(e) => tracing::warn!(error = %e, "failed to clean orphaned tracks"), - _ => {} - } - match queries::albums::delete_empty(conn).await { - Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up empty albums"), - Err(e) => tracing::warn!(error = %e, "failed to clean empty albums"), - _ => {} - } - match queries::artists::delete_unused(conn).await { - Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up unused artists"), - Err(e) => tracing::warn!(error = %e, "failed to clean unused artists"), - _ => {} - } +/// Called when all non-Enrich items in a pipeline are complete. +/// Runs cleanup and creates Enrich work items for each watched artist. +async fn trigger_pipeline_completion(state: &web::Data, pipeline_id: &str) { + let conn = state.db.conn(); + tracing::info!(pipeline_id = %pipeline_id, "pipeline complete, running cleanup"); - // Create Enrich work items for each artist that has wanted items - let all_wanted = queries::wanted::list(conn, None, None) - .await - .unwrap_or_default(); - let mut artist_ids: Vec = all_wanted.iter().filter_map(|w| w.artist_id).collect(); - artist_ids.sort(); - artist_ids.dedup(); - - for artist_id in &artist_ids { - let payload = serde_json::json!({"artist_id": artist_id}); - downstream.push((WorkTaskType::Enrich, payload.to_string())); - } + // Cleanup: remove orphaned tracks, empty albums, unused artists + match queries::tracks::delete_orphaned(conn).await { + Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up orphaned tracks"), + Err(e) => tracing::warn!(error = %e, "failed to clean orphaned tracks"), + _ => {} + } + match queries::albums::delete_empty(conn).await { + Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up empty albums"), + Err(e) => tracing::warn!(error = %e, "failed to clean empty albums"), + _ => {} + } + match queries::artists::delete_unused(conn).await { + Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up unused artists"), + Err(e) => tracing::warn!(error = %e, "failed to clean unused artists"), + _ => {} } - Ok(downstream) + // Create Enrich work items for each artist that has wanted items + let all_wanted = queries::wanted::list(conn, None, None) + .await + .unwrap_or_default(); + let mut artist_ids: Vec = all_wanted.iter().filter_map(|w| w.artist_id).collect(); + artist_ids.sort(); + artist_ids.dedup(); + + for artist_id in &artist_ids { + let payload = serde_json::json!({"artist_id": artist_id}); + if let Err(e) = queries::work_queue::enqueue( + conn, + WorkTaskType::Enrich, + &payload.to_string(), + Some(pipeline_id), + ) + .await + { + tracing::error!(error = %e, "failed to enqueue enrich work item"); + } + } + state.workers.notify(WorkTaskType::Enrich); } async fn process_enrich( @@ -522,14 +545,5 @@ async fn process_enrich( .await .map_err(|e| e.to_string())?; - // If this pipeline is fully done, clear its completed items - if let Some(ref pipeline_id) = item.pipeline_id - && let Ok(true) = - queries::work_queue::pipeline_is_complete(state.db.conn(), pipeline_id, Some(item.id)) - .await - { - let _ = queries::work_queue::clear_pipeline(state.db.conn(), pipeline_id).await; - } - Ok(vec![]) }