From 823ef150228be10bd36a79476572b7de17919aec Mon Sep 17 00:00:00 2001 From: Connor Johnstone Date: Tue, 24 Mar 2026 15:58:14 -0400 Subject: [PATCH] Added the import/cleanup functionality --- frontend/src/api.rs | 41 +++++++++++++ frontend/src/pages/album.rs | 26 ++++++++ frontend/src/pages/artist.rs | 98 +++++++++++++++++++++++++++--- frontend/src/pages/library.rs | 36 ++++++++++- frontend/src/pages/settings.rs | 12 ++++ frontend/src/types.rs | 6 ++ src/pipeline.rs | 13 +++- src/routes/albums.rs | 32 ++++++++++ src/routes/artists.rs | 24 +++++++- src/routes/tracks.rs | 24 +++++++- src/workers.rs | 107 +++++++++++++++++++++++++++++---- 11 files changed, 392 insertions(+), 27 deletions(-) diff --git a/frontend/src/api.rs b/frontend/src/api.rs index 3b2b749..26f6811 100644 --- a/frontend/src/api.rs +++ b/frontend/src/api.rs @@ -185,6 +185,47 @@ pub async fn watch_track( post_json(&format!("{BASE}/tracks/watch"), &body).await } +pub async fn unwatch_artist(id: i32) -> Result { + let resp = Request::delete(&format!("{BASE}/artists/{id}/watch")) + .send() + .await + .map_err(|e| ApiError(e.to_string()))?; + if !resp.ok() { + return Err(ApiError(format!("HTTP {}", resp.status()))); + } + resp.json().await.map_err(|e| ApiError(e.to_string())) +} + +pub async fn unwatch_album(mbid: &str) -> Result { + let resp = Request::delete(&format!("{BASE}/albums/{mbid}/watch")) + .send() + .await + .map_err(|e| ApiError(e.to_string()))?; + if !resp.ok() { + return Err(ApiError(format!("HTTP {}", resp.status()))); + } + resp.json().await.map_err(|e| ApiError(e.to_string())) +} + +pub async fn unwatch_track(mbid: &str) -> Result { + let body = serde_json::json!({"mbid": mbid}).to_string(); + let resp = Request::delete(&format!("{BASE}/tracks/watch")) + .header("Content-Type", "application/json") + .body(&body) + .map_err(|e| ApiError(e.to_string()))? + .send() + .await + .map_err(|e| ApiError(e.to_string()))?; + if !resp.ok() { + return Err(ApiError(format!("HTTP {}", resp.status()))); + } + resp.json().await.map_err(|e| ApiError(e.to_string())) +} + +pub async fn delete_artist(id: i32) -> Result<(), ApiError> { + delete(&format!("{BASE}/artists/{id}")).await +} + // --- Downloads --- pub async fn get_downloads(status: Option<&str>) -> Result, ApiError> { let mut url = format!("{BASE}/downloads/queue"); diff --git a/frontend/src/pages/album.rs b/frontend/src/pages/album.rs index 5852cb6..19d19a0 100644 --- a/frontend/src/pages/album.rs +++ b/frontend/src/pages/album.rs @@ -143,6 +143,27 @@ pub fn album_page(props: &Props) -> Html { }) }; + let on_unwatch_click = { + let detail = detail.clone(); + let mbid = t.recording_mbid.clone(); + Callback::from(move |_: MouseEvent| { + let detail = detail.clone(); + let mbid = mbid.clone(); + let idx = idx; + wasm_bindgen_futures::spawn_local(async move { + if api::unwatch_track(&mbid).await.is_ok() { + if let Some(ref d) = *detail { + let mut updated = d.clone(); + if let Some(track) = updated.tracks.get_mut(idx) { + track.status = None; + } + detail.set(Some(updated)); + } + } + }); + }) + }; + html! { <> @@ -162,6 +183,11 @@ pub fn album_page(props: &Props) -> Html { onclick={on_watch_click}> { "Watch" } + } else { + } + } + } else { + // Watch All let artist_name = d.artist.name.clone(); let artist_mbid = d.artist.musicbrainz_id.clone(); let message = message.clone(); @@ -140,8 +171,6 @@ pub fn artist_page(props: &Props) -> Html { { "Watch All" } } - } else { - html! {} } }; @@ -191,6 +220,33 @@ pub fn artist_page(props: &Props) -> Html { } }; + let remove_btn = { + let artist_id_num = d.artist.id; + if artist_id_num > 0 { + let error = error.clone(); + html! { + + } + } else { + html! {} + } + }; + html! {
if let Some(ref banner) = d.artist_banner { @@ -249,6 +305,7 @@ pub fn artist_page(props: &Props) -> Html {
{ watch_all_btn } { monitor_btn } + { remove_btn }
if let Some(ref bio) = d.artist_bio { @@ -315,7 +372,7 @@ pub fn artist_page(props: &Props) -> Html { let tc = album.track_count; - // Watch button for unwatched albums + // Watch/Unwatch toggle for albums let watch_btn = if is_unwatched { let artist_name = d.artist.name.clone(); let album_title = album.title.clone(); @@ -351,7 +408,34 @@ pub fn artist_page(props: &Props) -> Html { } } else { - html! {} + let album_title = album.title.clone(); + let album_mbid = album.mbid.clone(); + let message = message.clone(); + let error = error.clone(); + let fetch = fetch.clone(); + let artist_id = id.clone(); + html! { + + } }; html! { diff --git a/frontend/src/pages/library.rs b/frontend/src/pages/library.rs index a6ed1c2..178f8b5 100644 --- a/frontend/src/pages/library.rs +++ b/frontend/src/pages/library.rs @@ -10,16 +10,25 @@ pub fn library_page() -> Html { let artists = use_state(|| None::>); let error = use_state(|| None::); - { + let fetch_artists = { let artists = artists.clone(); let error = error.clone(); - use_effect_with((), move |_| { + Callback::from(move |_: ()| { + let artists = artists.clone(); + let error = error.clone(); wasm_bindgen_futures::spawn_local(async move { match api::list_artists(200, 0).await { Ok(a) => artists.set(Some(a)), Err(e) => error.set(Some(e.0)), } }); + }) + }; + + { + let fetch = fetch_artists.clone(); + use_effect_with((), move |_| { + fetch.emit(()); }); } @@ -49,10 +58,25 @@ pub fn library_page() -> Html { { "Owned" } { "Watched" } { "Tracks" } + - { for artists.iter().map(|a| html! { + { for artists.iter().map(|a| { + let artist_id = a.id; + let error = error.clone(); + let fetch = fetch_artists.clone(); + let on_remove = Callback::from(move |_: MouseEvent| { + let error = error.clone(); + let fetch = fetch.clone(); + wasm_bindgen_futures::spawn_local(async move { + match api::delete_artist(artist_id).await { + Ok(_) => fetch.emit(()), + Err(e) => error.set(Some(e.0)), + } + }); + }); + html! { to={Route::Artist { id: a.id.to_string() }}> @@ -90,7 +114,13 @@ pub fn library_page() -> Html { { a.total_items } } + + + + } })} diff --git a/frontend/src/pages/settings.rs b/frontend/src/pages/settings.rs index 7c0b0c8..1d8c0e5 100644 --- a/frontend/src/pages/settings.rs +++ b/frontend/src/pages/settings.rs @@ -453,6 +453,18 @@ pub fn settings_page() -> Html { } })} /> +
+ + () { + let mut cfg = (*config).clone().unwrap(); + cfg.tagging.concurrency = v; + config.set(Some(cfg)); + } + })} /> +
// Downloads diff --git a/frontend/src/types.rs b/frontend/src/types.rs index 4c03535..08b22c5 100644 --- a/frontend/src/types.rs +++ b/frontend/src/types.rs @@ -461,6 +461,12 @@ pub struct TaggingConfigFe { pub write_tags: bool, #[serde(default)] pub confidence: f64, + #[serde(default = "default_tag_concurrency")] + pub concurrency: usize, +} + +fn default_tag_concurrency() -> usize { + 4 } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] diff --git a/src/pipeline.rs b/src/pipeline.rs index e278382..0089750 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -42,10 +42,21 @@ pub async fn trigger_pipeline(state: &web::Data) -> Result, + session: Session, + path: web::Path, +) -> Result { + auth::require_auth(&session)?; + let mbid = path.into_inner(); + let conn = state.db.conn(); + + // Get the album's tracks from MB to find their recording MBIDs + let tracks = match state.mb_client.get_release_tracks(&mbid).await { + Ok(t) => t, + Err(_) => { + // Try as release-group + let release_mbid = resolve_release_from_group(&state, &mbid).await?; + state + .mb_client + .get_release_tracks(&release_mbid) + .await + .map_err(|e| ApiError::Internal(format!("MusicBrainz error: {e}")))? + } + }; + + let mut removed = 0u64; + for track in &tracks { + removed += queries::wanted::remove_by_mbid(conn, &track.recording_mbid).await?; + } + + Ok(HttpResponse::Ok().json(serde_json::json!({"removed": removed}))) +} diff --git a/src/routes/artists.rs b/src/routes/artists.rs index 67534be..71621ee 100644 --- a/src/routes/artists.rs +++ b/src/routes/artists.rs @@ -78,6 +78,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .route(web::post().to(set_monitored)) .route(web::delete().to(unset_monitored)), ) + .service(web::resource("/artists/{id}/watch").route(web::delete().to(unwatch_artist))) .service( web::resource("/artists/{id}") .route(web::get().to(get_artist)) @@ -839,12 +840,31 @@ async fn delete_artist( session: Session, path: web::Path, ) -> Result { - auth::require_admin(&session)?; + auth::require_auth(&session)?; let id = path.into_inner(); - queries::artists::delete(state.db.conn(), id).await?; + let conn = state.db.conn(); + + // Cascade: remove wanted items, tracks (DB only), albums, cache, then artist + queries::wanted::remove_by_artist(conn, id).await?; + queries::tracks::delete_by_artist(conn, id).await?; + queries::albums::delete_by_artist(conn, id).await?; + let _ = queries::cache::purge_prefix(conn, &format!("artist_totals:{id}")).await; + queries::artists::delete(conn, id).await?; + Ok(HttpResponse::NoContent().finish()) } +async fn unwatch_artist( + state: web::Data, + session: Session, + path: web::Path, +) -> Result { + auth::require_auth(&session)?; + let id = path.into_inner(); + let removed = queries::wanted::remove_by_artist(state.db.conn(), id).await?; + Ok(HttpResponse::Ok().json(serde_json::json!({"removed": removed}))) +} + async fn set_monitored( state: web::Data, session: Session, diff --git a/src/routes/tracks.rs b/src/routes/tracks.rs index fce5d57..acdd4fc 100644 --- a/src/routes/tracks.rs +++ b/src/routes/tracks.rs @@ -29,9 +29,13 @@ pub struct WatchTrackRequest { } pub fn configure(cfg: &mut web::ServiceConfig) { - cfg.service(web::resource("/tracks/watch").route(web::post().to(watch_track))) - .service(web::resource("/tracks").route(web::get().to(list_tracks))) - .service(web::resource("/tracks/{id}").route(web::get().to(get_track))); + cfg.service( + web::resource("/tracks/watch") + .route(web::post().to(watch_track)) + .route(web::delete().to(unwatch_track)), + ) + .service(web::resource("/tracks").route(web::get().to(list_tracks))) + .service(web::resource("/tracks/{id}").route(web::get().to(get_track))); } async fn list_tracks( @@ -87,3 +91,17 @@ async fn watch_track( "artist_name": entry.artist_name, }))) } + +async fn unwatch_track( + state: web::Data, + session: Session, + body: web::Json, +) -> Result { + auth::require_auth(&session)?; + let mbid = body + .mbid + .as_deref() + .ok_or_else(|| ApiError::BadRequest("provide recording mbid".into()))?; + let removed = queries::wanted::remove_by_mbid(state.db.conn(), mbid).await?; + Ok(HttpResponse::Ok().json(serde_json::json!({"removed": removed}))) +} diff --git a/src/workers.rs b/src/workers.rs index f406a0d..9811abb 100644 --- a/src/workers.rs +++ b/src/workers.rs @@ -68,13 +68,14 @@ impl WorkerManager { queries::work_queue::cleanup_completed(cleanup_state.db.conn(), 7).await; } }); - }); - // Spawn each worker type - spawn_worker(state.clone(), WorkTaskType::Download, 1); - spawn_worker(state.clone(), WorkTaskType::Index, 4); - spawn_worker(state.clone(), WorkTaskType::Tag, 2); - spawn_worker(state.clone(), WorkTaskType::Organize, 4); + // Read config for concurrency settings and spawn workers + let cfg = state_clone.config.read().await.clone(); + spawn_worker(state_clone.clone(), WorkTaskType::Download, 1); + spawn_worker(state_clone.clone(), WorkTaskType::Index, cfg.indexing.concurrency); + spawn_worker(state_clone.clone(), WorkTaskType::Tag, cfg.tagging.concurrency); + spawn_worker(state_clone.clone(), WorkTaskType::Organize, 4); + }); } } @@ -332,14 +333,43 @@ async fn process_index( .await .map_err(|e| e.to_string())?; - // Create Tag work items for all untagged tracks - let untagged = queries::tracks::get_needing_metadata(conn) + // Create Tag work items for tracks that still need processing: + // 1. Tracks without MBIDs (need MB search + tagging) + // 2. Tracks with MBIDs but no wanted_item yet (need wanted_item creation + organize) + let needs_processing = queries::tracks::get_needing_metadata(conn) .await .map_err(|e| e.to_string())?; - for track in &untagged { + for track in &needs_processing { let tag_payload = serde_json::json!({"track_id": track.id}); downstream.push((WorkTaskType::Tag, tag_payload.to_string())); } + + // Also process tracks that have MBIDs (from file tags) but no wanted_item + let all_wanted = queries::wanted::list(conn, None, None) + .await + .map_err(|e| e.to_string())?; + let wanted_mbids: std::collections::HashSet<&str> = all_wanted + .iter() + .filter_map(|w| w.musicbrainz_id.as_deref()) + .collect(); + let mut offset = 0u64; + loop { + let tracks = queries::tracks::list(conn, 500, offset) + .await + .map_err(|e| e.to_string())?; + if tracks.is_empty() { + break; + } + for track in &tracks { + if let Some(ref mbid) = track.musicbrainz_id { + if !wanted_mbids.contains(mbid.as_str()) { + let tag_payload = serde_json::json!({"track_id": track.id}); + downstream.push((WorkTaskType::Tag, tag_payload.to_string())); + } + } + } + offset += 500; + } } else if let Some(file_path) = payload.get("file_path").and_then(|v| v.as_str()) { // Single file index let path = std::path::PathBuf::from(file_path); @@ -384,6 +414,43 @@ async fn process_tag( .await .map_err(|e| e.to_string())?; + // Re-read the track to get the MBID set by tagging + let track = queries::tracks::get_by_id(conn, track_id) + .await + .map_err(|e| e.to_string())?; + + // Ensure a wanted_item exists for this track (marks imported files as Owned) + if let Some(ref mbid) = track.musicbrainz_id { + if queries::wanted::find_by_mbid(conn, mbid) + .await + .map_err(|e| e.to_string())? + .is_none() + { + let item = queries::wanted::add( + conn, + queries::wanted::AddWantedItem { + item_type: shanty_db::entities::wanted_item::ItemType::Track, + name: track.title.as_deref().unwrap_or("Unknown"), + musicbrainz_id: Some(mbid), + artist_id: track.artist_id, + album_id: track.album_id, + track_id: Some(track.id), + user_id: None, + }, + ) + .await + .map_err(|e| e.to_string())?; + + // Mark as Owned immediately since the file already exists + let _ = queries::wanted::update_status( + conn, + item.id, + shanty_db::entities::wanted_item::WantedStatus::Owned, + ) + .await; + } + } + // Create Organize work item let org_payload = serde_json::json!({"track_id": track_id}); Ok(vec![(WorkTaskType::Organize, org_payload.to_string())]) @@ -417,11 +484,29 @@ async fn process_organize( // Promote this track's wanted item from Downloaded to Owned let _ = queries::wanted::promote_downloaded_to_owned(conn).await; - // Check if pipeline is complete and trigger enrichment + // Check if pipeline is complete — run cleanup then enrichment if let Some(ref pipeline_id) = item.pipeline_id && let Ok(true) = queries::work_queue::pipeline_is_complete(conn, pipeline_id).await { - tracing::info!(pipeline_id = %pipeline_id, "pipeline complete, triggering enrichment"); + tracing::info!(pipeline_id = %pipeline_id, "pipeline complete, running cleanup"); + + // Cleanup: remove orphaned tracks, empty albums, unused artists + match queries::tracks::delete_orphaned(conn).await { + Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up orphaned tracks"), + Err(e) => tracing::warn!(error = %e, "failed to clean orphaned tracks"), + _ => {} + } + match queries::albums::delete_empty(conn).await { + Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up empty albums"), + Err(e) => tracing::warn!(error = %e, "failed to clean empty albums"), + _ => {} + } + match queries::artists::delete_unused(conn).await { + Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up unused artists"), + Err(e) => tracing::warn!(error = %e, "failed to clean unused artists"), + _ => {} + } + let state = state.clone(); tokio::spawn(async move { if let Err(e) = crate::routes::artists::enrich_all_watched_artists(&state).await {