162 lines
5.7 KiB
Rust
162 lines
5.7 KiB
Rust
//! Unified scheduler that manages all recurring background jobs.
|
|
//!
|
|
//! Replaces the separate pipeline_scheduler, monitor, and cookie_refresh spawn loops
|
|
//! with a single loop backed by persistent state in the `scheduler_state` DB table.
|
|
|
|
use std::time::Duration;
|
|
|
|
use actix_web::web;
|
|
use chrono::Utc;
|
|
|
|
use shanty_db::queries;
|
|
|
|
use crate::state::AppState;
|
|
|
|
/// Spawn the unified scheduler background loop.
|
|
pub fn spawn(state: web::Data<AppState>) {
|
|
tokio::spawn(async move {
|
|
// Initialize scheduler state rows in DB with next_run_at pre-populated
|
|
for job_name in ["pipeline", "monitor", "cookie_refresh"] {
|
|
match queries::scheduler_state::get_or_create(state.db.conn(), job_name).await {
|
|
Ok(job) => {
|
|
if job.next_run_at.is_none() {
|
|
let (enabled, interval_secs) = read_job_config(&state, job_name).await;
|
|
if enabled {
|
|
let next =
|
|
Utc::now().naive_utc() + chrono::Duration::seconds(interval_secs);
|
|
let _ = queries::scheduler_state::update_next_run(
|
|
state.db.conn(),
|
|
job_name,
|
|
Some(next),
|
|
)
|
|
.await;
|
|
}
|
|
}
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(job = job_name, error = %e, "failed to init scheduler state");
|
|
}
|
|
}
|
|
}
|
|
|
|
loop {
|
|
// Check each job
|
|
check_and_run_job(&state, "pipeline", run_pipeline_job).await;
|
|
check_and_run_job(&state, "monitor", run_monitor_job).await;
|
|
check_and_run_job(&state, "cookie_refresh", run_cookie_refresh_job).await;
|
|
|
|
// Poll every 30 seconds
|
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
|
}
|
|
});
|
|
}
|
|
|
|
async fn check_and_run_job<F, Fut>(state: &web::Data<AppState>, job_name: &str, run_fn: F)
|
|
where
|
|
F: FnOnce(web::Data<AppState>) -> Fut,
|
|
Fut: std::future::Future<Output = Result<String, String>>,
|
|
{
|
|
let job = match queries::scheduler_state::get_or_create(state.db.conn(), job_name).await {
|
|
Ok(j) => j,
|
|
Err(e) => {
|
|
tracing::error!(job = job_name, error = %e, "failed to read scheduler state");
|
|
return;
|
|
}
|
|
};
|
|
|
|
// Read config to check if enabled and get interval
|
|
let (config_enabled, interval_secs) = read_job_config(state, job_name).await;
|
|
|
|
// If config says disabled, ensure DB state reflects it
|
|
if !config_enabled {
|
|
if job.enabled {
|
|
let _ = queries::scheduler_state::set_enabled(state.db.conn(), job_name, false).await;
|
|
let _ =
|
|
queries::scheduler_state::update_next_run(state.db.conn(), job_name, None).await;
|
|
}
|
|
return;
|
|
}
|
|
|
|
// If DB says disabled (e.g. user skipped), re-enable from config
|
|
if !job.enabled {
|
|
let _ = queries::scheduler_state::set_enabled(state.db.conn(), job_name, true).await;
|
|
}
|
|
|
|
let now = Utc::now().naive_utc();
|
|
|
|
// If no next_run_at is set, schedule one
|
|
if job.next_run_at.is_none() {
|
|
let next = now + chrono::Duration::seconds(interval_secs);
|
|
let _ =
|
|
queries::scheduler_state::update_next_run(state.db.conn(), job_name, Some(next)).await;
|
|
return;
|
|
}
|
|
|
|
// Check if it's time to run
|
|
let next_run = job.next_run_at.unwrap();
|
|
if now < next_run {
|
|
return;
|
|
}
|
|
|
|
// Time to run!
|
|
tracing::info!(job = job_name, "scheduled job starting");
|
|
|
|
let result = run_fn(state.clone()).await;
|
|
|
|
let result_str = match &result {
|
|
Ok(msg) => {
|
|
tracing::info!(job = job_name, result = %msg, "scheduled job complete");
|
|
format!("ok: {msg}")
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(job = job_name, error = %e, "scheduled job failed");
|
|
format!("error: {e}")
|
|
}
|
|
};
|
|
|
|
// Update last run and schedule next
|
|
let _ = queries::scheduler_state::update_last_run(state.db.conn(), job_name, &result_str).await;
|
|
let next = Utc::now().naive_utc() + chrono::Duration::seconds(interval_secs);
|
|
let _ = queries::scheduler_state::update_next_run(state.db.conn(), job_name, Some(next)).await;
|
|
}
|
|
|
|
async fn read_job_config(state: &web::Data<AppState>, job_name: &str) -> (bool, i64) {
|
|
let cfg = state.config.read().await;
|
|
match job_name {
|
|
"pipeline" => (
|
|
cfg.scheduling.pipeline_enabled,
|
|
i64::from(cfg.scheduling.pipeline_interval_hours.max(1)) * 3600,
|
|
),
|
|
"monitor" => (
|
|
cfg.scheduling.monitor_enabled,
|
|
i64::from(cfg.scheduling.monitor_interval_hours.max(1)) * 3600,
|
|
),
|
|
"cookie_refresh" => (
|
|
cfg.download.cookie_refresh_enabled,
|
|
i64::from(cfg.download.cookie_refresh_hours.max(1)) * 3600,
|
|
),
|
|
_ => (false, 3600),
|
|
}
|
|
}
|
|
|
|
async fn run_pipeline_job(state: web::Data<AppState>) -> Result<String, String> {
|
|
let pipeline_id = crate::pipeline::trigger_pipeline(&state)
|
|
.await
|
|
.map_err(|e| e.to_string())?;
|
|
Ok(format!("pipeline triggered: {pipeline_id}"))
|
|
}
|
|
|
|
async fn run_monitor_job(state: web::Data<AppState>) -> Result<String, String> {
|
|
let stats = crate::monitor::check_monitored_artists(&state)
|
|
.await
|
|
.map_err(|e| e.to_string())?;
|
|
Ok(format!(
|
|
"{} artists checked, {} new releases, {} tracks added",
|
|
stats.artists_checked, stats.new_releases_found, stats.tracks_added
|
|
))
|
|
}
|
|
|
|
async fn run_cookie_refresh_job(_state: web::Data<AppState>) -> Result<String, String> {
|
|
crate::cookie_refresh::run_refresh().await
|
|
}
|