use actix_session::Session; use actix_web::{HttpResponse, web}; use serde::Deserialize; use shanty_db::entities::download_queue::DownloadStatus; use shanty_db::queries; use crate::auth; use crate::config::AppConfig; use crate::error::ApiError; use crate::routes::artists::enrich_all_watched_artists; use crate::state::AppState; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("/status").route(web::get().to(get_status))) .service(web::resource("/pipeline").route(web::post().to(trigger_pipeline))) .service(web::resource("/index").route(web::post().to(trigger_index))) .service(web::resource("/tag").route(web::post().to(trigger_tag))) .service(web::resource("/organize").route(web::post().to(trigger_organize))) .service(web::resource("/tasks/{id}").route(web::get().to(get_task))) .service(web::resource("/watchlist").route(web::get().to(list_watchlist))) .service(web::resource("/watchlist/{id}").route(web::delete().to(remove_watchlist))) .service( web::resource("/config") .route(web::get().to(get_config)) .route(web::put().to(save_config)), ); } async fn get_status( state: web::Data, session: Session, ) -> Result { auth::require_auth(&session)?; let summary = shanty_watch::library_summary(state.db.conn()).await?; let pending_items = queries::downloads::list(state.db.conn(), Some(DownloadStatus::Pending)).await?; let downloading_items = queries::downloads::list(state.db.conn(), Some(DownloadStatus::Downloading)).await?; let failed_items = queries::downloads::list(state.db.conn(), Some(DownloadStatus::Failed)).await?; let tasks = state.tasks.list(); let mut queue_items = Vec::new(); queue_items.extend(downloading_items.iter().cloned()); queue_items.extend(pending_items.iter().cloned()); queue_items.extend(failed_items.iter().take(5).cloned()); let needs_tagging = queries::tracks::get_needing_metadata(state.db.conn()).await?; Ok(HttpResponse::Ok().json(serde_json::json!({ "library": summary, "queue": { "pending": pending_items.len(), "downloading": downloading_items.len(), "failed": failed_items.len(), "items": queue_items, }, "tagging": { "needs_tagging": needs_tagging.len(), "items": needs_tagging.iter().take(20).collect::>(), }, "tasks": tasks, }))) } async fn trigger_index( state: web::Data, session: Session, ) -> Result { auth::require_auth(&session)?; let task_id = state.tasks.register("index"); let state = state.clone(); let tid = task_id.clone(); tokio::spawn(async move { let cfg = state.config.read().await.clone(); state .tasks .update_progress(&tid, 0, 0, "Scanning library..."); let scan_config = shanty_index::ScanConfig { root: cfg.library_path.clone(), dry_run: false, concurrency: cfg.indexing.concurrency, }; match shanty_index::run_scan(state.db.conn(), &scan_config).await { Ok(stats) => state.tasks.complete(&tid, format!("{stats}")), Err(e) => state.tasks.fail(&tid, e.to_string()), } }); Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id }))) } async fn trigger_tag( state: web::Data, session: Session, ) -> Result { auth::require_auth(&session)?; let task_id = state.tasks.register("tag"); let state = state.clone(); let tid = task_id.clone(); tokio::spawn(async move { let cfg = state.config.read().await.clone(); state .tasks .update_progress(&tid, 0, 0, "Preparing tagger..."); let mb = match shanty_tag::MusicBrainzClient::new() { Ok(c) => c, Err(e) => { state.tasks.fail(&tid, e.to_string()); return; } }; let tag_config = shanty_tag::TagConfig { dry_run: false, write_tags: cfg.tagging.write_tags, confidence: cfg.tagging.confidence, }; state.tasks.update_progress(&tid, 0, 0, "Tagging tracks..."); match shanty_tag::run_tagging(state.db.conn(), &mb, &tag_config, None).await { Ok(stats) => state.tasks.complete(&tid, format!("{stats}")), Err(e) => state.tasks.fail(&tid, e.to_string()), } }); Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id }))) } async fn trigger_organize( state: web::Data, session: Session, ) -> Result { auth::require_auth(&session)?; let task_id = state.tasks.register("organize"); let state = state.clone(); let tid = task_id.clone(); tokio::spawn(async move { let cfg = state.config.read().await.clone(); state .tasks .update_progress(&tid, 0, 0, "Organizing files..."); let org_config = shanty_org::OrgConfig { target_dir: cfg.library_path.clone(), format: cfg.organization_format.clone(), dry_run: false, copy: false, }; match shanty_org::organize_from_db(state.db.conn(), &org_config).await { Ok(stats) => { let promoted = queries::wanted::promote_downloaded_to_owned(state.db.conn()) .await .unwrap_or(0); let msg = if promoted > 0 { format!("{stats} — {promoted} items marked as owned") } else { format!("{stats}") }; state.tasks.complete(&tid, msg); let _ = enrich_all_watched_artists(&state).await; } Err(e) => state.tasks.fail(&tid, e.to_string()), } }); Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id }))) } async fn trigger_pipeline( state: web::Data, session: Session, ) -> Result { auth::require_auth(&session)?; let sync_id = state.tasks.register_pending("sync"); let download_id = state.tasks.register_pending("download"); let index_id = state.tasks.register_pending("index"); let tag_id = state.tasks.register_pending("tag"); let organize_id = state.tasks.register_pending("organize"); let enrich_id = state.tasks.register_pending("enrich"); let task_ids = vec![ sync_id.clone(), download_id.clone(), index_id.clone(), tag_id.clone(), organize_id.clone(), enrich_id.clone(), ]; let state = state.clone(); tokio::spawn(async move { let cfg = state.config.read().await.clone(); // Step 1: Sync state.tasks.start(&sync_id); state .tasks .update_progress(&sync_id, 0, 0, "Syncing watchlist to download queue..."); match shanty_dl::sync_wanted_to_queue(state.db.conn(), false).await { Ok(stats) => state.tasks.complete(&sync_id, format!("{stats}")), Err(e) => state.tasks.fail(&sync_id, e.to_string()), } // Step 2: Download state.tasks.start(&download_id); let cookies = cfg.download.cookies_path.clone(); let format: shanty_dl::AudioFormat = cfg .download .format .parse() .unwrap_or(shanty_dl::AudioFormat::Opus); let source: shanty_dl::SearchSource = cfg .download .search_source .parse() .unwrap_or(shanty_dl::SearchSource::YouTubeMusic); let rate = if cookies.is_some() { cfg.download.rate_limit_auth } else { cfg.download.rate_limit }; let backend = shanty_dl::YtDlpBackend::new(rate, source, cookies.clone()); let backend_config = shanty_dl::BackendConfig { output_dir: cfg.download_path.clone(), format, cookies_path: cookies, }; let task_state = state.clone(); let progress_tid = download_id.clone(); let on_progress: shanty_dl::ProgressFn = Box::new(move |current, total, msg| { task_state .tasks .update_progress(&progress_tid, current, total, msg); }); match shanty_dl::run_queue_with_progress( state.db.conn(), &backend, &backend_config, false, Some(on_progress), ) .await { Ok(stats) => { let _ = queries::cache::purge_prefix(state.db.conn(), "artist_totals:").await; state.tasks.complete(&download_id, format!("{stats}")); } Err(e) => state.tasks.fail(&download_id, e.to_string()), } // Step 3: Index state.tasks.start(&index_id); state .tasks .update_progress(&index_id, 0, 0, "Scanning library..."); let scan_config = shanty_index::ScanConfig { root: cfg.library_path.clone(), dry_run: false, concurrency: cfg.indexing.concurrency, }; match shanty_index::run_scan(state.db.conn(), &scan_config).await { Ok(stats) => state.tasks.complete(&index_id, format!("{stats}")), Err(e) => state.tasks.fail(&index_id, e.to_string()), } // Step 4: Tag state.tasks.start(&tag_id); state .tasks .update_progress(&tag_id, 0, 0, "Tagging tracks..."); match shanty_tag::MusicBrainzClient::new() { Ok(mb) => { let tag_config = shanty_tag::TagConfig { dry_run: false, write_tags: cfg.tagging.write_tags, confidence: cfg.tagging.confidence, }; match shanty_tag::run_tagging(state.db.conn(), &mb, &tag_config, None).await { Ok(stats) => state.tasks.complete(&tag_id, format!("{stats}")), Err(e) => state.tasks.fail(&tag_id, e.to_string()), } } Err(e) => state.tasks.fail(&tag_id, e.to_string()), } // Step 5: Organize state.tasks.start(&organize_id); state .tasks .update_progress(&organize_id, 0, 0, "Organizing files..."); let org_config = shanty_org::OrgConfig { target_dir: cfg.library_path.clone(), format: cfg.organization_format.clone(), dry_run: false, copy: false, }; match shanty_org::organize_from_db(state.db.conn(), &org_config).await { Ok(stats) => { let promoted = queries::wanted::promote_downloaded_to_owned(state.db.conn()) .await .unwrap_or(0); let msg = if promoted > 0 { format!("{stats} — {promoted} items marked as owned") } else { format!("{stats}") }; state.tasks.complete(&organize_id, msg); } Err(e) => state.tasks.fail(&organize_id, e.to_string()), } // Step 6: Enrich state.tasks.start(&enrich_id); state .tasks .update_progress(&enrich_id, 0, 0, "Refreshing artist data..."); match enrich_all_watched_artists(&state).await { Ok(count) => state .tasks .complete(&enrich_id, format!("{count} artists refreshed")), Err(e) => state.tasks.fail(&enrich_id, e.to_string()), } }); Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_ids": task_ids }))) } async fn get_task( state: web::Data, session: Session, path: web::Path, ) -> Result { auth::require_auth(&session)?; let id = path.into_inner(); match state.tasks.get(&id) { Some(task) => Ok(HttpResponse::Ok().json(task)), None => Err(ApiError::NotFound(format!("task {id}"))), } } async fn list_watchlist( state: web::Data, session: Session, ) -> Result { let (user_id, _, _) = auth::require_auth(&session)?; let items = shanty_watch::list_items(state.db.conn(), None, None, Some(user_id)).await?; Ok(HttpResponse::Ok().json(items)) } async fn remove_watchlist( state: web::Data, session: Session, path: web::Path, ) -> Result { auth::require_auth(&session)?; let id = path.into_inner(); shanty_watch::remove_item(state.db.conn(), id).await?; Ok(HttpResponse::NoContent().finish()) } async fn get_config( state: web::Data, session: Session, ) -> Result { auth::require_auth(&session)?; let config = state.config.read().await; Ok(HttpResponse::Ok().json(&*config)) } #[derive(Deserialize)] struct SaveConfigRequest { #[serde(flatten)] config: AppConfig, } async fn save_config( state: web::Data, session: Session, body: web::Json, ) -> Result { auth::require_admin(&session)?; let new_config = body.into_inner().config; // Persist to YAML new_config .save(state.config_path.as_deref()) .map_err(ApiError::Internal)?; // Update in-memory config let mut config = state.config.write().await; *config = new_config.clone(); tracing::info!("config updated via API"); Ok(HttpResponse::Ok().json(&new_config)) }