diff --git a/frontend/Cargo.lock b/frontend/Cargo.lock index 662e5a9..3072ee2 100644 --- a/frontend/Cargo.lock +++ b/frontend/Cargo.lock @@ -862,6 +862,7 @@ version = "0.1.0" dependencies = [ "gloo-net 0.6.0", "gloo-timers 0.3.0", + "js-sys", "serde", "serde_json", "wasm-bindgen", diff --git a/frontend/Cargo.toml b/frontend/Cargo.toml index 4b23e06..662fcd7 100644 --- a/frontend/Cargo.toml +++ b/frontend/Cargo.toml @@ -15,3 +15,4 @@ serde_json = "1" wasm-bindgen = "0.2" wasm-bindgen-futures = "0.4" web-sys = { version = "0.3", features = ["HtmlInputElement", "HtmlSelectElement", "Window"] } +js-sys = "0.3" diff --git a/frontend/src/api.rs b/frontend/src/api.rs index 7f49d76..33dc94c 100644 --- a/frontend/src/api.rs +++ b/frontend/src/api.rs @@ -85,7 +85,6 @@ pub async fn get_me() -> Result { get_json(&format!("{BASE}/auth/me")).await } - // --- Lyrics --- pub async fn get_lyrics(artist: &str, title: &str) -> Result { get_json(&format!("{BASE}/lyrics?artist={artist}&title={title}")).await @@ -142,7 +141,6 @@ pub async fn get_album(mbid: &str) -> Result { get_json(&format!("{BASE}/albums/{mbid}")).await } - // --- Watchlist --- pub async fn add_artist(name: &str, mbid: Option<&str>) -> Result { let body = match mbid { @@ -164,7 +162,6 @@ pub async fn add_album( post_json(&format!("{BASE}/albums"), &body).await } - // --- Downloads --- pub async fn get_downloads(status: Option<&str>) -> Result, ApiError> { let mut url = format!("{BASE}/downloads/queue"); @@ -210,6 +207,26 @@ pub async fn trigger_pipeline() -> Result { post_empty(&format!("{BASE}/pipeline")).await } +// --- Monitor --- +pub async fn set_artist_monitored(id: i32, monitored: bool) -> Result { + if monitored { + post_empty(&format!("{BASE}/artists/{id}/monitor")).await + } else { + let resp = Request::delete(&format!("{BASE}/artists/{id}/monitor")) + .send() + .await + .map_err(|e| ApiError(e.to_string()))?; + if !resp.ok() { + return Err(ApiError(format!("HTTP {}", resp.status()))); + } + resp.json().await.map_err(|e| ApiError(e.to_string())) + } +} + +pub async fn trigger_monitor_check() -> Result { + post_empty(&format!("{BASE}/monitor/check")).await +} + // --- System --- pub async fn trigger_index() -> Result { post_empty(&format!("{BASE}/index")).await @@ -223,7 +240,6 @@ pub async fn trigger_organize() -> Result { post_empty(&format!("{BASE}/organize")).await } - pub async fn get_config() -> Result { get_json(&format!("{BASE}/config")).await } diff --git a/frontend/src/pages/artist.rs b/frontend/src/pages/artist.rs index ef44bcf..f921258 100644 --- a/frontend/src/pages/artist.rs +++ b/frontend/src/pages/artist.rs @@ -145,6 +145,52 @@ pub fn artist_page(props: &Props) -> Html { } }; + let monitor_btn = { + // Only show monitor toggle for artists that have a local DB ID (> 0) + let artist_id_num = d.artist.id; + let is_monitored = d.monitored; + if artist_id_num > 0 { + let message = message.clone(); + let error = error.clone(); + let fetch = fetch.clone(); + let artist_id = id.clone(); + let label = if is_monitored { "Unmonitor" } else { "Monitor" }; + let btn_class = if is_monitored { + "btn btn-sm btn-secondary" + } else { + "btn btn-sm btn-primary" + }; + html! { + + } + } else { + html! {} + } + }; + html! {
if let Some(ref banner) = d.artist_banner { @@ -158,7 +204,12 @@ pub fn artist_page(props: &Props) -> Html { }
-

{ &d.artist.name }

+

+ { &d.artist.name } + if d.monitored { + { "Monitored" } + } +

if let Some(ref info) = d.artist_info {
if let Some(ref country) = info.country { @@ -195,7 +246,10 @@ pub fn artist_page(props: &Props) -> Html { }
- { watch_all_btn } +
+ { watch_all_btn } + { monitor_btn } +
if let Some(ref bio) = d.artist_bio {

{ bio }

diff --git a/frontend/src/pages/dashboard.rs b/frontend/src/pages/dashboard.rs index a561990..31d21d6 100644 --- a/frontend/src/pages/dashboard.rs +++ b/frontend/src/pages/dashboard.rs @@ -5,6 +5,30 @@ use crate::api; use crate::components::status_badge::StatusBadge; use crate::types::Status; +/// Format a UTC datetime string as a relative time like "in 2h 15m". +fn format_next_run(datetime_str: &str) -> String { + let now_ms = js_sys::Date::new_0().get_time(); + let parsed = js_sys::Date::parse(datetime_str); + + // NaN means parse failed + if parsed.is_nan() { + return datetime_str.to_string(); + } + + let delta_ms = parsed - now_ms; + if delta_ms <= 0.0 { + return "soon".to_string(); + } + let total_mins = (delta_ms / 60_000.0) as u64; + let hours = total_mins / 60; + let mins = total_mins % 60; + if hours > 0 { + format!("in {hours}h {mins}m") + } else { + format!("in {mins}m") + } +} + #[function_component(DashboardPage)] pub fn dashboard() -> Html { let status = use_state(|| None::); @@ -157,6 +181,29 @@ pub fn dashboard() -> Html { }) }; + let on_monitor_check = { + let message = message.clone(); + let error = error.clone(); + let fetch = fetch_status.clone(); + Callback::from(move |_: MouseEvent| { + let message = message.clone(); + let error = error.clone(); + let fetch = fetch.clone(); + wasm_bindgen_futures::spawn_local(async move { + match api::trigger_monitor_check().await { + Ok(t) => { + message.set(Some(format!( + "Monitor check started (task: {})", + &t.task_id[..8] + ))); + fetch.emit(()); + } + Err(e) => error.set(Some(e.0)), + } + }); + }) + }; + let on_pipeline = { let message = message.clone(); let error = error.clone(); @@ -193,6 +240,35 @@ pub fn dashboard() -> Html { .iter() .any(|t| t.status == "Pending" || t.status == "Running"); + // Pre-compute scheduled task rows + let scheduled_rows = { + let mut rows = Vec::new(); + if let Some(ref sched) = s.scheduled { + if let Some(ref next) = sched.next_pipeline { + rows.push(html! { + + { "Auto Pipeline" } + { "Scheduled" } + + { format!("Next run: {}", format_next_run(next)) } + + }); + } + if let Some(ref next) = sched.next_monitor { + rows.push(html! { + + { "Monitor Check" } + { "Scheduled" } + + { format!("Next run: {}", format_next_run(next)) } + + }); + } + } + rows + }; + let has_scheduled = !scheduled_rows.is_empty(); + html! {
- // Background Tasks - if !s.tasks.is_empty() { + // Background Tasks (always show if there are tasks or scheduled items) + if !s.tasks.is_empty() || has_scheduled {

{ "Background Tasks" }

@@ -259,6 +336,7 @@ pub fn dashboard() -> Html { + { for scheduled_rows.into_iter() } { for s.tasks.iter().map(|t| { let progress_html = if let Some(ref p) = t.progress { if p.total > 0 { diff --git a/frontend/src/pages/library.rs b/frontend/src/pages/library.rs index 598d420..a6ed1c2 100644 --- a/frontend/src/pages/library.rs +++ b/frontend/src/pages/library.rs @@ -45,6 +45,7 @@ pub fn library_page() -> Html { + @@ -58,6 +59,11 @@ pub fn library_page() -> Html { { &a.name } > +
{ "Type" }{ "Status" }{ "Progress" }{ "Result" }
{ "Name" }{ "Monitored" } { "Owned" } { "Watched" } { "Tracks" } + if a.monitored { + { "\u{2713}" } + } + if a.total_items > 0 { Html { wasm_bindgen_futures::spawn_local(async move { match api::ytauth_login_stop().await { Ok(_) => { - message.set(Some("YouTube login complete! Cookies exported.".into())); + message.set(Some( + "YouTube login complete! Cookies exported.".into(), + )); if let Ok(s) = api::get_ytauth_status().await { ytauth.set(Some(s)); } @@ -123,7 +125,8 @@ pub fn settings_page() -> Html { } } else if status.authenticated { - let age_text = status.cookie_age_hours + let age_text = status + .cookie_age_hours .map(|h| format!("cookies {h:.0}h old")) .unwrap_or_else(|| "authenticated".into()); let on_refresh = { @@ -240,7 +243,10 @@ pub fn settings_page() -> Html { }; let ytdlp_version_html = if let Some(ref status) = *ytauth { - let version = status.ytdlp_version.clone().unwrap_or_else(|| "not found".into()); + let version = status + .ytdlp_version + .clone() + .unwrap_or_else(|| "not found".into()); if status.ytdlp_update_available { let latest = status.ytdlp_latest.clone().unwrap_or_default(); html! { @@ -266,7 +272,10 @@ pub fn settings_page() -> Html { }; let lastfm_key_html = { - let key_set = ytauth.as_ref().map(|s| s.lastfm_api_key_set).unwrap_or(false); + let key_set = ytauth + .as_ref() + .map(|s| s.lastfm_api_key_set) + .unwrap_or(false); if key_set { html! {

@@ -567,6 +576,60 @@ pub fn settings_page() -> Html { + // Scheduling +

+

{ "Scheduling" }

+

{ "Automate pipeline runs and new release monitoring" }

+
+ +
+
+ + +
+
+ +
+
+ + +
+
+ // Indexing

{ "Indexing" }

diff --git a/frontend/src/types.rs b/frontend/src/types.rs index d2d8144..70fae86 100644 --- a/frontend/src/types.rs +++ b/frontend/src/types.rs @@ -28,6 +28,8 @@ pub struct ArtistListItem { pub id: i32, pub name: String, pub musicbrainz_id: Option, + #[serde(default)] + pub monitored: bool, pub total_watched: usize, pub total_owned: usize, pub total_items: usize, @@ -62,6 +64,8 @@ pub struct FullArtistDetail { #[serde(default)] pub enriched: bool, #[serde(default)] + pub monitored: bool, + #[serde(default)] pub artist_info: Option, #[serde(default)] pub artist_photo: Option, @@ -240,6 +244,14 @@ pub struct Status { #[serde(default)] pub tagging: Option, pub tasks: Vec, + #[serde(default)] + pub scheduled: Option, +} + +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct ScheduledTasks { + pub next_pipeline: Option, + pub next_monitor: Option, } #[derive(Debug, Clone, PartialEq, Deserialize)] @@ -297,6 +309,8 @@ pub struct AppConfig { pub indexing: IndexingConfigFe, #[serde(default)] pub metadata: MetadataConfigFe, + #[serde(default)] + pub scheduling: SchedulingConfigFe, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] @@ -376,6 +390,25 @@ impl Default for MetadataConfigFe { } } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] +pub struct SchedulingConfigFe { + #[serde(default)] + pub pipeline_enabled: bool, + #[serde(default = "default_pipeline_interval_hours")] + pub pipeline_interval_hours: u32, + #[serde(default)] + pub monitor_enabled: bool, + #[serde(default = "default_monitor_interval_hours")] + pub monitor_interval_hours: u32, +} + +fn default_pipeline_interval_hours() -> u32 { + 3 +} +fn default_monitor_interval_hours() -> u32 { + 12 +} + fn default_metadata_source() -> String { "musicbrainz".into() } diff --git a/src/lib.rs b/src/lib.rs index 58f9295..a2faa38 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,9 @@ pub mod auth; pub mod config; pub mod cookie_refresh; pub mod error; +pub mod monitor; +pub mod pipeline; +pub mod pipeline_scheduler; pub mod routes; pub mod state; pub mod tasks; diff --git a/src/main.rs b/src/main.rs index 645f93f..4b945da 100644 --- a/src/main.rs +++ b/src/main.rs @@ -71,11 +71,19 @@ async fn main() -> anyhow::Result<()> { config_path, tasks: TaskManager::new(), firefox_login: tokio::sync::Mutex::new(None), + scheduler: tokio::sync::Mutex::new(shanty_web::state::SchedulerInfo { + next_pipeline: None, + next_monitor: None, + }), }); // Start background cookie refresh task shanty_web::cookie_refresh::spawn(state.config.clone()); + // Start pipeline and monitor schedulers + shanty_web::pipeline_scheduler::spawn(state.clone()); + shanty_web::monitor::spawn(state.clone()); + // Resolve static files directory relative to the binary location let static_dir = std::env::current_exe() .ok() diff --git a/src/monitor.rs b/src/monitor.rs new file mode 100644 index 0000000..b738cf6 --- /dev/null +++ b/src/monitor.rs @@ -0,0 +1,293 @@ +//! Artist monitoring — periodically checks for new releases from monitored artists +//! and automatically adds them to the watchlist. + +use std::collections::HashSet; +use std::time::Duration; + +use actix_web::web; +use serde::Serialize; +use shanty_data::MetadataFetcher; +use shanty_db::queries; +use shanty_search::SearchProvider; + +use crate::error::ApiError; +use crate::state::AppState; + +/// Stats returned from a monitor check run. +#[derive(Debug, Clone, Serialize)] +pub struct MonitorStats { + pub artists_checked: u32, + pub new_releases_found: u32, + pub tracks_added: u32, +} + +/// Check all monitored artists for new releases and add them to the watchlist. +pub async fn check_monitored_artists( + state: &web::Data, +) -> Result { + let monitored = queries::artists::list_monitored(state.db.conn()).await?; + + let mut stats = MonitorStats { + artists_checked: 0, + new_releases_found: 0, + tracks_added: 0, + }; + + let allowed = state.config.read().await.allowed_secondary_types.clone(); + + for artist in &monitored { + let mbid = match &artist.musicbrainz_id { + Some(m) => m.clone(), + None => { + tracing::warn!( + artist_id = artist.id, + name = %artist.name, + "monitored artist has no MBID, skipping" + ); + continue; + } + }; + + stats.artists_checked += 1; + + // Fetch release groups from MusicBrainz + let all_release_groups = match state.search.get_release_groups(&mbid).await { + Ok(rgs) => rgs, + Err(e) => { + tracing::warn!( + artist = %artist.name, + error = %e, + "failed to fetch release groups for monitored artist" + ); + continue; + } + }; + + // Filter by allowed secondary types (same logic as enrich_artist) + let release_groups: Vec<_> = all_release_groups + .into_iter() + .filter(|rg| { + if rg.secondary_types.is_empty() { + true + } else { + rg.secondary_types.iter().all(|st| allowed.contains(st)) + } + }) + .collect(); + + // Get all existing wanted items for this artist + let artist_wanted = queries::wanted::list(state.db.conn(), None, None).await?; + let wanted_mbids: HashSet = artist_wanted + .iter() + .filter(|w| w.artist_id == Some(artist.id)) + .filter_map(|w| w.musicbrainz_id.clone()) + .collect(); + + // Check each release group's tracks to find new ones + for rg in &release_groups { + // Check if we already have any tracks from this release group cached + let cache_key = format!("artist_rg_tracks:{}", rg.id); + let cached_tracks: Option> = + if let Ok(Some(json)) = queries::cache::get(state.db.conn(), &cache_key).await { + // Parse the cached data to get recording MBIDs + if let Ok(cached) = serde_json::from_str::(&json) { + cached + .get("tracks") + .and_then(|t| t.as_array()) + .map(|tracks| { + tracks + .iter() + .filter_map(|t| { + t.get("recording_mbid") + .and_then(|m| m.as_str()) + .map(String::from) + }) + .collect() + }) + } else { + None + } + } else { + None + }; + + let track_mbids = if let Some(mbids) = cached_tracks { + mbids + } else { + // Not cached — resolve release and fetch tracks + // Rate limit: sleep 1.1s between MB requests + tokio::time::sleep(Duration::from_millis(1100)).await; + + let release_mbid = if let Some(ref rid) = rg.first_release_id { + rid.clone() + } else { + // Need to resolve from release group + match resolve_release_from_group(&rg.id).await { + Ok(rid) => rid, + Err(e) => { + tracing::debug!(rg_id = %rg.id, error = %e, "skipping release group"); + continue; + } + } + }; + + tokio::time::sleep(Duration::from_millis(1100)).await; + + match state.mb_client.get_release_tracks(&release_mbid).await { + Ok(tracks) => tracks.into_iter().map(|t| t.recording_mbid).collect(), + Err(e) => { + tracing::debug!( + release = %release_mbid, + error = %e, + "failed to fetch tracks" + ); + continue; + } + } + }; + + // Check if any of these tracks are NOT in the wanted items + let new_mbids: Vec<&String> = track_mbids + .iter() + .filter(|mbid| !wanted_mbids.contains(*mbid)) + .collect(); + + if new_mbids.is_empty() { + continue; + } + + // Found a release group with new tracks — add the whole album via shanty_watch + tracing::info!( + artist = %artist.name, + album = %rg.title, + new_tracks = new_mbids.len(), + "new release detected for monitored artist" + ); + + stats.new_releases_found += 1; + + match shanty_watch::add_album( + state.db.conn(), + Some(&artist.name), + Some(&rg.title), + rg.first_release_id.as_deref(), + &state.mb_client, + None, // system-initiated, no user_id + ) + .await + { + Ok(summary) => { + stats.tracks_added += summary.tracks_added as u32; + } + Err(e) => { + tracing::warn!( + artist = %artist.name, + album = %rg.title, + error = %e, + "failed to add album for monitored artist" + ); + } + } + } + + // Update last_checked_at + if let Err(e) = queries::artists::update_last_checked(state.db.conn(), artist.id).await { + tracing::warn!( + artist_id = artist.id, + error = %e, + "failed to update last_checked_at" + ); + } + } + + Ok(stats) +} + +/// Given a release-group MBID, find the first release MBID. +async fn resolve_release_from_group(release_group_mbid: &str) -> Result { + let client = reqwest::Client::builder() + .user_agent("Shanty/0.1.0 (shanty-music-app)") + .build() + .map_err(|e| ApiError::Internal(e.to_string()))?; + + let url = format!( + "https://musicbrainz.org/ws/2/release?release-group={release_group_mbid}&fmt=json&limit=1" + ); + + let resp: serde_json::Value = client + .get(&url) + .send() + .await + .map_err(|e| ApiError::Internal(e.to_string()))? + .json() + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + resp.get("releases") + .and_then(|r| r.as_array()) + .and_then(|arr| arr.first()) + .and_then(|r| r.get("id")) + .and_then(|id| id.as_str()) + .map(String::from) + .ok_or_else(|| ApiError::NotFound(format!("no releases for group {release_group_mbid}"))) +} + +/// Spawn the monitor scheduler background loop. +/// +/// Sleeps for the configured interval, then checks monitored artists if enabled. +/// Reads config each iteration so changes take effect without restart. +pub fn spawn(state: web::Data) { + tokio::spawn(async move { + loop { + let (enabled, hours) = { + let cfg = state.config.read().await; + ( + cfg.scheduling.monitor_enabled, + cfg.scheduling.monitor_interval_hours.max(1), + ) + }; + + let sleep_secs = u64::from(hours) * 3600; + + // Update next-run time + { + let mut sched = state.scheduler.lock().await; + sched.next_monitor = if enabled { + Some( + (chrono::Utc::now() + chrono::Duration::seconds(sleep_secs as i64)) + .naive_utc(), + ) + } else { + None + }; + } + + tokio::time::sleep(Duration::from_secs(sleep_secs)).await; + + if !enabled { + continue; + } + + // Clear next-run while running + { + let mut sched = state.scheduler.lock().await; + sched.next_monitor = None; + } + + tracing::info!("scheduled monitor check starting"); + match check_monitored_artists(&state).await { + Ok(stats) => { + tracing::info!( + artists_checked = stats.artists_checked, + new_releases = stats.new_releases_found, + tracks_added = stats.tracks_added, + "scheduled monitor check complete" + ); + } + Err(e) => { + tracing::error!(error = %e, "scheduled monitor check failed"); + } + } + } + }); +} diff --git a/src/pipeline.rs b/src/pipeline.rs new file mode 100644 index 0000000..bd45e62 --- /dev/null +++ b/src/pipeline.rs @@ -0,0 +1,218 @@ +//! Shared pipeline logic used by both the API endpoint and the scheduler. + +use actix_web::web; +use shanty_db::queries; + +use crate::routes::artists::enrich_all_watched_artists; +use crate::state::AppState; + +/// Register and spawn the full 6-step pipeline. Returns the task IDs immediately. +/// +/// Steps: sync, download, index, tag, organize, enrich. +pub fn spawn_pipeline(state: &web::Data) -> Vec { + let sync_id = state.tasks.register_pending("sync"); + let download_id = state.tasks.register_pending("download"); + let index_id = state.tasks.register_pending("index"); + let tag_id = state.tasks.register_pending("tag"); + let organize_id = state.tasks.register_pending("organize"); + let enrich_id = state.tasks.register_pending("enrich"); + + let task_ids = vec![ + sync_id.clone(), + download_id.clone(), + index_id.clone(), + tag_id.clone(), + organize_id.clone(), + enrich_id.clone(), + ]; + + let state = state.clone(); + tokio::spawn(async move { + run_pipeline_inner( + &state, + &sync_id, + &download_id, + &index_id, + &tag_id, + &organize_id, + &enrich_id, + ) + .await; + }); + + task_ids +} + +/// Run the pipeline without registering tasks (for the scheduler, which logs instead). +pub async fn run_pipeline(state: &web::Data) -> Vec { + let sync_id = state.tasks.register_pending("sync"); + let download_id = state.tasks.register_pending("download"); + let index_id = state.tasks.register_pending("index"); + let tag_id = state.tasks.register_pending("tag"); + let organize_id = state.tasks.register_pending("organize"); + let enrich_id = state.tasks.register_pending("enrich"); + + let task_ids = vec![ + sync_id.clone(), + download_id.clone(), + index_id.clone(), + tag_id.clone(), + organize_id.clone(), + enrich_id.clone(), + ]; + + run_pipeline_inner( + state, + &sync_id, + &download_id, + &index_id, + &tag_id, + &organize_id, + &enrich_id, + ) + .await; + + task_ids +} + +async fn run_pipeline_inner( + state: &web::Data, + sync_id: &str, + download_id: &str, + index_id: &str, + tag_id: &str, + organize_id: &str, + enrich_id: &str, +) { + let cfg = state.config.read().await.clone(); + + // Step 1: Sync + state.tasks.start(sync_id); + state + .tasks + .update_progress(sync_id, 0, 0, "Syncing watchlist to download queue..."); + match shanty_dl::sync_wanted_to_queue(state.db.conn(), false).await { + Ok(stats) => state.tasks.complete(sync_id, format!("{stats}")), + Err(e) => state.tasks.fail(sync_id, e.to_string()), + } + + // Step 2: Download + state.tasks.start(download_id); + let cookies = cfg.download.cookies_path.clone(); + let format: shanty_dl::AudioFormat = cfg + .download + .format + .parse() + .unwrap_or(shanty_dl::AudioFormat::Opus); + let source: shanty_dl::SearchSource = cfg + .download + .search_source + .parse() + .unwrap_or(shanty_dl::SearchSource::YouTubeMusic); + let rate = if cookies.is_some() { + cfg.download.rate_limit_auth + } else { + cfg.download.rate_limit + }; + let backend = shanty_dl::YtDlpBackend::new(rate, source, cookies.clone()); + let backend_config = shanty_dl::BackendConfig { + output_dir: cfg.download_path.clone(), + format, + cookies_path: cookies, + }; + let task_state = state.clone(); + let progress_tid = download_id.to_string(); + let on_progress: shanty_dl::ProgressFn = Box::new(move |current, total, msg| { + task_state + .tasks + .update_progress(&progress_tid, current, total, msg); + }); + match shanty_dl::run_queue_with_progress( + state.db.conn(), + &backend, + &backend_config, + false, + Some(on_progress), + ) + .await + { + Ok(stats) => { + let _ = queries::cache::purge_prefix(state.db.conn(), "artist_totals:").await; + state.tasks.complete(download_id, format!("{stats}")); + } + Err(e) => state.tasks.fail(download_id, e.to_string()), + } + + // Step 3: Index + state.tasks.start(index_id); + state + .tasks + .update_progress(index_id, 0, 0, "Scanning library..."); + let scan_config = shanty_index::ScanConfig { + root: cfg.library_path.clone(), + dry_run: false, + concurrency: cfg.indexing.concurrency, + }; + match shanty_index::run_scan(state.db.conn(), &scan_config).await { + Ok(stats) => state.tasks.complete(index_id, format!("{stats}")), + Err(e) => state.tasks.fail(index_id, e.to_string()), + } + + // Step 4: Tag + state.tasks.start(tag_id); + state + .tasks + .update_progress(tag_id, 0, 0, "Tagging tracks..."); + match shanty_tag::MusicBrainzClient::new() { + Ok(mb) => { + let tag_config = shanty_tag::TagConfig { + dry_run: false, + write_tags: cfg.tagging.write_tags, + confidence: cfg.tagging.confidence, + }; + match shanty_tag::run_tagging(state.db.conn(), &mb, &tag_config, None).await { + Ok(stats) => state.tasks.complete(tag_id, format!("{stats}")), + Err(e) => state.tasks.fail(tag_id, e.to_string()), + } + } + Err(e) => state.tasks.fail(tag_id, e.to_string()), + } + + // Step 5: Organize + state.tasks.start(organize_id); + state + .tasks + .update_progress(organize_id, 0, 0, "Organizing files..."); + let org_config = shanty_org::OrgConfig { + target_dir: cfg.library_path.clone(), + format: cfg.organization_format.clone(), + dry_run: false, + copy: false, + }; + match shanty_org::organize_from_db(state.db.conn(), &org_config).await { + Ok(stats) => { + let promoted = queries::wanted::promote_downloaded_to_owned(state.db.conn()) + .await + .unwrap_or(0); + let msg = if promoted > 0 { + format!("{stats} — {promoted} items marked as owned") + } else { + format!("{stats}") + }; + state.tasks.complete(organize_id, msg); + } + Err(e) => state.tasks.fail(organize_id, e.to_string()), + } + + // Step 6: Enrich + state.tasks.start(enrich_id); + state + .tasks + .update_progress(enrich_id, 0, 0, "Refreshing artist data..."); + match enrich_all_watched_artists(state).await { + Ok(count) => state + .tasks + .complete(enrich_id, format!("{count} artists refreshed")), + Err(e) => state.tasks.fail(enrich_id, e.to_string()), + } +} diff --git a/src/pipeline_scheduler.rs b/src/pipeline_scheduler.rs new file mode 100644 index 0000000..52a5310 --- /dev/null +++ b/src/pipeline_scheduler.rs @@ -0,0 +1,54 @@ +//! Background task that runs the full pipeline on a configurable interval. + +use std::time::Duration; + +use actix_web::web; +use chrono::Utc; + +use crate::state::AppState; + +/// Spawn the pipeline scheduler background loop. +/// +/// Sleeps for the configured interval, then runs the full pipeline if enabled. +/// Reads config each iteration so changes take effect without restart. +pub fn spawn(state: web::Data) { + tokio::spawn(async move { + loop { + let (enabled, hours) = { + let cfg = state.config.read().await; + ( + cfg.scheduling.pipeline_enabled, + cfg.scheduling.pipeline_interval_hours.max(1), + ) + }; + + let sleep_secs = u64::from(hours) * 3600; + + // Update next-run time + { + let mut sched = state.scheduler.lock().await; + sched.next_pipeline = if enabled { + Some((Utc::now() + chrono::Duration::seconds(sleep_secs as i64)).naive_utc()) + } else { + None + }; + } + + tokio::time::sleep(Duration::from_secs(sleep_secs)).await; + + if !enabled { + continue; + } + + // Clear next-run while running + { + let mut sched = state.scheduler.lock().await; + sched.next_pipeline = None; + } + + tracing::info!("scheduled pipeline starting"); + let task_ids = crate::pipeline::run_pipeline(&state).await; + tracing::info!(?task_ids, "scheduled pipeline complete"); + } + }); +} diff --git a/src/routes/artists.rs b/src/routes/artists.rs index 83e0ac0..2c0609b 100644 --- a/src/routes/artists.rs +++ b/src/routes/artists.rs @@ -33,6 +33,7 @@ struct ArtistListItem { id: i32, name: String, musicbrainz_id: Option, + monitored: bool, total_watched: usize, total_owned: usize, total_items: usize, @@ -72,6 +73,11 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .route(web::post().to(add_artist)), ) .service(web::resource("/artists/{id}/full").route(web::get().to(get_artist_full))) + .service( + web::resource("/artists/{id}/monitor") + .route(web::post().to(set_monitored)) + .route(web::delete().to(unset_monitored)), + ) .service( web::resource("/artists/{id}") .route(web::get().to(get_artist)) @@ -121,6 +127,7 @@ async fn list_artists( id: a.id, name: a.name.clone(), musicbrainz_id: a.musicbrainz_id.clone(), + monitored: a.monitored, total_watched, total_owned, total_items, @@ -324,6 +331,8 @@ pub async fn enrich_artist( added_at: chrono::Utc::now().naive_utc(), top_songs: "[]".to_string(), similar_artists: "[]".to_string(), + monitored: false, + last_checked_at: None, }; (synthetic, None, mbid) } @@ -603,6 +612,7 @@ pub async fn enrich_artist( "total_watched_tracks": total_artist_watched, "total_owned_tracks": total_artist_owned, "enriched": !skip_track_fetch, + "monitored": artist.monitored, "artist_info": artist_info, "artist_photo": artist_photo, "artist_bio": artist_bio, @@ -800,3 +810,33 @@ async fn delete_artist( queries::artists::delete(state.db.conn(), id).await?; Ok(HttpResponse::NoContent().finish()) } + +async fn set_monitored( + state: web::Data, + session: Session, + path: web::Path, +) -> Result { + auth::require_auth(&session)?; + let id = path.into_inner(); + let artist = queries::artists::set_monitored(state.db.conn(), id, true).await?; + Ok(HttpResponse::Ok().json(serde_json::json!({ + "id": artist.id, + "name": artist.name, + "monitored": artist.monitored, + }))) +} + +async fn unset_monitored( + state: web::Data, + session: Session, + path: web::Path, +) -> Result { + auth::require_auth(&session)?; + let id = path.into_inner(); + let artist = queries::artists::set_monitored(state.db.conn(), id, false).await?; + Ok(HttpResponse::Ok().json(serde_json::json!({ + "id": artist.id, + "name": artist.name, + "monitored": artist.monitored, + }))) +} diff --git a/src/routes/system.rs b/src/routes/system.rs index 2b2e627..bc3486c 100644 --- a/src/routes/system.rs +++ b/src/routes/system.rs @@ -20,6 +20,8 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(web::resource("/tasks/{id}").route(web::get().to(get_task))) .service(web::resource("/watchlist").route(web::get().to(list_watchlist))) .service(web::resource("/watchlist/{id}").route(web::delete().to(remove_watchlist))) + .service(web::resource("/monitor/check").route(web::post().to(trigger_monitor_check))) + .service(web::resource("/monitor/status").route(web::get().to(get_monitor_status))) .service( web::resource("/config") .route(web::get().to(get_config)) @@ -48,6 +50,14 @@ async fn get_status( let needs_tagging = queries::tracks::get_needing_metadata(state.db.conn()).await?; + // Scheduled task info + let sched = state.scheduler.lock().await; + let scheduled_tasks = serde_json::json!({ + "next_pipeline": sched.next_pipeline, + "next_monitor": sched.next_monitor, + }); + drop(sched); + Ok(HttpResponse::Ok().json(serde_json::json!({ "library": summary, "queue": { @@ -61,6 +71,7 @@ async fn get_status( "items": needs_tagging.iter().take(20).collect::>(), }, "tasks": tasks, + "scheduled": scheduled_tasks, }))) } @@ -173,158 +184,7 @@ async fn trigger_pipeline( session: Session, ) -> Result { auth::require_auth(&session)?; - let sync_id = state.tasks.register_pending("sync"); - let download_id = state.tasks.register_pending("download"); - let index_id = state.tasks.register_pending("index"); - let tag_id = state.tasks.register_pending("tag"); - let organize_id = state.tasks.register_pending("organize"); - let enrich_id = state.tasks.register_pending("enrich"); - - let task_ids = vec![ - sync_id.clone(), - download_id.clone(), - index_id.clone(), - tag_id.clone(), - organize_id.clone(), - enrich_id.clone(), - ]; - - let state = state.clone(); - - tokio::spawn(async move { - let cfg = state.config.read().await.clone(); - - // Step 1: Sync - state.tasks.start(&sync_id); - state - .tasks - .update_progress(&sync_id, 0, 0, "Syncing watchlist to download queue..."); - match shanty_dl::sync_wanted_to_queue(state.db.conn(), false).await { - Ok(stats) => state.tasks.complete(&sync_id, format!("{stats}")), - Err(e) => state.tasks.fail(&sync_id, e.to_string()), - } - - // Step 2: Download - state.tasks.start(&download_id); - let cookies = cfg.download.cookies_path.clone(); - let format: shanty_dl::AudioFormat = cfg - .download - .format - .parse() - .unwrap_or(shanty_dl::AudioFormat::Opus); - let source: shanty_dl::SearchSource = cfg - .download - .search_source - .parse() - .unwrap_or(shanty_dl::SearchSource::YouTubeMusic); - let rate = if cookies.is_some() { - cfg.download.rate_limit_auth - } else { - cfg.download.rate_limit - }; - let backend = shanty_dl::YtDlpBackend::new(rate, source, cookies.clone()); - let backend_config = shanty_dl::BackendConfig { - output_dir: cfg.download_path.clone(), - format, - cookies_path: cookies, - }; - let task_state = state.clone(); - let progress_tid = download_id.clone(); - let on_progress: shanty_dl::ProgressFn = Box::new(move |current, total, msg| { - task_state - .tasks - .update_progress(&progress_tid, current, total, msg); - }); - match shanty_dl::run_queue_with_progress( - state.db.conn(), - &backend, - &backend_config, - false, - Some(on_progress), - ) - .await - { - Ok(stats) => { - let _ = queries::cache::purge_prefix(state.db.conn(), "artist_totals:").await; - state.tasks.complete(&download_id, format!("{stats}")); - } - Err(e) => state.tasks.fail(&download_id, e.to_string()), - } - - // Step 3: Index - state.tasks.start(&index_id); - state - .tasks - .update_progress(&index_id, 0, 0, "Scanning library..."); - let scan_config = shanty_index::ScanConfig { - root: cfg.library_path.clone(), - dry_run: false, - concurrency: cfg.indexing.concurrency, - }; - match shanty_index::run_scan(state.db.conn(), &scan_config).await { - Ok(stats) => state.tasks.complete(&index_id, format!("{stats}")), - Err(e) => state.tasks.fail(&index_id, e.to_string()), - } - - // Step 4: Tag - state.tasks.start(&tag_id); - state - .tasks - .update_progress(&tag_id, 0, 0, "Tagging tracks..."); - match shanty_tag::MusicBrainzClient::new() { - Ok(mb) => { - let tag_config = shanty_tag::TagConfig { - dry_run: false, - write_tags: cfg.tagging.write_tags, - confidence: cfg.tagging.confidence, - }; - match shanty_tag::run_tagging(state.db.conn(), &mb, &tag_config, None).await { - Ok(stats) => state.tasks.complete(&tag_id, format!("{stats}")), - Err(e) => state.tasks.fail(&tag_id, e.to_string()), - } - } - Err(e) => state.tasks.fail(&tag_id, e.to_string()), - } - - // Step 5: Organize - state.tasks.start(&organize_id); - state - .tasks - .update_progress(&organize_id, 0, 0, "Organizing files..."); - let org_config = shanty_org::OrgConfig { - target_dir: cfg.library_path.clone(), - format: cfg.organization_format.clone(), - dry_run: false, - copy: false, - }; - match shanty_org::organize_from_db(state.db.conn(), &org_config).await { - Ok(stats) => { - let promoted = queries::wanted::promote_downloaded_to_owned(state.db.conn()) - .await - .unwrap_or(0); - let msg = if promoted > 0 { - format!("{stats} — {promoted} items marked as owned") - } else { - format!("{stats}") - }; - state.tasks.complete(&organize_id, msg); - } - Err(e) => state.tasks.fail(&organize_id, e.to_string()), - } - - // Step 6: Enrich - state.tasks.start(&enrich_id); - state - .tasks - .update_progress(&enrich_id, 0, 0, "Refreshing artist data..."); - match enrich_all_watched_artists(&state).await { - Ok(count) => state - .tasks - .complete(&enrich_id, format!("{count} artists refreshed")), - Err(e) => state.tasks.fail(&enrich_id, e.to_string()), - } - }); - + let task_ids = crate::pipeline::spawn_pipeline(&state); Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_ids": task_ids }))) } @@ -361,6 +221,53 @@ async fn remove_watchlist( Ok(HttpResponse::NoContent().finish()) } +async fn trigger_monitor_check( + state: web::Data, + session: Session, +) -> Result { + auth::require_admin(&session)?; + let state = state.clone(); + let task_id = state.tasks.register("monitor_check"); + let tid = task_id.clone(); + tokio::spawn(async move { + state + .tasks + .update_progress(&tid, 0, 0, "Checking monitored artists..."); + match crate::monitor::check_monitored_artists(&state).await { + Ok(stats) => state.tasks.complete( + &tid, + format!( + "{} artists checked, {} new releases, {} tracks added", + stats.artists_checked, stats.new_releases_found, stats.tracks_added + ), + ), + Err(e) => state.tasks.fail(&tid, e.to_string()), + } + }); + Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id }))) +} + +async fn get_monitor_status( + state: web::Data, + session: Session, +) -> Result { + auth::require_auth(&session)?; + let monitored = queries::artists::list_monitored(state.db.conn()).await?; + let items: Vec = monitored + .iter() + .map(|a| { + serde_json::json!({ + "id": a.id, + "name": a.name, + "musicbrainz_id": a.musicbrainz_id, + "monitored": a.monitored, + "last_checked_at": a.last_checked_at, + }) + }) + .collect(); + Ok(HttpResponse::Ok().json(items)) +} + async fn get_config( state: web::Data, session: Session, diff --git a/src/state.rs b/src/state.rs index 0310540..023aeed 100644 --- a/src/state.rs +++ b/src/state.rs @@ -14,6 +14,14 @@ pub struct FirefoxLoginSession { pub vnc_url: String, } +/// Tracks next-run times for scheduled background tasks. +pub struct SchedulerInfo { + /// When the next pipeline run is scheduled (None if disabled). + pub next_pipeline: Option, + /// When the next monitor check is scheduled (None if disabled). + pub next_monitor: Option, +} + pub struct AppState { pub db: Database, pub mb_client: MusicBrainzFetcher, @@ -23,4 +31,5 @@ pub struct AppState { pub config_path: Option, pub tasks: TaskManager, pub firefox_login: Mutex>, + pub scheduler: Mutex, }