Added the watch and scheduler systems

This commit is contained in:
Connor Johnstone
2026-03-20 16:28:15 -04:00
parent eaaff5f98f
commit 9d6c0e31c1
16 changed files with 948 additions and 164 deletions

View File

@@ -8,6 +8,9 @@ pub mod auth;
pub mod config;
pub mod cookie_refresh;
pub mod error;
pub mod monitor;
pub mod pipeline;
pub mod pipeline_scheduler;
pub mod routes;
pub mod state;
pub mod tasks;

View File

@@ -71,11 +71,19 @@ async fn main() -> anyhow::Result<()> {
config_path,
tasks: TaskManager::new(),
firefox_login: tokio::sync::Mutex::new(None),
scheduler: tokio::sync::Mutex::new(shanty_web::state::SchedulerInfo {
next_pipeline: None,
next_monitor: None,
}),
});
// Start background cookie refresh task
shanty_web::cookie_refresh::spawn(state.config.clone());
// Start pipeline and monitor schedulers
shanty_web::pipeline_scheduler::spawn(state.clone());
shanty_web::monitor::spawn(state.clone());
// Resolve static files directory relative to the binary location
let static_dir = std::env::current_exe()
.ok()

293
src/monitor.rs Normal file
View File

@@ -0,0 +1,293 @@
//! Artist monitoring — periodically checks for new releases from monitored artists
//! and automatically adds them to the watchlist.
use std::collections::HashSet;
use std::time::Duration;
use actix_web::web;
use serde::Serialize;
use shanty_data::MetadataFetcher;
use shanty_db::queries;
use shanty_search::SearchProvider;
use crate::error::ApiError;
use crate::state::AppState;
/// Stats returned from a monitor check run.
#[derive(Debug, Clone, Serialize)]
pub struct MonitorStats {
    /// Monitored artists that were actually queried (only those with an MBID count).
    pub artists_checked: u32,
    /// Release groups found to contain at least one track not yet on the watchlist.
    pub new_releases_found: u32,
    /// Total tracks added to the watchlist across all newly-found releases.
    pub tracks_added: u32,
}
/// Check all monitored artists for new releases and add them to the watchlist.
///
/// For each artist flagged as monitored (and having a MusicBrainz ID), this
/// fetches the artist's release groups, filters them by the configured allowed
/// secondary types, and looks for release groups containing tracks that are
/// not yet wanted. Each such release group is added as a whole album via
/// `shanty_watch::add_album`.
///
/// Per-artist fetch failures are logged and skipped so one bad artist does not
/// abort the whole run.
///
/// # Errors
///
/// Returns an error if listing monitored artists or listing wanted items
/// fails; other failures are handled per artist or per release group.
pub async fn check_monitored_artists(
    state: &web::Data<AppState>,
) -> Result<MonitorStats, ApiError> {
    let monitored = queries::artists::list_monitored(state.db.conn()).await?;
    let mut stats = MonitorStats {
        artists_checked: 0,
        new_releases_found: 0,
        tracks_added: 0,
    };
    // Snapshot the allowed secondary-type filter once for the whole run;
    // config changes mid-run are deliberately ignored.
    let allowed = state.config.read().await.allowed_secondary_types.clone();
    for artist in &monitored {
        // Without an MBID we cannot query MusicBrainz; such artists are
        // skipped and do not count toward `artists_checked`.
        let mbid = match &artist.musicbrainz_id {
            Some(m) => m.clone(),
            None => {
                tracing::warn!(
                    artist_id = artist.id,
                    name = %artist.name,
                    "monitored artist has no MBID, skipping"
                );
                continue;
            }
        };
        stats.artists_checked += 1;
        // Fetch release groups from MusicBrainz
        let all_release_groups = match state.search.get_release_groups(&mbid).await {
            Ok(rgs) => rgs,
            Err(e) => {
                tracing::warn!(
                    artist = %artist.name,
                    error = %e,
                    "failed to fetch release groups for monitored artist"
                );
                continue;
            }
        };
        // Filter by allowed secondary types (same logic as enrich_artist):
        // keep a release group if it has no secondary types, or if every
        // secondary type it carries is in the allowed list.
        let release_groups: Vec<_> = all_release_groups
            .into_iter()
            .filter(|rg| {
                if rg.secondary_types.is_empty() {
                    true
                } else {
                    rg.secondary_types.iter().all(|st| allowed.contains(st))
                }
            })
            .collect();
        // Get all existing wanted items for this artist. NOTE(review): the
        // query appears to return every wanted item (no artist filter args);
        // the per-artist filtering happens below.
        let artist_wanted = queries::wanted::list(state.db.conn(), None, None).await?;
        let wanted_mbids: HashSet<String> = artist_wanted
            .iter()
            .filter(|w| w.artist_id == Some(artist.id))
            .filter_map(|w| w.musicbrainz_id.clone())
            .collect();
        // Check each release group's tracks to find new ones
        for rg in &release_groups {
            // Check if we already have any tracks from this release group cached
            let cache_key = format!("artist_rg_tracks:{}", rg.id);
            let cached_tracks: Option<Vec<String>> =
                if let Ok(Some(json)) = queries::cache::get(state.db.conn(), &cache_key).await {
                    // Parse the cached data to get recording MBIDs
                    if let Ok(cached) = serde_json::from_str::<serde_json::Value>(&json) {
                        cached
                            .get("tracks")
                            .and_then(|t| t.as_array())
                            .map(|tracks| {
                                tracks
                                    .iter()
                                    .filter_map(|t| {
                                        t.get("recording_mbid")
                                            .and_then(|m| m.as_str())
                                            .map(String::from)
                                    })
                                    .collect()
                            })
                    } else {
                        // Malformed cache entry — treat as a miss.
                        None
                    }
                } else {
                    None
                };
            let track_mbids = if let Some(mbids) = cached_tracks {
                mbids
            } else {
                // Not cached — resolve release and fetch tracks
                // Rate limit: sleep 1.1s between MB requests
                tokio::time::sleep(Duration::from_millis(1100)).await;
                let release_mbid = if let Some(ref rid) = rg.first_release_id {
                    rid.clone()
                } else {
                    // Need to resolve from release group
                    match resolve_release_from_group(&rg.id).await {
                        Ok(rid) => rid,
                        Err(e) => {
                            tracing::debug!(rg_id = %rg.id, error = %e, "skipping release group");
                            continue;
                        }
                    }
                };
                tokio::time::sleep(Duration::from_millis(1100)).await;
                match state.mb_client.get_release_tracks(&release_mbid).await {
                    Ok(tracks) => tracks.into_iter().map(|t| t.recording_mbid).collect(),
                    Err(e) => {
                        tracing::debug!(
                            release = %release_mbid,
                            error = %e,
                            "failed to fetch tracks"
                        );
                        continue;
                    }
                }
            };
            // Check if any of these tracks are NOT in the wanted items
            let new_mbids: Vec<&String> = track_mbids
                .iter()
                .filter(|mbid| !wanted_mbids.contains(*mbid))
                .collect();
            if new_mbids.is_empty() {
                continue;
            }
            // Found a release group with new tracks — add the whole album via shanty_watch
            tracing::info!(
                artist = %artist.name,
                album = %rg.title,
                new_tracks = new_mbids.len(),
                "new release detected for monitored artist"
            );
            stats.new_releases_found += 1;
            match shanty_watch::add_album(
                state.db.conn(),
                Some(&artist.name),
                Some(&rg.title),
                rg.first_release_id.as_deref(),
                &state.mb_client,
                None, // system-initiated, no user_id
            )
            .await
            {
                Ok(summary) => {
                    stats.tracks_added += summary.tracks_added as u32;
                }
                Err(e) => {
                    // Non-fatal: log and move on to the next release group.
                    tracing::warn!(
                        artist = %artist.name,
                        album = %rg.title,
                        error = %e,
                        "failed to add album for monitored artist"
                    );
                }
            }
        }
        // Update last_checked_at even when nothing new was found, so status
        // reporting reflects the latest run; a failure here is non-fatal.
        if let Err(e) = queries::artists::update_last_checked(state.db.conn(), artist.id).await {
            tracing::warn!(
                artist_id = artist.id,
                error = %e,
                "failed to update last_checked_at"
            );
        }
    }
    Ok(stats)
}
/// Given a release-group MBID, find the first release MBID.
///
/// Queries the MusicBrainz web service directly (one release, JSON format).
///
/// # Errors
///
/// Returns `ApiError::Internal` on client-build, network, HTTP-status, or
/// JSON-decoding failures, and `ApiError::NotFound` when the release group
/// genuinely has no releases.
async fn resolve_release_from_group(release_group_mbid: &str) -> Result<String, ApiError> {
    let client = reqwest::Client::builder()
        .user_agent("Shanty/0.1.0 (shanty-music-app)")
        .build()
        .map_err(|e| ApiError::Internal(e.to_string()))?;
    let url = format!(
        "https://musicbrainz.org/ws/2/release?release-group={release_group_mbid}&fmt=json&limit=1"
    );
    let resp: serde_json::Value = client
        .get(&url)
        .send()
        .await
        .map_err(|e| ApiError::Internal(e.to_string()))?
        // Surface HTTP errors (e.g. MusicBrainz 503 rate limiting) explicitly
        // instead of trying to parse an error page as a release list and
        // misreporting it as "no releases for group".
        .error_for_status()
        .map_err(|e| ApiError::Internal(e.to_string()))?
        .json()
        .await
        .map_err(|e| ApiError::Internal(e.to_string()))?;
    resp.get("releases")
        .and_then(|r| r.as_array())
        .and_then(|arr| arr.first())
        .and_then(|r| r.get("id"))
        .and_then(|id| id.as_str())
        .map(String::from)
        .ok_or_else(|| ApiError::NotFound(format!("no releases for group {release_group_mbid}")))
}
/// Spawn the monitor scheduler background loop.
///
/// Sleeps for the configured interval, then checks monitored artists if enabled.
/// Reads config each iteration so changes take effect without restart.
pub fn spawn(state: web::Data<AppState>) {
tokio::spawn(async move {
loop {
let (enabled, hours) = {
let cfg = state.config.read().await;
(
cfg.scheduling.monitor_enabled,
cfg.scheduling.monitor_interval_hours.max(1),
)
};
let sleep_secs = u64::from(hours) * 3600;
// Update next-run time
{
let mut sched = state.scheduler.lock().await;
sched.next_monitor = if enabled {
Some(
(chrono::Utc::now() + chrono::Duration::seconds(sleep_secs as i64))
.naive_utc(),
)
} else {
None
};
}
tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
if !enabled {
continue;
}
// Clear next-run while running
{
let mut sched = state.scheduler.lock().await;
sched.next_monitor = None;
}
tracing::info!("scheduled monitor check starting");
match check_monitored_artists(&state).await {
Ok(stats) => {
tracing::info!(
artists_checked = stats.artists_checked,
new_releases = stats.new_releases_found,
tracks_added = stats.tracks_added,
"scheduled monitor check complete"
);
}
Err(e) => {
tracing::error!(error = %e, "scheduled monitor check failed");
}
}
}
});
}

218
src/pipeline.rs Normal file
View File

@@ -0,0 +1,218 @@
//! Shared pipeline logic used by both the API endpoint and the scheduler.
use actix_web::web;
use shanty_db::queries;
use crate::routes::artists::enrich_all_watched_artists;
use crate::state::AppState;
/// Register a pending task for each of the six pipeline steps.
///
/// Returns the task IDs in execution order:
/// sync, download, index, tag, organize, enrich.
fn register_pipeline_tasks(state: &web::Data<AppState>) -> [String; 6] {
    [
        state.tasks.register_pending("sync"),
        state.tasks.register_pending("download"),
        state.tasks.register_pending("index"),
        state.tasks.register_pending("tag"),
        state.tasks.register_pending("organize"),
        state.tasks.register_pending("enrich"),
    ]
}

/// Register and spawn the full 6-step pipeline. Returns the task IDs immediately.
///
/// Steps: sync, download, index, tag, organize, enrich.
pub fn spawn_pipeline(state: &web::Data<AppState>) -> Vec<String> {
    let [sync_id, download_id, index_id, tag_id, organize_id, enrich_id] =
        register_pipeline_tasks(state);
    let task_ids = vec![
        sync_id.clone(),
        download_id.clone(),
        index_id.clone(),
        tag_id.clone(),
        organize_id.clone(),
        enrich_id.clone(),
    ];
    let state = state.clone();
    // Detach the run so the caller (e.g. an HTTP handler) can return the
    // task IDs without waiting for completion.
    tokio::spawn(async move {
        run_pipeline_inner(
            &state,
            &sync_id,
            &download_id,
            &index_id,
            &tag_id,
            &organize_id,
            &enrich_id,
        )
        .await;
    });
    task_ids
}

/// Run the full pipeline to completion, awaiting it in place (used by the
/// scheduler). Progress tasks are registered exactly as in [`spawn_pipeline`];
/// the only difference is that this does not detach the work.
///
/// Returns the task IDs for the six steps once the pipeline has finished.
pub async fn run_pipeline(state: &web::Data<AppState>) -> Vec<String> {
    let [sync_id, download_id, index_id, tag_id, organize_id, enrich_id] =
        register_pipeline_tasks(state);
    let task_ids = vec![
        sync_id.clone(),
        download_id.clone(),
        index_id.clone(),
        tag_id.clone(),
        organize_id.clone(),
        enrich_id.clone(),
    ];
    run_pipeline_inner(
        state,
        &sync_id,
        &download_id,
        &index_id,
        &tag_id,
        &organize_id,
        &enrich_id,
    )
    .await;
    task_ids
}
/// Execute the six pipeline steps in order against pre-registered task IDs.
///
/// Steps: sync → download → index → tag → organize → enrich. Each step reports
/// progress and its final state through `state.tasks`; a failing step is
/// marked failed but does NOT stop the later steps from running.
async fn run_pipeline_inner(
    state: &web::Data<AppState>,
    sync_id: &str,
    download_id: &str,
    index_id: &str,
    tag_id: &str,
    organize_id: &str,
    enrich_id: &str,
) {
    // Snapshot the config once so every step sees a consistent view.
    let cfg = state.config.read().await.clone();
    // Step 1: Sync
    state.tasks.start(sync_id);
    state
        .tasks
        .update_progress(sync_id, 0, 0, "Syncing watchlist to download queue...");
    match shanty_dl::sync_wanted_to_queue(state.db.conn(), false).await {
        Ok(stats) => state.tasks.complete(sync_id, format!("{stats}")),
        Err(e) => state.tasks.fail(sync_id, e.to_string()),
    }
    // Step 2: Download
    state.tasks.start(download_id);
    let cookies = cfg.download.cookies_path.clone();
    // Unparseable config values fall back to defaults (Opus / YouTube Music).
    let format: shanty_dl::AudioFormat = cfg
        .download
        .format
        .parse()
        .unwrap_or(shanty_dl::AudioFormat::Opus);
    let source: shanty_dl::SearchSource = cfg
        .download
        .search_source
        .parse()
        .unwrap_or(shanty_dl::SearchSource::YouTubeMusic);
    // Authenticated sessions (cookies present) get their own rate limit.
    let rate = if cookies.is_some() {
        cfg.download.rate_limit_auth
    } else {
        cfg.download.rate_limit
    };
    let backend = shanty_dl::YtDlpBackend::new(rate, source, cookies.clone());
    let backend_config = shanty_dl::BackendConfig {
        output_dir: cfg.download_path.clone(),
        format,
        cookies_path: cookies,
    };
    // The progress callback needs its own clones because it outlives this scope.
    let task_state = state.clone();
    let progress_tid = download_id.to_string();
    let on_progress: shanty_dl::ProgressFn = Box::new(move |current, total, msg| {
        task_state
            .tasks
            .update_progress(&progress_tid, current, total, msg);
    });
    match shanty_dl::run_queue_with_progress(
        state.db.conn(),
        &backend,
        &backend_config,
        false,
        Some(on_progress),
    )
    .await
    {
        Ok(stats) => {
            // New downloads invalidate the cached per-artist totals;
            // purge failures are intentionally ignored (best-effort cache).
            let _ = queries::cache::purge_prefix(state.db.conn(), "artist_totals:").await;
            state.tasks.complete(download_id, format!("{stats}"));
        }
        Err(e) => state.tasks.fail(download_id, e.to_string()),
    }
    // Step 3: Index
    state.tasks.start(index_id);
    state
        .tasks
        .update_progress(index_id, 0, 0, "Scanning library...");
    let scan_config = shanty_index::ScanConfig {
        root: cfg.library_path.clone(),
        dry_run: false,
        concurrency: cfg.indexing.concurrency,
    };
    match shanty_index::run_scan(state.db.conn(), &scan_config).await {
        Ok(stats) => state.tasks.complete(index_id, format!("{stats}")),
        Err(e) => state.tasks.fail(index_id, e.to_string()),
    }
    // Step 4: Tag
    state.tasks.start(tag_id);
    state
        .tasks
        .update_progress(tag_id, 0, 0, "Tagging tracks...");
    // Client construction itself can fail, in which case the whole step fails.
    match shanty_tag::MusicBrainzClient::new() {
        Ok(mb) => {
            let tag_config = shanty_tag::TagConfig {
                dry_run: false,
                write_tags: cfg.tagging.write_tags,
                confidence: cfg.tagging.confidence,
            };
            match shanty_tag::run_tagging(state.db.conn(), &mb, &tag_config, None).await {
                Ok(stats) => state.tasks.complete(tag_id, format!("{stats}")),
                Err(e) => state.tasks.fail(tag_id, e.to_string()),
            }
        }
        Err(e) => state.tasks.fail(tag_id, e.to_string()),
    }
    // Step 5: Organize
    state.tasks.start(organize_id);
    state
        .tasks
        .update_progress(organize_id, 0, 0, "Organizing files...");
    let org_config = shanty_org::OrgConfig {
        target_dir: cfg.library_path.clone(),
        format: cfg.organization_format.clone(),
        dry_run: false,
        copy: false,
    };
    match shanty_org::organize_from_db(state.db.conn(), &org_config).await {
        Ok(stats) => {
            // Mark freshly-organized wanted items as owned; a failed promotion
            // is reported as 0 rather than failing the step.
            let promoted = queries::wanted::promote_downloaded_to_owned(state.db.conn())
                .await
                .unwrap_or(0);
            let msg = if promoted > 0 {
                format!("{stats}{promoted} items marked as owned")
            } else {
                format!("{stats}")
            };
            state.tasks.complete(organize_id, msg);
        }
        Err(e) => state.tasks.fail(organize_id, e.to_string()),
    }
    // Step 6: Enrich
    state.tasks.start(enrich_id);
    state
        .tasks
        .update_progress(enrich_id, 0, 0, "Refreshing artist data...");
    match enrich_all_watched_artists(state).await {
        Ok(count) => state
            .tasks
            .complete(enrich_id, format!("{count} artists refreshed")),
        Err(e) => state.tasks.fail(enrich_id, e.to_string()),
    }
}

54
src/pipeline_scheduler.rs Normal file
View File

@@ -0,0 +1,54 @@
//! Background task that runs the full pipeline on a configurable interval.
use std::time::Duration;
use actix_web::web;
use chrono::Utc;
use crate::state::AppState;
/// Spawn the pipeline scheduler background loop.
///
/// Sleeps for the configured interval, then runs the full pipeline if
/// enabled. The config is re-read on every iteration, so changes take effect
/// without a restart (after the current sleep elapses).
pub fn spawn(state: web::Data<AppState>) {
    tokio::spawn(async move {
        loop {
            // Re-read schedule settings each cycle; the interval is clamped
            // to at least one hour.
            let (enabled, interval_hours) = {
                let cfg = state.config.read().await;
                (
                    cfg.scheduling.pipeline_enabled,
                    cfg.scheduling.pipeline_interval_hours.max(1),
                )
            };
            let sleep_secs = u64::from(interval_hours) * 3600;
            // Publish the next-run timestamp; None while disabled.
            state.scheduler.lock().await.next_pipeline = enabled.then(|| {
                (Utc::now() + chrono::Duration::seconds(sleep_secs as i64)).naive_utc()
            });
            tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
            if !enabled {
                continue;
            }
            // No scheduled next run while the pipeline is executing.
            state.scheduler.lock().await.next_pipeline = None;
            tracing::info!("scheduled pipeline starting");
            let task_ids = crate::pipeline::run_pipeline(&state).await;
            tracing::info!(?task_ids, "scheduled pipeline complete");
        }
    });
}

View File

@@ -33,6 +33,7 @@ struct ArtistListItem {
id: i32,
name: String,
musicbrainz_id: Option<String>,
monitored: bool,
total_watched: usize,
total_owned: usize,
total_items: usize,
@@ -72,6 +73,11 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.route(web::post().to(add_artist)),
)
.service(web::resource("/artists/{id}/full").route(web::get().to(get_artist_full)))
.service(
web::resource("/artists/{id}/monitor")
.route(web::post().to(set_monitored))
.route(web::delete().to(unset_monitored)),
)
.service(
web::resource("/artists/{id}")
.route(web::get().to(get_artist))
@@ -121,6 +127,7 @@ async fn list_artists(
id: a.id,
name: a.name.clone(),
musicbrainz_id: a.musicbrainz_id.clone(),
monitored: a.monitored,
total_watched,
total_owned,
total_items,
@@ -324,6 +331,8 @@ pub async fn enrich_artist(
added_at: chrono::Utc::now().naive_utc(),
top_songs: "[]".to_string(),
similar_artists: "[]".to_string(),
monitored: false,
last_checked_at: None,
};
(synthetic, None, mbid)
}
@@ -603,6 +612,7 @@ pub async fn enrich_artist(
"total_watched_tracks": total_artist_watched,
"total_owned_tracks": total_artist_owned,
"enriched": !skip_track_fetch,
"monitored": artist.monitored,
"artist_info": artist_info,
"artist_photo": artist_photo,
"artist_bio": artist_bio,
@@ -800,3 +810,33 @@ async fn delete_artist(
queries::artists::delete(state.db.conn(), id).await?;
Ok(HttpResponse::NoContent().finish())
}
/// Shared implementation for both monitor-toggle endpoints: authenticate,
/// flip the artist's `monitored` flag, and echo the updated fields.
async fn update_monitored_flag(
    state: &web::Data<AppState>,
    session: &Session,
    id: i32,
    monitored: bool,
) -> Result<HttpResponse, ApiError> {
    auth::require_auth(session)?;
    let artist = queries::artists::set_monitored(state.db.conn(), id, monitored).await?;
    Ok(HttpResponse::Ok().json(serde_json::json!({
        "id": artist.id,
        "name": artist.name,
        "monitored": artist.monitored,
    })))
}

/// POST /artists/{id}/monitor — enable monitoring for an artist.
async fn set_monitored(
    state: web::Data<AppState>,
    session: Session,
    path: web::Path<i32>,
) -> Result<HttpResponse, ApiError> {
    update_monitored_flag(&state, &session, path.into_inner(), true).await
}

/// DELETE /artists/{id}/monitor — disable monitoring for an artist.
async fn unset_monitored(
    state: web::Data<AppState>,
    session: Session,
    path: web::Path<i32>,
) -> Result<HttpResponse, ApiError> {
    update_monitored_flag(&state, &session, path.into_inner(), false).await
}

View File

@@ -20,6 +20,8 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(web::resource("/tasks/{id}").route(web::get().to(get_task)))
.service(web::resource("/watchlist").route(web::get().to(list_watchlist)))
.service(web::resource("/watchlist/{id}").route(web::delete().to(remove_watchlist)))
.service(web::resource("/monitor/check").route(web::post().to(trigger_monitor_check)))
.service(web::resource("/monitor/status").route(web::get().to(get_monitor_status)))
.service(
web::resource("/config")
.route(web::get().to(get_config))
@@ -48,6 +50,14 @@ async fn get_status(
let needs_tagging = queries::tracks::get_needing_metadata(state.db.conn()).await?;
// Scheduled task info
let sched = state.scheduler.lock().await;
let scheduled_tasks = serde_json::json!({
"next_pipeline": sched.next_pipeline,
"next_monitor": sched.next_monitor,
});
drop(sched);
Ok(HttpResponse::Ok().json(serde_json::json!({
"library": summary,
"queue": {
@@ -61,6 +71,7 @@ async fn get_status(
"items": needs_tagging.iter().take(20).collect::<Vec<_>>(),
},
"tasks": tasks,
"scheduled": scheduled_tasks,
})))
}
@@ -173,158 +184,7 @@ async fn trigger_pipeline(
session: Session,
) -> Result<HttpResponse, ApiError> {
auth::require_auth(&session)?;
let sync_id = state.tasks.register_pending("sync");
let download_id = state.tasks.register_pending("download");
let index_id = state.tasks.register_pending("index");
let tag_id = state.tasks.register_pending("tag");
let organize_id = state.tasks.register_pending("organize");
let enrich_id = state.tasks.register_pending("enrich");
let task_ids = vec![
sync_id.clone(),
download_id.clone(),
index_id.clone(),
tag_id.clone(),
organize_id.clone(),
enrich_id.clone(),
];
let state = state.clone();
tokio::spawn(async move {
let cfg = state.config.read().await.clone();
// Step 1: Sync
state.tasks.start(&sync_id);
state
.tasks
.update_progress(&sync_id, 0, 0, "Syncing watchlist to download queue...");
match shanty_dl::sync_wanted_to_queue(state.db.conn(), false).await {
Ok(stats) => state.tasks.complete(&sync_id, format!("{stats}")),
Err(e) => state.tasks.fail(&sync_id, e.to_string()),
}
// Step 2: Download
state.tasks.start(&download_id);
let cookies = cfg.download.cookies_path.clone();
let format: shanty_dl::AudioFormat = cfg
.download
.format
.parse()
.unwrap_or(shanty_dl::AudioFormat::Opus);
let source: shanty_dl::SearchSource = cfg
.download
.search_source
.parse()
.unwrap_or(shanty_dl::SearchSource::YouTubeMusic);
let rate = if cookies.is_some() {
cfg.download.rate_limit_auth
} else {
cfg.download.rate_limit
};
let backend = shanty_dl::YtDlpBackend::new(rate, source, cookies.clone());
let backend_config = shanty_dl::BackendConfig {
output_dir: cfg.download_path.clone(),
format,
cookies_path: cookies,
};
let task_state = state.clone();
let progress_tid = download_id.clone();
let on_progress: shanty_dl::ProgressFn = Box::new(move |current, total, msg| {
task_state
.tasks
.update_progress(&progress_tid, current, total, msg);
});
match shanty_dl::run_queue_with_progress(
state.db.conn(),
&backend,
&backend_config,
false,
Some(on_progress),
)
.await
{
Ok(stats) => {
let _ = queries::cache::purge_prefix(state.db.conn(), "artist_totals:").await;
state.tasks.complete(&download_id, format!("{stats}"));
}
Err(e) => state.tasks.fail(&download_id, e.to_string()),
}
// Step 3: Index
state.tasks.start(&index_id);
state
.tasks
.update_progress(&index_id, 0, 0, "Scanning library...");
let scan_config = shanty_index::ScanConfig {
root: cfg.library_path.clone(),
dry_run: false,
concurrency: cfg.indexing.concurrency,
};
match shanty_index::run_scan(state.db.conn(), &scan_config).await {
Ok(stats) => state.tasks.complete(&index_id, format!("{stats}")),
Err(e) => state.tasks.fail(&index_id, e.to_string()),
}
// Step 4: Tag
state.tasks.start(&tag_id);
state
.tasks
.update_progress(&tag_id, 0, 0, "Tagging tracks...");
match shanty_tag::MusicBrainzClient::new() {
Ok(mb) => {
let tag_config = shanty_tag::TagConfig {
dry_run: false,
write_tags: cfg.tagging.write_tags,
confidence: cfg.tagging.confidence,
};
match shanty_tag::run_tagging(state.db.conn(), &mb, &tag_config, None).await {
Ok(stats) => state.tasks.complete(&tag_id, format!("{stats}")),
Err(e) => state.tasks.fail(&tag_id, e.to_string()),
}
}
Err(e) => state.tasks.fail(&tag_id, e.to_string()),
}
// Step 5: Organize
state.tasks.start(&organize_id);
state
.tasks
.update_progress(&organize_id, 0, 0, "Organizing files...");
let org_config = shanty_org::OrgConfig {
target_dir: cfg.library_path.clone(),
format: cfg.organization_format.clone(),
dry_run: false,
copy: false,
};
match shanty_org::organize_from_db(state.db.conn(), &org_config).await {
Ok(stats) => {
let promoted = queries::wanted::promote_downloaded_to_owned(state.db.conn())
.await
.unwrap_or(0);
let msg = if promoted > 0 {
format!("{stats}{promoted} items marked as owned")
} else {
format!("{stats}")
};
state.tasks.complete(&organize_id, msg);
}
Err(e) => state.tasks.fail(&organize_id, e.to_string()),
}
// Step 6: Enrich
state.tasks.start(&enrich_id);
state
.tasks
.update_progress(&enrich_id, 0, 0, "Refreshing artist data...");
match enrich_all_watched_artists(&state).await {
Ok(count) => state
.tasks
.complete(&enrich_id, format!("{count} artists refreshed")),
Err(e) => state.tasks.fail(&enrich_id, e.to_string()),
}
});
let task_ids = crate::pipeline::spawn_pipeline(&state);
Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_ids": task_ids })))
}
@@ -361,6 +221,53 @@ async fn remove_watchlist(
Ok(HttpResponse::NoContent().finish())
}
/// POST /monitor/check — admin-only manual trigger of a monitor run.
///
/// Registers a `monitor_check` task, detaches the check into a background
/// task, and immediately returns 202 with the task ID for polling.
async fn trigger_monitor_check(
    state: web::Data<AppState>,
    session: Session,
) -> Result<HttpResponse, ApiError> {
    auth::require_admin(&session)?;
    let task_id = state.tasks.register("monitor_check");
    let worker_state = state.clone();
    let worker_tid = task_id.clone();
    tokio::spawn(async move {
        worker_state
            .tasks
            .update_progress(&worker_tid, 0, 0, "Checking monitored artists...");
        let outcome = crate::monitor::check_monitored_artists(&worker_state).await;
        match outcome {
            Ok(stats) => {
                let summary = format!(
                    "{} artists checked, {} new releases, {} tracks added",
                    stats.artists_checked, stats.new_releases_found, stats.tracks_added
                );
                worker_state.tasks.complete(&worker_tid, summary);
            }
            Err(e) => worker_state.tasks.fail(&worker_tid, e.to_string()),
        }
    });
    Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id })))
}
/// GET /monitor/status — list every monitored artist with its MBID,
/// monitored flag, and last-checked timestamp.
async fn get_monitor_status(
    state: web::Data<AppState>,
    session: Session,
) -> Result<HttpResponse, ApiError> {
    auth::require_auth(&session)?;
    let monitored = queries::artists::list_monitored(state.db.conn()).await?;
    let mut items = Vec::with_capacity(monitored.len());
    for artist in &monitored {
        items.push(serde_json::json!({
            "id": artist.id,
            "name": artist.name,
            "musicbrainz_id": artist.musicbrainz_id,
            "monitored": artist.monitored,
            "last_checked_at": artist.last_checked_at,
        }));
    }
    Ok(HttpResponse::Ok().json(items))
}
async fn get_config(
state: web::Data<AppState>,
session: Session,

View File

@@ -14,6 +14,14 @@ pub struct FirefoxLoginSession {
pub vnc_url: String,
}
/// Tracks next-run times for scheduled background tasks.
///
/// `Default` yields both fields as `None` (nothing scheduled yet).
#[derive(Debug, Clone, Copy, Default)]
pub struct SchedulerInfo {
    /// When the next pipeline run is scheduled (None if disabled).
    pub next_pipeline: Option<chrono::NaiveDateTime>,
    /// When the next monitor check is scheduled (None if disabled).
    pub next_monitor: Option<chrono::NaiveDateTime>,
}
pub struct AppState {
pub db: Database,
pub mb_client: MusicBrainzFetcher,
@@ -23,4 +31,5 @@ pub struct AppState {
pub config_path: Option<String>,
pub tasks: TaskManager,
pub firefox_login: Mutex<Option<FirefoxLoginSession>>,
pub scheduler: Mutex<SchedulerInfo>,
}