diff --git a/frontend/src/api.rs b/frontend/src/api.rs index 3365434..84238f2 100644 --- a/frontend/src/api.rs +++ b/frontend/src/api.rs @@ -171,6 +171,11 @@ pub async fn cancel_download(id: i32) -> Result<(), ApiError> { delete(&format!("{BASE}/downloads/{id}")).await } +// --- Pipeline --- +pub async fn trigger_pipeline() -> Result { + post_empty(&format!("{BASE}/pipeline")).await +} + // --- System --- pub async fn trigger_index() -> Result { post_empty(&format!("{BASE}/index")).await diff --git a/frontend/src/components/status_badge.rs b/frontend/src/components/status_badge.rs index 6273ca1..30cc2e9 100644 --- a/frontend/src/components/status_badge.rs +++ b/frontend/src/components/status_badge.rs @@ -15,6 +15,7 @@ pub fn status_badge(props: &Props) -> Html { "pending" => "badge badge-pending", "failed" => "badge badge-failed", "completed" => "badge badge-completed", + "running" => "badge badge-available", "downloading" => "badge badge-available", "cancelled" => "badge badge-pending", _ => "badge", diff --git a/frontend/src/pages/dashboard.rs b/frontend/src/pages/dashboard.rs index c2e4f5a..b6d3f1e 100644 --- a/frontend/src/pages/dashboard.rs +++ b/frontend/src/pages/dashboard.rs @@ -142,6 +142,26 @@ pub fn dashboard() -> Html { }) }; + let on_pipeline = { + let message = message.clone(); + let error = error.clone(); + let fetch = fetch_status.clone(); + Callback::from(move |_: MouseEvent| { + let message = message.clone(); + let error = error.clone(); + let fetch = fetch.clone(); + wasm_bindgen_futures::spawn_local(async move { + match api::trigger_pipeline().await { + Ok(p) => { + message.set(Some(format!("Pipeline started — {} tasks queued", p.task_ids.len()))); + fetch.emit(()); + } + Err(e) => error.set(Some(e.0)), + } + }); + }) + }; + if let Some(ref err) = *error { return html! {
{ format!("Error: {err}") }
}; } @@ -150,6 +170,8 @@ pub fn dashboard() -> Html { return html! {

{ "Loading..." }

}; }; + let pipeline_active = s.tasks.iter().any(|t| t.status == "Pending" || t.status == "Running"); + html! {
- // Actions + // Pipeline
-

{ "Actions" }

+
+

{ "Pipeline" }

+ +
+

+ { "Sync \u{2192} Download \u{2192} Index \u{2192} Tag \u{2192} Organize \u{2192} Enrich" } +

+
+ + // Individual Actions +
+

{ "Individual Actions" }

- - + + diff --git a/frontend/src/types.rs b/frontend/src/types.rs index 9989517..e0e5159 100644 --- a/frontend/src/types.rs +++ b/frontend/src/types.rs @@ -180,6 +180,11 @@ pub struct TaskRef { pub task_id: String, } +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct PipelineRef { + pub task_ids: Vec, +} + // --- Status --- #[derive(Debug, Clone, PartialEq, Deserialize)] diff --git a/src/routes/artists.rs b/src/routes/artists.rs index 5a72bf8..509f42e 100644 --- a/src/routes/artists.rs +++ b/src/routes/artists.rs @@ -144,7 +144,7 @@ async fn get_artist( /// Fetch (or retrieve from cache) the tracklist for a release group. /// Cache key: `artist_rg_tracks:{release_group_id}` async fn get_cached_album_tracks( - state: &web::Data, + state: &AppState, rg_id: &str, first_release_id: Option<&str>, ttl_seconds: i64, @@ -241,9 +241,19 @@ async fn get_artist_full( ) -> Result { let id_or_mbid = path.into_inner(); let quick_mode = query.quick; + let result = enrich_artist(&state, &id_or_mbid, quick_mode).await?; + Ok(HttpResponse::Ok().json(result)) +} +/// Enrich an artist's data: fetch release groups, track lists, compute totals. +/// Can be called from HTTP handlers or background tasks. +pub async fn enrich_artist( + state: &AppState, + id_or_mbid: &str, + quick_mode: bool, +) -> Result { // Resolve artist: local ID or MBID - let (artist, id, mbid) = if let Ok(local_id) = id_or_mbid.parse::() { + let (artist, id, mbid) = if let Ok(local_id) = id_or_mbid.parse() { let artist = queries::artists::get_by_id(state.db.conn(), local_id).await?; let mbid = match &artist.musicbrainz_id { Some(m) => m.clone(), @@ -256,7 +266,7 @@ async fn get_artist_full( }; (artist, Some(local_id), mbid) } else { - let mbid = id_or_mbid; + let mbid = id_or_mbid.to_string(); // Direct MBID lookup — first check local DB, then MusicBrainz let local = { @@ -489,7 +499,7 @@ async fn get_artist_full( } } - Ok(HttpResponse::Ok().json(serde_json::json!({ + Ok(serde_json::json!({ "artist": artist, "albums": albums, "artist_status": artist_status, @@ -497,7 +507,30 @@ async fn get_artist_full( "total_watched_tracks": total_artist_watched, "total_owned_tracks": total_artist_owned, "enriched": !skip_track_fetch, - }))) + })) +} + +/// Enrich all watched artists in the background, updating their cached totals. +pub async fn enrich_all_watched_artists(state: &AppState) -> Result { + let all_wanted = queries::wanted::list(state.db.conn(), None).await?; + + // Collect unique artist IDs that have any wanted items + let mut artist_ids: Vec = all_wanted + .iter() + .filter_map(|w| w.artist_id) + .collect(); + artist_ids.sort(); + artist_ids.dedup(); + + let mut count = 0u32; + for artist_id in &artist_ids { + match enrich_artist(state, &artist_id.to_string(), false).await { + Ok(_) => count += 1, + Err(e) => tracing::warn!(artist_id = artist_id, error = %e, "failed to enrich artist"), + } + } + + Ok(count) } async fn add_artist( @@ -514,6 +547,16 @@ async fn add_artist( &state.mb_client, ) .await?; + + // Enrich the newly watched artist in the background so library totals are populated + if let Some(ref mbid) = body.mbid { + let state = state.clone(); + let mbid = mbid.clone(); + tokio::spawn(async move { + let _ = enrich_artist(&state, &mbid, false).await; + }); + } + Ok(HttpResponse::Ok().json(serde_json::json!({ "tracks_added": summary.tracks_added, "tracks_already_owned": summary.tracks_already_owned, diff --git a/src/routes/downloads.rs b/src/routes/downloads.rs index a3f87e1..d474f6b 100644 --- a/src/routes/downloads.rs +++ b/src/routes/downloads.rs @@ -5,6 +5,7 @@ use shanty_db::entities::download_queue::DownloadStatus; use shanty_db::queries; use crate::error::ApiError; +use crate::routes::artists::enrich_all_watched_artists; use crate::state::AppState; #[derive(Deserialize)] @@ -106,9 +107,9 @@ async fn trigger_process( match shanty_dl::run_queue_with_progress(state.db.conn(), &backend, &backend_config, false, Some(on_progress)).await { Ok(stats) => { - // Invalidate cached artist totals so library/detail pages show fresh data - let _ = shanty_db::queries::cache::purge_prefix(state.db.conn(), "artist_totals:").await; state.tasks.complete(&tid, format!("{stats}")); + // Refresh artist data in background + let _ = enrich_all_watched_artists(&state).await; } Err(e) => state.tasks.fail(&tid, e.to_string()), } diff --git a/src/routes/system.rs b/src/routes/system.rs index fa23b13..c5b513e 100644 --- a/src/routes/system.rs +++ b/src/routes/system.rs @@ -4,10 +4,12 @@ use shanty_db::entities::download_queue::DownloadStatus; use shanty_db::queries; use crate::error::ApiError; +use crate::routes::artists::enrich_all_watched_artists; use crate::state::AppState; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("/status").route(web::get().to(get_status))) + .service(web::resource("/pipeline").route(web::post().to(trigger_pipeline))) .service(web::resource("/index").route(web::post().to(trigger_index))) .service(web::resource("/tag").route(web::post().to(trigger_tag))) .service(web::resource("/organize").route(web::post().to(trigger_organize))) @@ -126,14 +128,14 @@ async fn trigger_organize( let promoted = queries::wanted::promote_downloaded_to_owned(state.db.conn()) .await .unwrap_or(0); - // Invalidate cached artist totals so library/detail pages show fresh data - let _ = queries::cache::purge_prefix(state.db.conn(), "artist_totals:").await; let msg = if promoted > 0 { format!("{stats} — {promoted} items marked as owned") } else { format!("{stats}") }; state.tasks.complete(&tid, msg); + // Refresh artist data in background + let _ = enrich_all_watched_artists(&state).await; } Err(e) => state.tasks.fail(&tid, e.to_string()), } @@ -142,6 +144,128 @@ async fn trigger_organize( Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id }))) } +async fn trigger_pipeline( + state: web::Data, +) -> Result { + // Register all 6 pipeline tasks as Pending + let sync_id = state.tasks.register_pending("sync"); + let download_id = state.tasks.register_pending("download"); + let index_id = state.tasks.register_pending("index"); + let tag_id = state.tasks.register_pending("tag"); + let organize_id = state.tasks.register_pending("organize"); + let enrich_id = state.tasks.register_pending("enrich"); + + let task_ids = vec![ + sync_id.clone(), + download_id.clone(), + index_id.clone(), + tag_id.clone(), + organize_id.clone(), + enrich_id.clone(), + ]; + + let state = state.clone(); + + tokio::spawn(async move { + // Step 1: Sync + state.tasks.start(&sync_id); + state.tasks.update_progress(&sync_id, 0, 0, "Syncing watchlist to download queue..."); + match shanty_dl::sync_wanted_to_queue(state.db.conn(), false).await { + Ok(stats) => state.tasks.complete(&sync_id, format!("{stats}")), + Err(e) => state.tasks.fail(&sync_id, e.to_string()), + } + + // Step 2: Download + state.tasks.start(&download_id); + let cookies = state.config.download.cookies_path.clone(); + let format: shanty_dl::AudioFormat = state.config.download.format.parse().unwrap_or(shanty_dl::AudioFormat::Opus); + let source: shanty_dl::SearchSource = state.config.download.search_source.parse().unwrap_or(shanty_dl::SearchSource::YouTubeMusic); + let rate = if cookies.is_some() { 1800 } else { 450 }; + let backend = shanty_dl::YtDlpBackend::new(rate, source, cookies.clone()); + let backend_config = shanty_dl::BackendConfig { + output_dir: state.config.download_path.clone(), + format, + cookies_path: cookies, + }; + let task_state = state.clone(); + let progress_tid = download_id.clone(); + let on_progress: shanty_dl::ProgressFn = Box::new(move |current, total, msg| { + task_state.tasks.update_progress(&progress_tid, current, total, msg); + }); + match shanty_dl::run_queue_with_progress(state.db.conn(), &backend, &backend_config, false, Some(on_progress)).await { + Ok(stats) => { + let _ = queries::cache::purge_prefix(state.db.conn(), "artist_totals:").await; + state.tasks.complete(&download_id, format!("{stats}")); + } + Err(e) => state.tasks.fail(&download_id, e.to_string()), + } + + // Step 3: Index + state.tasks.start(&index_id); + state.tasks.update_progress(&index_id, 0, 0, "Scanning library..."); + let scan_config = shanty_index::ScanConfig { + root: state.config.library_path.clone(), + dry_run: false, + concurrency: 4, + }; + match shanty_index::run_scan(state.db.conn(), &scan_config).await { + Ok(stats) => state.tasks.complete(&index_id, format!("{stats}")), + Err(e) => state.tasks.fail(&index_id, e.to_string()), + } + + // Step 4: Tag + state.tasks.start(&tag_id); + state.tasks.update_progress(&tag_id, 0, 0, "Tagging tracks..."); + match shanty_tag::MusicBrainzClient::new() { + Ok(mb) => { + let tag_config = shanty_tag::TagConfig { + dry_run: false, + write_tags: state.config.tagging.write_tags, + confidence: state.config.tagging.confidence, + }; + match shanty_tag::run_tagging(state.db.conn(), &mb, &tag_config, None).await { + Ok(stats) => state.tasks.complete(&tag_id, format!("{stats}")), + Err(e) => state.tasks.fail(&tag_id, e.to_string()), + } + } + Err(e) => state.tasks.fail(&tag_id, e.to_string()), + } + + // Step 5: Organize + state.tasks.start(&organize_id); + state.tasks.update_progress(&organize_id, 0, 0, "Organizing files..."); + let org_config = shanty_org::OrgConfig { + target_dir: state.config.library_path.clone(), + format: state.config.organization_format.clone(), + dry_run: false, + copy: false, + }; + match shanty_org::organize_from_db(state.db.conn(), &org_config).await { + Ok(stats) => { + let promoted = queries::wanted::promote_downloaded_to_owned(state.db.conn()) + .await.unwrap_or(0); + let msg = if promoted > 0 { + format!("{stats} — {promoted} items marked as owned") + } else { + format!("{stats}") + }; + state.tasks.complete(&organize_id, msg); + } + Err(e) => state.tasks.fail(&organize_id, e.to_string()), + } + + // Step 6: Enrich — refresh cached artist totals for the library page + state.tasks.start(&enrich_id); + state.tasks.update_progress(&enrich_id, 0, 0, "Refreshing artist data..."); + match enrich_all_watched_artists(&state).await { + Ok(count) => state.tasks.complete(&enrich_id, format!("{count} artists refreshed")), + Err(e) => state.tasks.fail(&enrich_id, e.to_string()), + } + }); + + Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_ids": task_ids }))) +} + async fn get_task( state: web::Data, path: web::Path, diff --git a/src/tasks.rs b/src/tasks.rs index 2cb7823..ea970d7 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -24,6 +24,7 @@ pub struct TaskInfo { #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] pub enum TaskStatus { + Pending, Running, Completed, Failed, @@ -56,6 +57,29 @@ impl TaskManager { id } + /// Register a new task as Pending (queued, not yet running). Returns the task ID. + pub fn register_pending(&self, task_type: &str) -> String { + let id = uuid::Uuid::new_v4().to_string(); + let info = TaskInfo { + id: id.clone(), + task_type: task_type.to_string(), + status: TaskStatus::Pending, + progress: None, + started_at: Utc::now().naive_utc(), + completed_at: None, + result: None, + }; + self.tasks.lock().unwrap().insert(id.clone(), info); + id + } + + /// Transition a task from Pending to Running. + pub fn start(&self, id: &str) { + if let Some(task) = self.tasks.lock().unwrap().get_mut(id) { + task.status = TaskStatus::Running; + } + } + /// Update progress on a running task. pub fn update_progress(&self, id: &str, current: u64, total: u64, message: &str) { if let Some(task) = self.tasks.lock().unwrap().get_mut(id) {