hopefully a fix for the enrich pipeline sometimes not running

This commit is contained in:
Connor Johnstone
2026-03-25 13:37:29 -04:00
parent 7827841de6
commit b59bf4cc5d

View File

@@ -153,6 +153,7 @@ fn spawn_worker(state: web::Data<AppState>, task_type: WorkTaskType, concurrency
tracing::error!(id = item_id, error = %e, "failed to mark work item complete");
}
// Enqueue downstream items
let has_downstream = !downstream.is_empty();
for (task_type, payload) in downstream {
if let Err(e) = queries::work_queue::enqueue(
state.db.conn(),
@@ -169,6 +170,21 @@ fn spawn_worker(state: web::Data<AppState>, task_type: WorkTaskType, concurrency
}
state.workers.notify(task_type);
}
// Check pipeline completion when this item had no downstream
// (terminal step or empty result). Any worker can trigger this.
if !has_downstream
&& item_task_type != WorkTaskType::Enrich
&& let Some(ref pid) = pipeline_id
&& let Ok(true) = queries::work_queue::pipeline_is_complete(
state.db.conn(),
pid,
None, // item already marked complete
)
.await
{
trigger_pipeline_completion(&state, pid).await;
}
}
Err(error) => {
tracing::warn!(
@@ -464,47 +480,54 @@ async fn process_organize(
// Promote this track's wanted item from Downloaded to Owned
let _ = queries::wanted::promote_downloaded_to_owned(conn).await;
// Check if pipeline is complete — run cleanup then create enrich work items
// Exclude this item from the check since it's still "running" in the DB
let mut downstream = Vec::new();
if let Some(ref pipeline_id) = item.pipeline_id
&& let Ok(true) =
queries::work_queue::pipeline_is_complete(conn, pipeline_id, Some(item.id)).await
{
tracing::info!(pipeline_id = %pipeline_id, "pipeline complete, running cleanup");
Ok(vec![])
}
// Cleanup: remove orphaned tracks, empty albums, unused artists
match queries::tracks::delete_orphaned(conn).await {
Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up orphaned tracks"),
Err(e) => tracing::warn!(error = %e, "failed to clean orphaned tracks"),
_ => {}
}
match queries::albums::delete_empty(conn).await {
Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up empty albums"),
Err(e) => tracing::warn!(error = %e, "failed to clean empty albums"),
_ => {}
}
match queries::artists::delete_unused(conn).await {
Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up unused artists"),
Err(e) => tracing::warn!(error = %e, "failed to clean unused artists"),
_ => {}
}
/// Called when all non-Enrich items in a pipeline are complete.
/// Runs cleanup and creates Enrich work items for each watched artist.
async fn trigger_pipeline_completion(state: &web::Data<AppState>, pipeline_id: &str) {
let conn = state.db.conn();
tracing::info!(pipeline_id = %pipeline_id, "pipeline complete, running cleanup");
// Create Enrich work items for each artist that has wanted items
let all_wanted = queries::wanted::list(conn, None, None)
.await
.unwrap_or_default();
let mut artist_ids: Vec<i32> = all_wanted.iter().filter_map(|w| w.artist_id).collect();
artist_ids.sort();
artist_ids.dedup();
for artist_id in &artist_ids {
let payload = serde_json::json!({"artist_id": artist_id});
downstream.push((WorkTaskType::Enrich, payload.to_string()));
}
// Cleanup: remove orphaned tracks, empty albums, unused artists
match queries::tracks::delete_orphaned(conn).await {
Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up orphaned tracks"),
Err(e) => tracing::warn!(error = %e, "failed to clean orphaned tracks"),
_ => {}
}
match queries::albums::delete_empty(conn).await {
Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up empty albums"),
Err(e) => tracing::warn!(error = %e, "failed to clean empty albums"),
_ => {}
}
match queries::artists::delete_unused(conn).await {
Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up unused artists"),
Err(e) => tracing::warn!(error = %e, "failed to clean unused artists"),
_ => {}
}
Ok(downstream)
// Create Enrich work items for each artist that has wanted items
let all_wanted = queries::wanted::list(conn, None, None)
.await
.unwrap_or_default();
let mut artist_ids: Vec<i32> = all_wanted.iter().filter_map(|w| w.artist_id).collect();
artist_ids.sort();
artist_ids.dedup();
for artist_id in &artist_ids {
let payload = serde_json::json!({"artist_id": artist_id});
if let Err(e) = queries::work_queue::enqueue(
conn,
WorkTaskType::Enrich,
&payload.to_string(),
Some(pipeline_id),
)
.await
{
tracing::error!(error = %e, "failed to enqueue enrich work item");
}
}
state.workers.notify(WorkTaskType::Enrich);
}
async fn process_enrich(
@@ -522,14 +545,5 @@ async fn process_enrich(
.await
.map_err(|e| e.to_string())?;
// If this pipeline is fully done, clear its completed items
if let Some(ref pipeline_id) = item.pipeline_id
&& let Ok(true) =
queries::work_queue::pipeline_is_complete(state.db.conn(), pipeline_id, Some(item.id))
.await
{
let _ = queries::work_queue::clear_pipeline(state.db.conn(), pipeline_id).await;
}
Ok(vec![])
}