Files
web/src/monitor.rs
2026-03-20 18:09:47 -04:00

265 lines
9.1 KiB
Rust

//! Artist monitoring — periodically checks for new releases from monitored artists
//! and automatically adds them to the watchlist.
use std::collections::HashSet;
use std::time::Duration;
use actix_web::web;
use serde::Serialize;
use shanty_data::MetadataFetcher;
use shanty_db::queries;
use shanty_search::SearchProvider;
use crate::error::ApiError;
use crate::state::AppState;
/// Stats returned from a monitor check run.
///
/// All counters are plain totals accumulated over a single call to
/// `check_monitored_artists`. `Default` yields an all-zero value.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)]
pub struct MonitorStats {
    /// Number of monitored artists that had a MusicBrainz ID and were processed
    /// (counted even if fetching their release groups subsequently failed).
    pub artists_checked: u32,
    /// Number of release groups that contained at least one track not yet
    /// present in the artist's wanted items.
    pub new_releases_found: u32,
    /// Total number of tracks reported as added while adding those albums.
    pub tracks_added: u32,
}
/// Check all monitored artists for new releases and add them to the watchlist.
pub async fn check_monitored_artists(
state: &web::Data<AppState>,
) -> Result<MonitorStats, ApiError> {
let monitored = queries::artists::list_monitored(state.db.conn()).await?;
let mut stats = MonitorStats {
artists_checked: 0,
new_releases_found: 0,
tracks_added: 0,
};
let allowed = state.config.read().await.allowed_secondary_types.clone();
for artist in &monitored {
let mbid = match &artist.musicbrainz_id {
Some(m) => m.clone(),
None => {
tracing::warn!(
artist_id = artist.id,
name = %artist.name,
"monitored artist has no MBID, skipping"
);
continue;
}
};
stats.artists_checked += 1;
// Fetch release groups from MusicBrainz
let all_release_groups = match state.search.get_release_groups(&mbid).await {
Ok(rgs) => rgs,
Err(e) => {
tracing::warn!(
artist = %artist.name,
error = %e,
"failed to fetch release groups for monitored artist"
);
continue;
}
};
// Filter by allowed secondary types (same logic as enrich_artist)
let release_groups: Vec<_> = all_release_groups
.into_iter()
.filter(|rg| {
if rg.secondary_types.is_empty() {
true
} else {
rg.secondary_types.iter().all(|st| allowed.contains(st))
}
})
.collect();
// Get all existing wanted items for this artist
let artist_wanted = queries::wanted::list(state.db.conn(), None, None).await?;
let wanted_mbids: HashSet<String> = artist_wanted
.iter()
.filter(|w| w.artist_id == Some(artist.id))
.filter_map(|w| w.musicbrainz_id.clone())
.collect();
// Check each release group's tracks to find new ones
for rg in &release_groups {
// Check if we already have any tracks from this release group cached
let cache_key = format!("artist_rg_tracks:{}", rg.id);
let cached_tracks: Option<Vec<String>> =
if let Ok(Some(json)) = queries::cache::get(state.db.conn(), &cache_key).await {
// Parse the cached data to get recording MBIDs
if let Ok(cached) = serde_json::from_str::<serde_json::Value>(&json) {
cached
.get("tracks")
.and_then(|t| t.as_array())
.map(|tracks| {
tracks
.iter()
.filter_map(|t| {
t.get("recording_mbid")
.and_then(|m| m.as_str())
.map(String::from)
})
.collect()
})
} else {
None
}
} else {
None
};
let track_mbids = if let Some(mbids) = cached_tracks {
mbids
} else {
// Not cached — resolve release and fetch tracks (rate limited by shared MB client)
let release_mbid = if let Some(ref rid) = rg.first_release_id {
rid.clone()
} else {
// Resolve from release group (goes through shared rate limiter)
match state.mb_client.resolve_release_from_group(&rg.id).await {
Ok(rid) => rid,
Err(e) => {
tracing::debug!(rg_id = %rg.id, error = %e, "skipping release group");
continue;
}
}
};
match state.mb_client.get_release_tracks(&release_mbid).await {
Ok(tracks) => tracks.into_iter().map(|t| t.recording_mbid).collect(),
Err(e) => {
tracing::debug!(
release = %release_mbid,
error = %e,
"failed to fetch tracks"
);
continue;
}
}
};
// Check if any of these tracks are NOT in the wanted items
let new_mbids: Vec<&String> = track_mbids
.iter()
.filter(|mbid| !wanted_mbids.contains(*mbid))
.collect();
if new_mbids.is_empty() {
continue;
}
// Found a release group with new tracks — add the whole album via shanty_watch
tracing::info!(
artist = %artist.name,
album = %rg.title,
new_tracks = new_mbids.len(),
"new release detected for monitored artist"
);
stats.new_releases_found += 1;
match shanty_watch::add_album(
state.db.conn(),
Some(&artist.name),
Some(&rg.title),
rg.first_release_id.as_deref(),
&state.mb_client,
None, // system-initiated, no user_id
)
.await
{
Ok(summary) => {
stats.tracks_added += summary.tracks_added as u32;
}
Err(e) => {
tracing::warn!(
artist = %artist.name,
album = %rg.title,
error = %e,
"failed to add album for monitored artist"
);
}
}
}
// Update last_checked_at
if let Err(e) = queries::artists::update_last_checked(state.db.conn(), artist.id).await {
tracing::warn!(
artist_id = artist.id,
error = %e,
"failed to update last_checked_at"
);
}
}
Ok(stats)
}
/// Spawn the monitor scheduler background loop.
///
/// Each iteration reads the scheduling config, publishes the next run time
/// to the shared scheduler state, sleeps for the configured interval (at least
/// one hour) and then runs [`check_monitored_artists`].
/// Reads config each iteration so changes take effect without restart.
///
/// A run is skipped when monitoring was disabled at the start of the interval,
/// when the scheduler's `skip_monitor` flag is set (the flag is cleared), or
/// when monitoring has been disabled while the loop was sleeping.
///
/// The spawned task is detached: its join handle is dropped.
pub fn spawn(state: web::Data<AppState>) {
    tokio::spawn(async move {
        loop {
            let (enabled, hours) = {
                let cfg = state.config.read().await;
                (
                    cfg.scheduling.monitor_enabled,
                    // Guard against a zero interval turning this into a busy loop.
                    cfg.scheduling.monitor_interval_hours.max(1),
                )
            };
            let sleep_secs = u64::from(hours) * 3600;

            // Update next-run time
            {
                let mut sched = state.scheduler.lock().await;
                sched.next_monitor = if enabled {
                    Some(
                        (chrono::Utc::now() + chrono::Duration::seconds(sleep_secs as i64))
                            .naive_utc(),
                    )
                } else {
                    None
                };
            }

            tokio::time::sleep(Duration::from_secs(sleep_secs)).await;

            if !enabled {
                continue;
            }

            // Check if this run was skipped
            {
                let mut sched = state.scheduler.lock().await;
                sched.next_monitor = None;
                if sched.skip_monitor {
                    sched.skip_monitor = false;
                    tracing::info!("scheduled monitor check skipped (user cancelled)");
                    continue;
                }
            }

            // `enabled` was read before a sleep that lasts hours; re-read it so
            // that disabling monitoring in the meantime actually prevents the run.
            if !state.config.read().await.scheduling.monitor_enabled {
                tracing::info!("scheduled monitor check skipped (monitoring disabled)");
                continue;
            }

            tracing::info!("scheduled monitor check starting");
            match check_monitored_artists(&state).await {
                Ok(stats) => {
                    tracing::info!(
                        artists_checked = stats.artists_checked,
                        new_releases = stats.new_releases_found,
                        tracks_added = stats.tracks_added,
                        "scheduled monitor check complete"
                    );
                }
                Err(e) => {
                    tracing::error!(error = %e, "scheduled monitor check failed");
                }
            }
        }
    });
}