fixed some unwatch cleanup stuff

Connor Johnstone
2026-03-24 20:41:13 -04:00
parent 45a7dcd8cd
commit 7b14ed593f
6 changed files with 142 additions and 84 deletions
+3
@@ -16,6 +16,9 @@ pub async fn trigger_pipeline(state: &web::Data<AppState>) -> Result<String, Api
     let pipeline_id = uuid::Uuid::new_v4().to_string();
     let conn = state.db.conn();
+    // Clear completed/failed items from previous pipeline runs (not standalone tasks)
+    let _ = queries::work_queue::clear_all_pipelines(conn).await;
     // Step 1: Sync wanted items to download queue (fast, just DB inserts)
     let sync_stats = shanty_dl::sync_wanted_to_queue(conn, false).await?;
     tracing::info!(
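
clear_all_pipelines itself isn't part of this diff. A minimal sketch of the behavior the added comment describes, assuming a SeaORM work_queue entity whose pipeline_id and status columns match the names used below (both column names and the status values are assumptions, not from this commit):

use sea_orm::{ColumnTrait, DbConn, DbErr, EntityTrait, QueryFilter};
use shanty_db::entities::work_queue;

// Hypothetical sketch: drop finished rows that belonged to a pipeline run,
// leaving standalone tasks (pipeline_id IS NULL) untouched.
async fn clear_all_pipelines_sketch(conn: &DbConn) -> Result<u64, DbErr> {
    let res = work_queue::Entity::delete_many()
        // Only rows created as part of some pipeline...
        .filter(work_queue::Column::PipelineId.is_not_null())
        // ...and already in a terminal state.
        .filter(work_queue::Column::Status.is_in(["completed", "failed"]))
        .exec(conn)
        .await?;
    Ok(res.rows_affected)
}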
+4 -7
@@ -34,6 +34,7 @@ struct ArtistListItem {
     name: String,
     musicbrainz_id: Option<String>,
     monitored: bool,
+    enriched: bool,
     total_watched: usize,
     total_owned: usize,
     total_items: usize,
@@ -111,17 +112,12 @@ async fn list_artists(
             None
         };
+        let enriched = cached_totals.is_some();
         let (total_watched, total_owned, total_items) =
             if let Some((avail, watched, owned)) = cached_totals {
                 (watched as usize, owned as usize, avail as usize)
             } else {
-                // Fall back to wanted item counts
-                let total_items = artist_wanted.len();
-                let total_owned = artist_wanted
-                    .iter()
-                    .filter(|w| w.status == WantedStatus::Owned)
-                    .count();
-                (total_items, total_owned, total_items)
+                (0, 0, 0)
             };
items.push(ArtistListItem {
@@ -129,6 +125,7 @@ async fn list_artists(
             name: a.name.clone(),
             musicbrainz_id: a.musicbrainz_id.clone(),
             monitored: a.monitored,
+            enriched,
             total_watched,
             total_owned,
             total_items,
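
The fallback change above means an artist with no cached totals now reports zeros instead of approximating from wanted-item counts; the new enriched flag is what lets a client tell "not yet enriched" apart from "genuinely zero". A small illustrative helper pinning that behavior (the function and test names are mine, not from the diff):

// Illustrative only: mirrors the new fallback. Cached totals win; otherwise
// report zeros and rely on `enriched = false` to mean "not computed yet".
fn totals_for_display(cached_totals: Option<(i64, i64, i64)>) -> (bool, usize, usize, usize) {
    match cached_totals {
        Some((avail, watched, owned)) => (true, watched as usize, owned as usize, avail as usize),
        None => (false, 0, 0, 0),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn unenriched_artists_report_zeros_not_estimates() {
        assert_eq!(totals_for_display(None), (false, 0, 0, 0));
        assert_eq!(totals_for_display(Some((10, 4, 6))), (true, 4, 6, 10));
    }
}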
+50 -40
@@ -36,6 +36,7 @@ impl WorkerManager {
         notifiers.insert(WorkTaskType::Index, Arc::new(Notify::new()));
         notifiers.insert(WorkTaskType::Tag, Arc::new(Notify::new()));
         notifiers.insert(WorkTaskType::Organize, Arc::new(Notify::new()));
+        notifiers.insert(WorkTaskType::Enrich, Arc::new(Notify::new()));
         Self { notifiers }
     }
@@ -83,6 +84,7 @@ impl WorkerManager {
                 cfg.tagging.concurrency,
             );
             spawn_worker(state_clone.clone(), WorkTaskType::Organize, 4);
+            spawn_worker(state_clone.clone(), WorkTaskType::Enrich, 2);
         });
     }
 }
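
spawn_worker's body sits mostly outside these hunks. For orientation, a minimal sketch of the notify-then-drain loop such a worker typically runs, with concurrency bounded by a semaphore (claim_next_item and process are stand-ins, not functions from this repo):

use std::sync::Arc;
use tokio::sync::{Notify, Semaphore};

// Hypothetical shape of spawn_worker: wake on notify, drain the queue for
// one task type, bound parallelism with a semaphore sized to `concurrency`.
fn spawn_worker_sketch(notify: Arc<Notify>, concurrency: usize) {
    let permits = Arc::new(Semaphore::new(concurrency));
    tokio::spawn(async move {
        loop {
            // Sleep until something (e.g. trigger_pipeline) pings this task type.
            notify.notified().await;
            while let Some(item) = claim_next_item().await {
                let permit = permits.clone().acquire_owned().await.expect("semaphore closed");
                tokio::spawn(async move {
                    let _permit = permit; // released when the item finishes
                    process(item).await;
                });
            }
        }
    });
}

// Stand-ins for the real queue claim and per-type processors.
async fn claim_next_item() -> Option<u64> { None }
async fn process(_item: u64) {}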
@@ -141,6 +143,7 @@ fn spawn_worker(state: web::Data<AppState>, task_type: WorkTaskType, concurrency
                 WorkTaskType::Index => process_index(&state, &item).await,
                 WorkTaskType::Tag => process_tag(&state, &item).await,
                 WorkTaskType::Organize => process_organize(&state, &item).await,
+                WorkTaskType::Enrich => process_enrich(&state, &item).await,
             };
             match result {
@@ -341,43 +344,14 @@ async fn process_index(
             .await
             .map_err(|e| e.to_string())?;
-        // Create Tag work items for tracks that still need processing:
-        // 1. Tracks without MBIDs (need MB search + tagging)
-        // 2. Tracks with MBIDs but no wanted_item yet (need wanted_item creation + organize)
-        let needs_processing = queries::tracks::get_needing_metadata(conn)
+        // Create Tag work items for tracks not yet tagged by shanty
+        let untagged = queries::tracks::get_untagged(conn)
             .await
             .map_err(|e| e.to_string())?;
-        for track in &needs_processing {
+        for track in &untagged {
             let tag_payload = serde_json::json!({"track_id": track.id});
             downstream.push((WorkTaskType::Tag, tag_payload.to_string()));
         }
-        // Also process tracks that have MBIDs (from file tags) but no wanted_item
-        let all_wanted = queries::wanted::list(conn, None, None)
-            .await
-            .map_err(|e| e.to_string())?;
-        let wanted_mbids: std::collections::HashSet<&str> = all_wanted
-            .iter()
-            .filter_map(|w| w.musicbrainz_id.as_deref())
-            .collect();
-        let mut offset = 0u64;
-        loop {
-            let tracks = queries::tracks::list(conn, 500, offset)
-                .await
-                .map_err(|e| e.to_string())?;
-            if tracks.is_empty() {
-                break;
-            }
-            for track in &tracks {
-                if let Some(ref mbid) = track.musicbrainz_id
-                    && !wanted_mbids.contains(mbid.as_str())
-                {
-                    let tag_payload = serde_json::json!({"track_id": track.id});
-                    downstream.push((WorkTaskType::Tag, tag_payload.to_string()));
-                }
-            }
-            offset += 500;
-        }
     } else if let Some(file_path) = payload.get("file_path").and_then(|v| v.as_str()) {
         // Single file index
         let path = std::path::PathBuf::from(file_path);
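
get_untagged replaces both get_needing_metadata and the deleted MBID cross-reference scan, which paged through every track in batches of 500 and consulted a HashSet of wanted MBIDs. A sketch of the simpler query it suggests, assuming shanty_db has a tracks entity with a nullable column recording when shanty last tagged the file (the entity and tagged_at column are assumed names):

use sea_orm::{ColumnTrait, DbConn, DbErr, EntityTrait, QueryFilter};
use shanty_db::entities::tracks;

// Hypothetical sketch: one indexed predicate instead of a paged scan
// cross-referenced against wanted items.
async fn get_untagged_sketch(conn: &DbConn) -> Result<Vec<tracks::Model>, DbErr> {
    tracks::Entity::find()
        .filter(tracks::Column::TaggedAt.is_null())
        .all(conn)
        .await
}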
@@ -491,9 +465,12 @@ async fn process_organize(
     // Promote this track's wanted item from Downloaded to Owned
     let _ = queries::wanted::promote_downloaded_to_owned(conn).await;
-    // Check if pipeline is complete — run cleanup then enrichment
+    // Check if pipeline is complete — run cleanup then create enrich work items
+    // Exclude this item from the check since it's still "running" in the DB
+    let mut downstream = Vec::new();
     if let Some(ref pipeline_id) = item.pipeline_id
-        && let Ok(true) = queries::work_queue::pipeline_is_complete(conn, pipeline_id).await
+        && let Ok(true) =
+            queries::work_queue::pipeline_is_complete(conn, pipeline_id, Some(item.id)).await
     {
         tracing::info!(pipeline_id = %pipeline_id, "pipeline complete, running cleanup");
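
pipeline_is_complete grew a third argument so the item currently being processed doesn't count against completion: its own row is still marked running when this check fires. A sketch of the check under the same assumed work_queue schema as above:

use sea_orm::{ColumnTrait, DbConn, DbErr, EntityTrait, PaginatorTrait, QueryFilter};
use shanty_db::entities::work_queue;

// Hypothetical sketch: complete means no unfinished items remain in the
// pipeline, ignoring the caller's own still-running row.
async fn pipeline_is_complete_sketch(
    conn: &DbConn,
    pipeline_id: &str,
    exclude_item: Option<i32>,
) -> Result<bool, DbErr> {
    let mut unfinished = work_queue::Entity::find()
        .filter(work_queue::Column::PipelineId.eq(pipeline_id))
        // Status values are assumed; anything non-terminal counts as unfinished.
        .filter(work_queue::Column::Status.is_not_in(["completed", "failed"]));
    if let Some(id) = exclude_item {
        unfinished = unfinished.filter(work_queue::Column::Id.ne(id));
    }
    Ok(unfinished.count(conn).await? == 0)
}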
@@ -514,12 +491,45 @@ async fn process_organize(
             _ => {}
         }
-        let state = state.clone();
-        tokio::spawn(async move {
-            if let Err(e) = crate::routes::artists::enrich_all_watched_artists(&state).await {
-                tracing::error!(error = %e, "post-pipeline enrichment failed");
-            }
-        });
+        // Create Enrich work items for each artist that has wanted items
+        let all_wanted = queries::wanted::list(conn, None, None)
+            .await
+            .unwrap_or_default();
+        let mut artist_ids: Vec<i32> = all_wanted.iter().filter_map(|w| w.artist_id).collect();
+        artist_ids.sort();
+        artist_ids.dedup();
+        for artist_id in &artist_ids {
+            let payload = serde_json::json!({"artist_id": artist_id});
+            downstream.push((WorkTaskType::Enrich, payload.to_string()));
+        }
     }
+    Ok(downstream)
 }
+
+async fn process_enrich(
+    state: &web::Data<AppState>,
+    item: &shanty_db::entities::work_queue::Model,
+) -> WorkResult {
+    let payload: serde_json::Value =
+        serde_json::from_str(&item.payload_json).map_err(|e| e.to_string())?;
+    let artist_id = payload
+        .get("artist_id")
+        .and_then(|v| v.as_i64())
+        .ok_or("missing artist_id in payload")? as i32;
+    crate::routes::artists::enrich_artist(state, &artist_id.to_string(), false)
+        .await
+        .map_err(|e| e.to_string())?;
+    // If this pipeline is fully done, clear its completed items
+    if let Some(ref pipeline_id) = item.pipeline_id
+        && let Ok(true) =
+            queries::work_queue::pipeline_is_complete(state.db.conn(), pipeline_id, Some(item.id))
+                .await
+    {
+        let _ = queries::work_queue::clear_pipeline(state.db.conn(), pipeline_id).await;
+    }
+    Ok(vec![])
+}
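
The Enrich payload is built in process_organize and parsed here, and the two sides agree only by convention. A round-trip test is cheap insurance against them drifting apart (illustrative, not part of the commit):

#[cfg(test)]
mod enrich_payload_tests {
    #[test]
    fn artist_id_round_trips_through_the_enrich_payload() {
        // Build the payload exactly as process_organize does...
        let payload = serde_json::json!({"artist_id": 42}).to_string();
        // ...and parse it back exactly as process_enrich does.
        let value: serde_json::Value = serde_json::from_str(&payload).unwrap();
        let artist_id = value.get("artist_id").and_then(|v| v.as_i64()).unwrap() as i32;
        assert_eq!(artist_id, 42);
    }
}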