redux of the worker queue
This commit is contained in:
@@ -365,6 +365,35 @@ pub fn dashboard() -> Html {
|
||||
</div>
|
||||
</div>
|
||||
|
||||
// Work Queue Progress
|
||||
if let Some(ref wq) = s.work_queue {
|
||||
if wq.download.pending + wq.download.running + wq.tag.pending + wq.tag.running + wq.organize.pending + wq.organize.running > 0
|
||||
|| wq.download.completed + wq.tag.completed + wq.organize.completed > 0
|
||||
{
|
||||
<div class="card">
|
||||
<h3>{ "Pipeline Progress" }</h3>
|
||||
<table>
|
||||
<thead>
|
||||
<tr><th>{ "Step" }</th><th>{ "Pending" }</th><th>{ "Running" }</th><th>{ "Done" }</th><th>{ "Failed" }</th></tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{ for [("Download", &wq.download), ("Index", &wq.index), ("Tag", &wq.tag), ("Organize", &wq.organize)].iter().map(|(name, c)| {
|
||||
html! {
|
||||
<tr>
|
||||
<td>{ name }</td>
|
||||
<td>{ c.pending }</td>
|
||||
<td>{ if c.running > 0 { html! { <span class="badge badge-accent">{ c.running }</span> } } else { html! { { "0" } } } }</td>
|
||||
<td>{ c.completed }</td>
|
||||
<td>{ if c.failed > 0 { html! { <span class="badge badge-danger">{ c.failed }</span> } } else { html! { { "0" } } } }</td>
|
||||
</tr>
|
||||
}
|
||||
})}
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
}
|
||||
}
|
||||
|
||||
// Background Tasks (always show if there are tasks or scheduled items)
|
||||
if !s.tasks.is_empty() || has_scheduled {
|
||||
<div class="card">
|
||||
|
||||
@@ -225,7 +225,10 @@ pub struct TaskRef {
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Deserialize)]
|
||||
pub struct PipelineRef {
|
||||
#[serde(default)]
|
||||
pub task_ids: Vec<String>,
|
||||
#[serde(default)]
|
||||
pub pipeline_id: Option<String>,
|
||||
}
|
||||
|
||||
// --- Status ---
|
||||
@@ -246,6 +249,10 @@ pub struct Status {
|
||||
pub tasks: Vec<TaskInfo>,
|
||||
#[serde(default)]
|
||||
pub scheduled: Option<ScheduledTasks>,
|
||||
#[serde(default)]
|
||||
pub work_queue: Option<WorkQueueStats>,
|
||||
#[serde(default)]
|
||||
pub scheduler: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Deserialize)]
|
||||
@@ -254,6 +261,22 @@ pub struct ScheduledTasks {
|
||||
pub next_monitor: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Deserialize)]
|
||||
pub struct WorkQueueCounts {
|
||||
pub pending: u64,
|
||||
pub running: u64,
|
||||
pub completed: u64,
|
||||
pub failed: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Deserialize)]
|
||||
pub struct WorkQueueStats {
|
||||
pub download: WorkQueueCounts,
|
||||
pub index: WorkQueueCounts,
|
||||
pub tag: WorkQueueCounts,
|
||||
pub organize: WorkQueueCounts,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Deserialize)]
|
||||
pub struct LibrarySummary {
|
||||
pub total_items: u64,
|
||||
|
||||
@@ -1,59 +1,22 @@
|
||||
//! Background task that periodically refreshes YouTube cookies via headless Firefox.
|
||||
//! YouTube cookie refresh via headless Firefox.
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::path::PathBuf;
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::process::Command;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::config::AppConfig;
|
||||
/// Run a headless cookie refresh. Returns success message or error.
|
||||
pub async fn run_refresh() -> Result<String, String> {
|
||||
let profile_dir = shanty_config::data_dir().join("firefox-profile");
|
||||
let cookies_path = shanty_config::data_dir().join("cookies.txt");
|
||||
|
||||
/// Spawn the cookie refresh background loop.
|
||||
///
|
||||
/// This task runs forever, sleeping for `cookie_refresh_hours` between refreshes.
|
||||
/// It reads the current config on each iteration so changes take effect without restart.
|
||||
pub fn spawn(config: Arc<RwLock<AppConfig>>) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let (enabled, hours) = {
|
||||
let cfg = config.read().await;
|
||||
(
|
||||
cfg.download.cookie_refresh_enabled,
|
||||
cfg.download.cookie_refresh_hours.max(1),
|
||||
)
|
||||
};
|
||||
if !profile_dir.exists() {
|
||||
return Err(format!(
|
||||
"no Firefox profile at {}",
|
||||
profile_dir.display()
|
||||
));
|
||||
}
|
||||
|
||||
// Sleep for the configured interval
|
||||
tokio::time::sleep(Duration::from_secs(u64::from(hours) * 3600)).await;
|
||||
|
||||
if !enabled {
|
||||
continue;
|
||||
}
|
||||
|
||||
let profile_dir = shanty_config::data_dir().join("firefox-profile");
|
||||
let cookies_path = shanty_config::data_dir().join("cookies.txt");
|
||||
|
||||
if !profile_dir.exists() {
|
||||
tracing::warn!(
|
||||
"cookie refresh skipped: no Firefox profile at {}",
|
||||
profile_dir.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
tracing::info!("starting cookie refresh");
|
||||
|
||||
match run_refresh(&profile_dir, &cookies_path).await {
|
||||
Ok(msg) => tracing::info!("cookie refresh complete: {msg}"),
|
||||
Err(e) => tracing::error!("cookie refresh failed: {e}"),
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn run_refresh(profile_dir: &Path, cookies_path: &Path) -> Result<String, String> {
|
||||
let script = find_script()?;
|
||||
|
||||
let output = Command::new("python3")
|
||||
|
||||
@@ -11,7 +11,8 @@ pub mod error;
|
||||
pub mod mb_update;
|
||||
pub mod monitor;
|
||||
pub mod pipeline;
|
||||
pub mod pipeline_scheduler;
|
||||
pub mod routes;
|
||||
pub mod scheduler;
|
||||
pub mod state;
|
||||
pub mod tasks;
|
||||
pub mod workers;
|
||||
|
||||
17
src/main.rs
17
src/main.rs
@@ -14,6 +14,7 @@ use shanty_web::config::AppConfig;
|
||||
use shanty_web::routes;
|
||||
use shanty_web::state::AppState;
|
||||
use shanty_web::tasks::TaskManager;
|
||||
use shanty_web::workers::WorkerManager;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(name = "shanty-web", about = "Shanty web interface backend")]
|
||||
@@ -94,21 +95,13 @@ async fn main() -> anyhow::Result<()> {
|
||||
config: std::sync::Arc::new(tokio::sync::RwLock::new(config)),
|
||||
config_path,
|
||||
tasks: TaskManager::new(),
|
||||
workers: WorkerManager::new(),
|
||||
firefox_login: tokio::sync::Mutex::new(None),
|
||||
scheduler: tokio::sync::Mutex::new(shanty_web::state::SchedulerInfo {
|
||||
next_pipeline: None,
|
||||
next_monitor: None,
|
||||
skip_pipeline: false,
|
||||
skip_monitor: false,
|
||||
}),
|
||||
});
|
||||
|
||||
// Start background cookie refresh task
|
||||
shanty_web::cookie_refresh::spawn(state.config.clone());
|
||||
|
||||
// Start pipeline and monitor schedulers
|
||||
shanty_web::pipeline_scheduler::spawn(state.clone());
|
||||
shanty_web::monitor::spawn(state.clone());
|
||||
// Start work queue workers and unified scheduler
|
||||
WorkerManager::spawn_all(state.clone());
|
||||
shanty_web::scheduler::spawn(state.clone());
|
||||
shanty_web::mb_update::spawn(state.clone());
|
||||
|
||||
// Resolve static files directory relative to the binary location
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
//! and automatically adds them to the watchlist.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_web::web;
|
||||
use serde::Serialize;
|
||||
@@ -197,68 +196,3 @@ pub async fn check_monitored_artists(
|
||||
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
/// Spawn the monitor scheduler background loop.
|
||||
///
|
||||
/// Sleeps for the configured interval, then checks monitored artists if enabled.
|
||||
/// Reads config each iteration so changes take effect without restart.
|
||||
pub fn spawn(state: web::Data<AppState>) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let (enabled, hours) = {
|
||||
let cfg = state.config.read().await;
|
||||
(
|
||||
cfg.scheduling.monitor_enabled,
|
||||
cfg.scheduling.monitor_interval_hours.max(1),
|
||||
)
|
||||
};
|
||||
|
||||
let sleep_secs = u64::from(hours) * 3600;
|
||||
|
||||
// Update next-run time
|
||||
{
|
||||
let mut sched = state.scheduler.lock().await;
|
||||
sched.next_monitor = if enabled {
|
||||
Some(
|
||||
(chrono::Utc::now() + chrono::Duration::seconds(sleep_secs as i64))
|
||||
.naive_utc(),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
|
||||
|
||||
if !enabled {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if this run was skipped
|
||||
{
|
||||
let mut sched = state.scheduler.lock().await;
|
||||
sched.next_monitor = None;
|
||||
if sched.skip_monitor {
|
||||
sched.skip_monitor = false;
|
||||
tracing::info!("scheduled monitor check skipped (user cancelled)");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("scheduled monitor check starting");
|
||||
match check_monitored_artists(&state).await {
|
||||
Ok(stats) => {
|
||||
tracing::info!(
|
||||
artists_checked = stats.artists_checked,
|
||||
new_releases = stats.new_releases_found,
|
||||
tracks_added = stats.tracks_added,
|
||||
"scheduled monitor check complete"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(error = %e, "scheduled monitor check failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
261
src/pipeline.rs
261
src/pipeline.rs
@@ -1,218 +1,67 @@
|
||||
//! Shared pipeline logic used by both the API endpoint and the scheduler.
|
||||
//! Pipeline trigger logic. Creates work queue items that flow through the
|
||||
//! Download → Tag → Organize pipeline concurrently via typed workers.
|
||||
|
||||
use actix_web::web;
|
||||
|
||||
use shanty_db::entities::download_queue::DownloadStatus;
|
||||
use shanty_db::entities::work_queue::WorkTaskType;
|
||||
use shanty_db::queries;
|
||||
|
||||
use crate::routes::artists::enrich_all_watched_artists;
|
||||
use crate::error::ApiError;
|
||||
use crate::state::AppState;
|
||||
|
||||
/// Register and spawn the full 6-step pipeline. Returns the task IDs immediately.
|
||||
///
|
||||
/// Steps: sync, download, index, tag, organize, enrich.
|
||||
pub fn spawn_pipeline(state: &web::Data<AppState>) -> Vec<String> {
|
||||
let sync_id = state.tasks.register_pending("sync");
|
||||
let download_id = state.tasks.register_pending("download");
|
||||
let index_id = state.tasks.register_pending("index");
|
||||
let tag_id = state.tasks.register_pending("tag");
|
||||
let organize_id = state.tasks.register_pending("organize");
|
||||
let enrich_id = state.tasks.register_pending("enrich");
|
||||
/// Trigger the full pipeline: sync wanted items to download queue, then create
|
||||
/// work queue items for each pending download. Returns a pipeline ID for tracking.
|
||||
pub async fn trigger_pipeline(state: &web::Data<AppState>) -> Result<String, ApiError> {
|
||||
let pipeline_id = uuid::Uuid::new_v4().to_string();
|
||||
let conn = state.db.conn();
|
||||
|
||||
let task_ids = vec![
|
||||
sync_id.clone(),
|
||||
download_id.clone(),
|
||||
index_id.clone(),
|
||||
tag_id.clone(),
|
||||
organize_id.clone(),
|
||||
enrich_id.clone(),
|
||||
];
|
||||
// Step 1: Sync wanted items to download queue (fast, just DB inserts)
|
||||
let sync_stats = shanty_dl::sync_wanted_to_queue(conn, false).await?;
|
||||
tracing::info!(
|
||||
enqueued = sync_stats.enqueued,
|
||||
skipped = sync_stats.skipped,
|
||||
pipeline_id = %pipeline_id,
|
||||
"pipeline sync complete"
|
||||
);
|
||||
|
||||
let state = state.clone();
|
||||
tokio::spawn(async move {
|
||||
run_pipeline_inner(
|
||||
&state,
|
||||
&sync_id,
|
||||
&download_id,
|
||||
&index_id,
|
||||
&tag_id,
|
||||
&organize_id,
|
||||
&enrich_id,
|
||||
// Step 2: Create Download work items for each pending download_queue entry
|
||||
let pending = queries::downloads::list(conn, Some(DownloadStatus::Pending)).await?;
|
||||
for dl_item in &pending {
|
||||
let payload = serde_json::json!({"download_queue_id": dl_item.id});
|
||||
queries::work_queue::enqueue(
|
||||
conn,
|
||||
WorkTaskType::Download,
|
||||
&payload.to_string(),
|
||||
Some(&pipeline_id),
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
}
|
||||
|
||||
if !pending.is_empty() {
|
||||
state.workers.notify(WorkTaskType::Download);
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
download_items = pending.len(),
|
||||
pipeline_id = %pipeline_id,
|
||||
"pipeline work items created"
|
||||
);
|
||||
|
||||
Ok(pipeline_id)
|
||||
}
|
||||
|
||||
/// Trigger the pipeline and return immediately (for API endpoints).
|
||||
pub fn spawn_pipeline(state: &web::Data<AppState>) -> String {
|
||||
let state = state.clone();
|
||||
let pipeline_id = uuid::Uuid::new_v4().to_string();
|
||||
|
||||
tokio::spawn(async move {
|
||||
match trigger_pipeline(&state).await {
|
||||
Ok(id) => tracing::info!(pipeline_id = %id, "pipeline triggered"),
|
||||
Err(e) => tracing::error!(error = %e, "pipeline trigger failed"),
|
||||
}
|
||||
});
|
||||
|
||||
task_ids
|
||||
}
|
||||
|
||||
/// Run the pipeline without registering tasks (for the scheduler, which logs instead).
|
||||
pub async fn run_pipeline(state: &web::Data<AppState>) -> Vec<String> {
|
||||
let sync_id = state.tasks.register_pending("sync");
|
||||
let download_id = state.tasks.register_pending("download");
|
||||
let index_id = state.tasks.register_pending("index");
|
||||
let tag_id = state.tasks.register_pending("tag");
|
||||
let organize_id = state.tasks.register_pending("organize");
|
||||
let enrich_id = state.tasks.register_pending("enrich");
|
||||
|
||||
let task_ids = vec![
|
||||
sync_id.clone(),
|
||||
download_id.clone(),
|
||||
index_id.clone(),
|
||||
tag_id.clone(),
|
||||
organize_id.clone(),
|
||||
enrich_id.clone(),
|
||||
];
|
||||
|
||||
run_pipeline_inner(
|
||||
state,
|
||||
&sync_id,
|
||||
&download_id,
|
||||
&index_id,
|
||||
&tag_id,
|
||||
&organize_id,
|
||||
&enrich_id,
|
||||
)
|
||||
.await;
|
||||
|
||||
task_ids
|
||||
}
|
||||
|
||||
async fn run_pipeline_inner(
|
||||
state: &web::Data<AppState>,
|
||||
sync_id: &str,
|
||||
download_id: &str,
|
||||
index_id: &str,
|
||||
tag_id: &str,
|
||||
organize_id: &str,
|
||||
enrich_id: &str,
|
||||
) {
|
||||
let cfg = state.config.read().await.clone();
|
||||
|
||||
// Step 1: Sync
|
||||
state.tasks.start(sync_id);
|
||||
state
|
||||
.tasks
|
||||
.update_progress(sync_id, 0, 0, "Syncing watchlist to download queue...");
|
||||
match shanty_dl::sync_wanted_to_queue(state.db.conn(), false).await {
|
||||
Ok(stats) => state.tasks.complete(sync_id, format!("{stats}")),
|
||||
Err(e) => state.tasks.fail(sync_id, e.to_string()),
|
||||
}
|
||||
|
||||
// Step 2: Download
|
||||
state.tasks.start(download_id);
|
||||
let cookies = cfg.download.cookies_path.clone();
|
||||
let format: shanty_dl::AudioFormat = cfg
|
||||
.download
|
||||
.format
|
||||
.parse()
|
||||
.unwrap_or(shanty_dl::AudioFormat::Opus);
|
||||
let source: shanty_dl::SearchSource = cfg
|
||||
.download
|
||||
.search_source
|
||||
.parse()
|
||||
.unwrap_or(shanty_dl::SearchSource::YouTubeMusic);
|
||||
let rate = if cookies.is_some() {
|
||||
cfg.download.rate_limit_auth
|
||||
} else {
|
||||
cfg.download.rate_limit
|
||||
};
|
||||
let backend = shanty_dl::YtDlpBackend::new(rate, source, cookies.clone());
|
||||
let backend_config = shanty_dl::BackendConfig {
|
||||
output_dir: cfg.download_path.clone(),
|
||||
format,
|
||||
cookies_path: cookies,
|
||||
};
|
||||
let task_state = state.clone();
|
||||
let progress_tid = download_id.to_string();
|
||||
let on_progress: shanty_dl::ProgressFn = Box::new(move |current, total, msg| {
|
||||
task_state
|
||||
.tasks
|
||||
.update_progress(&progress_tid, current, total, msg);
|
||||
});
|
||||
match shanty_dl::run_queue_with_progress(
|
||||
state.db.conn(),
|
||||
&backend,
|
||||
&backend_config,
|
||||
false,
|
||||
Some(on_progress),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(stats) => {
|
||||
let _ = queries::cache::purge_prefix(state.db.conn(), "artist_totals:").await;
|
||||
state.tasks.complete(download_id, format!("{stats}"));
|
||||
}
|
||||
Err(e) => state.tasks.fail(download_id, e.to_string()),
|
||||
}
|
||||
|
||||
// Step 3: Index
|
||||
state.tasks.start(index_id);
|
||||
state
|
||||
.tasks
|
||||
.update_progress(index_id, 0, 0, "Scanning library...");
|
||||
let scan_config = shanty_index::ScanConfig {
|
||||
root: cfg.library_path.clone(),
|
||||
dry_run: false,
|
||||
concurrency: cfg.indexing.concurrency,
|
||||
};
|
||||
match shanty_index::run_scan(state.db.conn(), &scan_config).await {
|
||||
Ok(stats) => state.tasks.complete(index_id, format!("{stats}")),
|
||||
Err(e) => state.tasks.fail(index_id, e.to_string()),
|
||||
}
|
||||
|
||||
// Step 4: Tag
|
||||
state.tasks.start(tag_id);
|
||||
state
|
||||
.tasks
|
||||
.update_progress(tag_id, 0, 0, "Tagging tracks...");
|
||||
match shanty_tag::MusicBrainzClient::new() {
|
||||
Ok(mb) => {
|
||||
let tag_config = shanty_tag::TagConfig {
|
||||
dry_run: false,
|
||||
write_tags: cfg.tagging.write_tags,
|
||||
confidence: cfg.tagging.confidence,
|
||||
};
|
||||
match shanty_tag::run_tagging(state.db.conn(), &mb, &tag_config, None).await {
|
||||
Ok(stats) => state.tasks.complete(tag_id, format!("{stats}")),
|
||||
Err(e) => state.tasks.fail(tag_id, e.to_string()),
|
||||
}
|
||||
}
|
||||
Err(e) => state.tasks.fail(tag_id, e.to_string()),
|
||||
}
|
||||
|
||||
// Step 5: Organize
|
||||
state.tasks.start(organize_id);
|
||||
state
|
||||
.tasks
|
||||
.update_progress(organize_id, 0, 0, "Organizing files...");
|
||||
let org_config = shanty_org::OrgConfig {
|
||||
target_dir: cfg.library_path.clone(),
|
||||
format: cfg.organization_format.clone(),
|
||||
dry_run: false,
|
||||
copy: false,
|
||||
};
|
||||
match shanty_org::organize_from_db(state.db.conn(), &org_config).await {
|
||||
Ok(stats) => {
|
||||
let promoted = queries::wanted::promote_downloaded_to_owned(state.db.conn())
|
||||
.await
|
||||
.unwrap_or(0);
|
||||
let msg = if promoted > 0 {
|
||||
format!("{stats} — {promoted} items marked as owned")
|
||||
} else {
|
||||
format!("{stats}")
|
||||
};
|
||||
state.tasks.complete(organize_id, msg);
|
||||
}
|
||||
Err(e) => state.tasks.fail(organize_id, e.to_string()),
|
||||
}
|
||||
|
||||
// Step 6: Enrich
|
||||
state.tasks.start(enrich_id);
|
||||
state
|
||||
.tasks
|
||||
.update_progress(enrich_id, 0, 0, "Refreshing artist data...");
|
||||
match enrich_all_watched_artists(state).await {
|
||||
Ok(count) => state
|
||||
.tasks
|
||||
.complete(enrich_id, format!("{count} artists refreshed")),
|
||||
Err(e) => state.tasks.fail(enrich_id, e.to_string()),
|
||||
}
|
||||
pipeline_id
|
||||
}
|
||||
|
||||
@@ -1,59 +0,0 @@
|
||||
//! Background task that runs the full pipeline on a configurable interval.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_web::web;
|
||||
use chrono::Utc;
|
||||
|
||||
use crate::state::AppState;
|
||||
|
||||
/// Spawn the pipeline scheduler background loop.
|
||||
///
|
||||
/// Sleeps for the configured interval, then runs the full pipeline if enabled.
|
||||
/// Reads config each iteration so changes take effect without restart.
|
||||
pub fn spawn(state: web::Data<AppState>) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let (enabled, hours) = {
|
||||
let cfg = state.config.read().await;
|
||||
(
|
||||
cfg.scheduling.pipeline_enabled,
|
||||
cfg.scheduling.pipeline_interval_hours.max(1),
|
||||
)
|
||||
};
|
||||
|
||||
let sleep_secs = u64::from(hours) * 3600;
|
||||
|
||||
// Update next-run time
|
||||
{
|
||||
let mut sched = state.scheduler.lock().await;
|
||||
sched.next_pipeline = if enabled {
|
||||
Some((Utc::now() + chrono::Duration::seconds(sleep_secs as i64)).naive_utc())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
|
||||
|
||||
if !enabled {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if this run was skipped
|
||||
{
|
||||
let mut sched = state.scheduler.lock().await;
|
||||
sched.next_pipeline = None;
|
||||
if sched.skip_pipeline {
|
||||
sched.skip_pipeline = false;
|
||||
tracing::info!("scheduled pipeline skipped (user cancelled)");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("scheduled pipeline starting");
|
||||
let task_ids = crate::pipeline::run_pipeline(&state).await;
|
||||
tracing::info!(?task_ids, "scheduled pipeline complete");
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -3,12 +3,12 @@ use actix_web::{HttpResponse, web};
|
||||
use serde::Deserialize;
|
||||
|
||||
use shanty_db::entities::download_queue::DownloadStatus;
|
||||
use shanty_db::entities::work_queue::WorkTaskType;
|
||||
use shanty_db::queries;
|
||||
|
||||
use crate::auth;
|
||||
use crate::config::AppConfig;
|
||||
use crate::error::ApiError;
|
||||
use crate::routes::artists::enrich_all_watched_artists;
|
||||
use crate::state::AppState;
|
||||
|
||||
pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||
@@ -38,13 +38,13 @@ async fn get_status(
|
||||
session: Session,
|
||||
) -> Result<HttpResponse, ApiError> {
|
||||
auth::require_auth(&session)?;
|
||||
let summary = shanty_watch::library_summary(state.db.conn()).await?;
|
||||
let pending_items =
|
||||
queries::downloads::list(state.db.conn(), Some(DownloadStatus::Pending)).await?;
|
||||
let conn = state.db.conn();
|
||||
|
||||
let summary = shanty_watch::library_summary(conn).await?;
|
||||
let pending_items = queries::downloads::list(conn, Some(DownloadStatus::Pending)).await?;
|
||||
let downloading_items =
|
||||
queries::downloads::list(state.db.conn(), Some(DownloadStatus::Downloading)).await?;
|
||||
let failed_items =
|
||||
queries::downloads::list(state.db.conn(), Some(DownloadStatus::Failed)).await?;
|
||||
queries::downloads::list(conn, Some(DownloadStatus::Downloading)).await?;
|
||||
let failed_items = queries::downloads::list(conn, Some(DownloadStatus::Failed)).await?;
|
||||
let tasks = state.tasks.list();
|
||||
|
||||
let mut queue_items = Vec::new();
|
||||
@@ -52,15 +52,38 @@ async fn get_status(
|
||||
queue_items.extend(pending_items.iter().cloned());
|
||||
queue_items.extend(failed_items.iter().take(5).cloned());
|
||||
|
||||
let needs_tagging = queries::tracks::get_needing_metadata(state.db.conn()).await?;
|
||||
let needs_tagging = queries::tracks::get_needing_metadata(conn).await?;
|
||||
|
||||
// Scheduled task info
|
||||
let sched = state.scheduler.lock().await;
|
||||
let scheduled_tasks = serde_json::json!({
|
||||
"next_pipeline": sched.next_pipeline,
|
||||
"next_monitor": sched.next_monitor,
|
||||
});
|
||||
drop(sched);
|
||||
// Work queue counts
|
||||
let work_queue = queries::work_queue::counts_all(conn).await.ok();
|
||||
|
||||
// Scheduler state from DB
|
||||
let scheduler_jobs = queries::scheduler_state::list_all(conn).await.unwrap_or_default();
|
||||
let scheduler_json: serde_json::Value = scheduler_jobs
|
||||
.iter()
|
||||
.map(|j| {
|
||||
(
|
||||
j.job_name.clone(),
|
||||
serde_json::json!({
|
||||
"last_run": j.last_run_at,
|
||||
"next_run": j.next_run_at,
|
||||
"last_result": j.last_result,
|
||||
"enabled": j.enabled,
|
||||
}),
|
||||
)
|
||||
})
|
||||
.collect::<serde_json::Map<String, serde_json::Value>>()
|
||||
.into();
|
||||
|
||||
// Backward-compatible scheduled field (from scheduler_state DB)
|
||||
let next_pipeline = scheduler_jobs
|
||||
.iter()
|
||||
.find(|j| j.job_name == "pipeline")
|
||||
.and_then(|j| j.next_run_at);
|
||||
let next_monitor = scheduler_jobs
|
||||
.iter()
|
||||
.find(|j| j.job_name == "monitor")
|
||||
.and_then(|j| j.next_run_at);
|
||||
|
||||
Ok(HttpResponse::Ok().json(serde_json::json!({
|
||||
"library": summary,
|
||||
@@ -75,7 +98,12 @@ async fn get_status(
|
||||
"items": needs_tagging.iter().take(20).collect::<Vec<_>>(),
|
||||
},
|
||||
"tasks": tasks,
|
||||
"scheduled": scheduled_tasks,
|
||||
"scheduled": {
|
||||
"next_pipeline": next_pipeline,
|
||||
"next_monitor": next_monitor,
|
||||
},
|
||||
"work_queue": work_queue,
|
||||
"scheduler": scheduler_json,
|
||||
})))
|
||||
}
|
||||
|
||||
@@ -84,27 +112,16 @@ async fn trigger_index(
|
||||
session: Session,
|
||||
) -> Result<HttpResponse, ApiError> {
|
||||
auth::require_auth(&session)?;
|
||||
let task_id = state.tasks.register("index");
|
||||
let state = state.clone();
|
||||
let tid = task_id.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let cfg = state.config.read().await.clone();
|
||||
state
|
||||
.tasks
|
||||
.update_progress(&tid, 0, 0, "Scanning library...");
|
||||
let scan_config = shanty_index::ScanConfig {
|
||||
root: cfg.library_path.clone(),
|
||||
dry_run: false,
|
||||
concurrency: cfg.indexing.concurrency,
|
||||
};
|
||||
match shanty_index::run_scan(state.db.conn(), &scan_config).await {
|
||||
Ok(stats) => state.tasks.complete(&tid, format!("{stats}")),
|
||||
Err(e) => state.tasks.fail(&tid, e.to_string()),
|
||||
}
|
||||
});
|
||||
|
||||
Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id })))
|
||||
let payload = serde_json::json!({"scan_all": true});
|
||||
let item = queries::work_queue::enqueue(
|
||||
state.db.conn(),
|
||||
WorkTaskType::Index,
|
||||
&payload.to_string(),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
state.workers.notify(WorkTaskType::Index);
|
||||
Ok(HttpResponse::Accepted().json(serde_json::json!({ "work_item_id": item.id })))
|
||||
}
|
||||
|
||||
async fn trigger_tag(
|
||||
@@ -112,35 +129,18 @@ async fn trigger_tag(
|
||||
session: Session,
|
||||
) -> Result<HttpResponse, ApiError> {
|
||||
auth::require_auth(&session)?;
|
||||
let task_id = state.tasks.register("tag");
|
||||
let state = state.clone();
|
||||
let tid = task_id.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let cfg = state.config.read().await.clone();
|
||||
state
|
||||
.tasks
|
||||
.update_progress(&tid, 0, 0, "Preparing tagger...");
|
||||
let mb = match shanty_tag::MusicBrainzClient::new() {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
state.tasks.fail(&tid, e.to_string());
|
||||
return;
|
||||
}
|
||||
};
|
||||
let tag_config = shanty_tag::TagConfig {
|
||||
dry_run: false,
|
||||
write_tags: cfg.tagging.write_tags,
|
||||
confidence: cfg.tagging.confidence,
|
||||
};
|
||||
state.tasks.update_progress(&tid, 0, 0, "Tagging tracks...");
|
||||
match shanty_tag::run_tagging(state.db.conn(), &mb, &tag_config, None).await {
|
||||
Ok(stats) => state.tasks.complete(&tid, format!("{stats}")),
|
||||
Err(e) => state.tasks.fail(&tid, e.to_string()),
|
||||
}
|
||||
});
|
||||
|
||||
Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id })))
|
||||
let conn = state.db.conn();
|
||||
let untagged = queries::tracks::get_needing_metadata(conn).await?;
|
||||
let mut count = 0;
|
||||
for track in &untagged {
|
||||
let payload = serde_json::json!({"track_id": track.id});
|
||||
queries::work_queue::enqueue(conn, WorkTaskType::Tag, &payload.to_string(), None).await?;
|
||||
count += 1;
|
||||
}
|
||||
if count > 0 {
|
||||
state.workers.notify(WorkTaskType::Tag);
|
||||
}
|
||||
Ok(HttpResponse::Accepted().json(serde_json::json!({ "enqueued": count })))
|
||||
}
|
||||
|
||||
async fn trigger_organize(
|
||||
@@ -148,39 +148,31 @@ async fn trigger_organize(
|
||||
session: Session,
|
||||
) -> Result<HttpResponse, ApiError> {
|
||||
auth::require_auth(&session)?;
|
||||
let task_id = state.tasks.register("organize");
|
||||
let state = state.clone();
|
||||
let tid = task_id.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let cfg = state.config.read().await.clone();
|
||||
state
|
||||
.tasks
|
||||
.update_progress(&tid, 0, 0, "Organizing files...");
|
||||
let org_config = shanty_org::OrgConfig {
|
||||
target_dir: cfg.library_path.clone(),
|
||||
format: cfg.organization_format.clone(),
|
||||
dry_run: false,
|
||||
copy: false,
|
||||
};
|
||||
match shanty_org::organize_from_db(state.db.conn(), &org_config).await {
|
||||
Ok(stats) => {
|
||||
let promoted = queries::wanted::promote_downloaded_to_owned(state.db.conn())
|
||||
.await
|
||||
.unwrap_or(0);
|
||||
let msg = if promoted > 0 {
|
||||
format!("{stats} — {promoted} items marked as owned")
|
||||
} else {
|
||||
format!("{stats}")
|
||||
};
|
||||
state.tasks.complete(&tid, msg);
|
||||
let _ = enrich_all_watched_artists(&state).await;
|
||||
}
|
||||
Err(e) => state.tasks.fail(&tid, e.to_string()),
|
||||
let conn = state.db.conn();
|
||||
let mut count = 0u64;
|
||||
let mut offset = 0u64;
|
||||
loop {
|
||||
let tracks = queries::tracks::list(conn, 500, offset).await?;
|
||||
if tracks.is_empty() {
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_id": task_id })))
|
||||
for track in &tracks {
|
||||
let payload = serde_json::json!({"track_id": track.id});
|
||||
queries::work_queue::enqueue(
|
||||
conn,
|
||||
WorkTaskType::Organize,
|
||||
&payload.to_string(),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
count += 1;
|
||||
}
|
||||
offset += 500;
|
||||
}
|
||||
if count > 0 {
|
||||
state.workers.notify(WorkTaskType::Organize);
|
||||
}
|
||||
Ok(HttpResponse::Accepted().json(serde_json::json!({ "enqueued": count })))
|
||||
}
|
||||
|
||||
async fn trigger_pipeline(
|
||||
@@ -188,8 +180,8 @@ async fn trigger_pipeline(
|
||||
session: Session,
|
||||
) -> Result<HttpResponse, ApiError> {
|
||||
auth::require_auth(&session)?;
|
||||
let task_ids = crate::pipeline::spawn_pipeline(&state);
|
||||
Ok(HttpResponse::Accepted().json(serde_json::json!({ "task_ids": task_ids })))
|
||||
let pipeline_id = crate::pipeline::trigger_pipeline(&state).await?;
|
||||
Ok(HttpResponse::Accepted().json(serde_json::json!({ "pipeline_id": pipeline_id })))
|
||||
}
|
||||
|
||||
async fn get_task(
|
||||
@@ -313,10 +305,13 @@ async fn skip_pipeline(
|
||||
session: Session,
|
||||
) -> Result<HttpResponse, ApiError> {
|
||||
auth::require_admin(&session)?;
|
||||
let mut sched = state.scheduler.lock().await;
|
||||
sched.skip_pipeline = true;
|
||||
sched.next_pipeline = None;
|
||||
Ok(HttpResponse::Ok().json(serde_json::json!({"status": "skipped"})))
|
||||
// Push next_run_at forward by one interval
|
||||
let cfg = state.config.read().await;
|
||||
let hours = cfg.scheduling.pipeline_interval_hours.max(1);
|
||||
drop(cfg);
|
||||
let next = chrono::Utc::now().naive_utc() + chrono::Duration::hours(i64::from(hours));
|
||||
queries::scheduler_state::update_next_run(state.db.conn(), "pipeline", Some(next)).await?;
|
||||
Ok(HttpResponse::Ok().json(serde_json::json!({"status": "skipped", "next_run": next})))
|
||||
}
|
||||
|
||||
async fn skip_monitor(
|
||||
@@ -324,10 +319,12 @@ async fn skip_monitor(
|
||||
session: Session,
|
||||
) -> Result<HttpResponse, ApiError> {
|
||||
auth::require_admin(&session)?;
|
||||
let mut sched = state.scheduler.lock().await;
|
||||
sched.skip_monitor = true;
|
||||
sched.next_monitor = None;
|
||||
Ok(HttpResponse::Ok().json(serde_json::json!({"status": "skipped"})))
|
||||
let cfg = state.config.read().await;
|
||||
let hours = cfg.scheduling.monitor_interval_hours.max(1);
|
||||
drop(cfg);
|
||||
let next = chrono::Utc::now().naive_utc() + chrono::Duration::hours(i64::from(hours));
|
||||
queries::scheduler_state::update_next_run(state.db.conn(), "monitor", Some(next)).await?;
|
||||
Ok(HttpResponse::Ok().json(serde_json::json!({"status": "skipped", "next_run": next})))
|
||||
}
|
||||
|
||||
async fn get_mb_status(
|
||||
@@ -389,7 +386,7 @@ async fn trigger_mb_import(
|
||||
state.tasks.update_progress(
|
||||
&tid,
|
||||
i as u64,
|
||||
4 + 4, // 4 downloads + 4 imports
|
||||
4 + 4,
|
||||
&format!("Downloading {filename}..."),
|
||||
);
|
||||
if let Err(e) =
|
||||
@@ -413,7 +410,6 @@ async fn trigger_mb_import(
|
||||
|
||||
let tid_clone = tid.clone();
|
||||
let state_clone = state.clone();
|
||||
// Run import in blocking task since rusqlite is sync
|
||||
let result = tokio::task::spawn_blocking(move || {
|
||||
shanty_data::mb_import::run_import_at_path(&db_path, &data_dir, |msg| {
|
||||
tracing::info!("{msg}");
|
||||
|
||||
148
src/scheduler.rs
Normal file
148
src/scheduler.rs
Normal file
@@ -0,0 +1,148 @@
|
||||
//! Unified scheduler that manages all recurring background jobs.
|
||||
//!
|
||||
//! Replaces the separate pipeline_scheduler, monitor, and cookie_refresh spawn loops
|
||||
//! with a single loop backed by persistent state in the `scheduler_state` DB table.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_web::web;
|
||||
use chrono::Utc;
|
||||
|
||||
use shanty_db::queries;
|
||||
|
||||
use crate::state::AppState;
|
||||
|
||||
/// Spawn the unified scheduler background loop.
|
||||
pub fn spawn(state: web::Data<AppState>) {
|
||||
tokio::spawn(async move {
|
||||
// Initialize scheduler state rows in DB
|
||||
for job_name in ["pipeline", "monitor", "cookie_refresh"] {
|
||||
if let Err(e) =
|
||||
queries::scheduler_state::get_or_create(state.db.conn(), job_name).await
|
||||
{
|
||||
tracing::error!(job = job_name, error = %e, "failed to init scheduler state");
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
// Check each job
|
||||
check_and_run_job(&state, "pipeline", run_pipeline_job).await;
|
||||
check_and_run_job(&state, "monitor", run_monitor_job).await;
|
||||
check_and_run_job(&state, "cookie_refresh", run_cookie_refresh_job).await;
|
||||
|
||||
// Poll every 30 seconds
|
||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn check_and_run_job<F, Fut>(state: &web::Data<AppState>, job_name: &str, run_fn: F)
|
||||
where
|
||||
F: FnOnce(web::Data<AppState>) -> Fut,
|
||||
Fut: std::future::Future<Output = Result<String, String>>,
|
||||
{
|
||||
let job = match queries::scheduler_state::get_or_create(state.db.conn(), job_name).await {
|
||||
Ok(j) => j,
|
||||
Err(e) => {
|
||||
tracing::error!(job = job_name, error = %e, "failed to read scheduler state");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Read config to check if enabled and get interval
|
||||
let (config_enabled, interval_secs) = read_job_config(state, job_name).await;
|
||||
|
||||
// If config says disabled, ensure DB state reflects it
|
||||
if !config_enabled {
|
||||
if job.enabled {
|
||||
let _ =
|
||||
queries::scheduler_state::set_enabled(state.db.conn(), job_name, false).await;
|
||||
let _ =
|
||||
queries::scheduler_state::update_next_run(state.db.conn(), job_name, None).await;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// If DB says disabled (e.g. user skipped), re-enable from config
|
||||
if !job.enabled {
|
||||
let _ = queries::scheduler_state::set_enabled(state.db.conn(), job_name, true).await;
|
||||
}
|
||||
|
||||
let now = Utc::now().naive_utc();
|
||||
|
||||
// If no next_run_at is set, schedule one
|
||||
if job.next_run_at.is_none() {
|
||||
let next = now + chrono::Duration::seconds(interval_secs);
|
||||
let _ =
|
||||
queries::scheduler_state::update_next_run(state.db.conn(), job_name, Some(next)).await;
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if it's time to run
|
||||
let next_run = job.next_run_at.unwrap();
|
||||
if now < next_run {
|
||||
return;
|
||||
}
|
||||
|
||||
// Time to run!
|
||||
tracing::info!(job = job_name, "scheduled job starting");
|
||||
|
||||
let result = run_fn(state.clone()).await;
|
||||
|
||||
let result_str = match &result {
|
||||
Ok(msg) => {
|
||||
tracing::info!(job = job_name, result = %msg, "scheduled job complete");
|
||||
format!("ok: {msg}")
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(job = job_name, error = %e, "scheduled job failed");
|
||||
format!("error: {e}")
|
||||
}
|
||||
};
|
||||
|
||||
// Update last run and schedule next
|
||||
let _ = queries::scheduler_state::update_last_run(state.db.conn(), job_name, &result_str).await;
|
||||
let next = Utc::now().naive_utc() + chrono::Duration::seconds(interval_secs);
|
||||
let _ =
|
||||
queries::scheduler_state::update_next_run(state.db.conn(), job_name, Some(next)).await;
|
||||
}
|
||||
|
||||
async fn read_job_config(state: &web::Data<AppState>, job_name: &str) -> (bool, i64) {
|
||||
let cfg = state.config.read().await;
|
||||
match job_name {
|
||||
"pipeline" => (
|
||||
cfg.scheduling.pipeline_enabled,
|
||||
i64::from(cfg.scheduling.pipeline_interval_hours.max(1)) * 3600,
|
||||
),
|
||||
"monitor" => (
|
||||
cfg.scheduling.monitor_enabled,
|
||||
i64::from(cfg.scheduling.monitor_interval_hours.max(1)) * 3600,
|
||||
),
|
||||
"cookie_refresh" => (
|
||||
cfg.download.cookie_refresh_enabled,
|
||||
i64::from(cfg.download.cookie_refresh_hours.max(1)) * 3600,
|
||||
),
|
||||
_ => (false, 3600),
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_pipeline_job(state: web::Data<AppState>) -> Result<String, String> {
|
||||
let pipeline_id = crate::pipeline::trigger_pipeline(&state)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
Ok(format!("pipeline triggered: {pipeline_id}"))
|
||||
}
|
||||
|
||||
async fn run_monitor_job(state: web::Data<AppState>) -> Result<String, String> {
|
||||
let stats = crate::monitor::check_monitored_artists(&state)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
Ok(format!(
|
||||
"{} artists checked, {} new releases, {} tracks added",
|
||||
stats.artists_checked, stats.new_releases_found, stats.tracks_added
|
||||
))
|
||||
}
|
||||
|
||||
async fn run_cookie_refresh_job(_state: web::Data<AppState>) -> Result<String, String> {
|
||||
crate::cookie_refresh::run_refresh().await
|
||||
}
|
||||
15
src/state.rs
15
src/state.rs
@@ -8,24 +8,13 @@ use shanty_search::MusicBrainzSearch;
|
||||
|
||||
use crate::config::AppConfig;
|
||||
use crate::tasks::TaskManager;
|
||||
use crate::workers::WorkerManager;
|
||||
|
||||
/// Tracks an active Firefox login session for YouTube auth.
|
||||
pub struct FirefoxLoginSession {
|
||||
pub vnc_url: String,
|
||||
}
|
||||
|
||||
/// Tracks next-run times for scheduled background tasks.
|
||||
pub struct SchedulerInfo {
|
||||
/// When the next pipeline run is scheduled (None if disabled).
|
||||
pub next_pipeline: Option<chrono::NaiveDateTime>,
|
||||
/// When the next monitor check is scheduled (None if disabled).
|
||||
pub next_monitor: Option<chrono::NaiveDateTime>,
|
||||
/// Skip the next pipeline run (one-shot, resets after skip).
|
||||
pub skip_pipeline: bool,
|
||||
/// Skip the next monitor run (one-shot, resets after skip).
|
||||
pub skip_monitor: bool,
|
||||
}
|
||||
|
||||
pub struct AppState {
|
||||
pub db: Database,
|
||||
pub mb_client: HybridMusicBrainzFetcher,
|
||||
@@ -34,6 +23,6 @@ pub struct AppState {
|
||||
pub config: Arc<RwLock<AppConfig>>,
|
||||
pub config_path: Option<String>,
|
||||
pub tasks: TaskManager,
|
||||
pub workers: WorkerManager,
|
||||
pub firefox_login: Mutex<Option<FirefoxLoginSession>>,
|
||||
pub scheduler: Mutex<SchedulerInfo>,
|
||||
}
|
||||
|
||||
431
src/workers.rs
Normal file
431
src/workers.rs
Normal file
@@ -0,0 +1,431 @@
|
||||
//! Work queue workers that process pipeline items concurrently.
|
||||
//!
|
||||
//! Each task type (Download, Index, Tag, Organize) has a dedicated worker loop
|
||||
//! that polls the work_queue table and processes items with bounded concurrency.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use actix_web::web;
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use tokio::sync::{Notify, Semaphore};
|
||||
|
||||
use shanty_db::entities::download_queue::DownloadStatus;
|
||||
use shanty_db::entities::wanted_item::WantedStatus;
|
||||
use shanty_db::entities::work_queue::WorkTaskType;
|
||||
use shanty_db::queries;
|
||||
use shanty_dl::DownloadBackend;
|
||||
|
||||
use crate::state::AppState;
|
||||
|
||||
/// Manages worker notification channels and spawns worker loops.
|
||||
pub struct WorkerManager {
|
||||
notifiers: HashMap<WorkTaskType, Arc<Notify>>,
|
||||
}
|
||||
|
||||
impl Default for WorkerManager {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl WorkerManager {
|
||||
pub fn new() -> Self {
|
||||
let mut notifiers = HashMap::new();
|
||||
notifiers.insert(WorkTaskType::Download, Arc::new(Notify::new()));
|
||||
notifiers.insert(WorkTaskType::Index, Arc::new(Notify::new()));
|
||||
notifiers.insert(WorkTaskType::Tag, Arc::new(Notify::new()));
|
||||
notifiers.insert(WorkTaskType::Organize, Arc::new(Notify::new()));
|
||||
Self { notifiers }
|
||||
}
|
||||
|
||||
/// Wake the worker for a specific task type.
|
||||
pub fn notify(&self, task_type: WorkTaskType) {
|
||||
if let Some(n) = self.notifiers.get(&task_type) {
|
||||
n.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn all worker loops and run startup recovery.
|
||||
pub fn spawn_all(state: web::Data<AppState>) {
|
||||
let state_clone = state.clone();
|
||||
tokio::spawn(async move {
|
||||
// Reset any items stuck in Running from a previous crash
|
||||
match queries::work_queue::reset_stale_running(state_clone.db.conn()).await {
|
||||
Ok(count) if count > 0 => {
|
||||
tracing::info!(count, "reset stale running work queue items");
|
||||
}
|
||||
Err(e) => tracing::error!(error = %e, "failed to reset stale work queue items"),
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// Periodic cleanup of old completed items (every 6 hours)
|
||||
let cleanup_state = state_clone.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(6 * 3600)).await;
|
||||
let _ = queries::work_queue::cleanup_completed(cleanup_state.db.conn(), 7).await;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Spawn each worker type
|
||||
spawn_worker(state.clone(), WorkTaskType::Download, 1);
|
||||
spawn_worker(state.clone(), WorkTaskType::Index, 4);
|
||||
spawn_worker(state.clone(), WorkTaskType::Tag, 2);
|
||||
spawn_worker(state.clone(), WorkTaskType::Organize, 4);
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_worker(state: web::Data<AppState>, task_type: WorkTaskType, concurrency: usize) {
|
||||
let notify = state
|
||||
.workers
|
||||
.notifiers
|
||||
.get(&task_type)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| Arc::new(Notify::new()));
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
// Wait for notification or poll timeout
|
||||
tokio::select! {
|
||||
_ = notify.notified() => {}
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
|
||||
}
|
||||
|
||||
// Claim pending items
|
||||
let items = match queries::work_queue::claim_next(
|
||||
state.db.conn(),
|
||||
task_type,
|
||||
concurrency as u64,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(items) => items,
|
||||
Err(e) => {
|
||||
tracing::error!(task_type = %task_type, error = %e, "failed to claim work items");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if items.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let semaphore = Arc::new(Semaphore::new(concurrency));
|
||||
let mut handles = Vec::new();
|
||||
|
||||
for item in items {
|
||||
let permit = semaphore.clone().acquire_owned().await.unwrap();
|
||||
let state = state.clone();
|
||||
let item_id = item.id;
|
||||
let item_task_type = item.task_type;
|
||||
let pipeline_id = item.pipeline_id.clone();
|
||||
|
||||
handles.push(tokio::spawn(async move {
|
||||
let _permit = permit;
|
||||
|
||||
let result = match item_task_type {
|
||||
WorkTaskType::Download => process_download(&state, &item).await,
|
||||
WorkTaskType::Index => process_index(&state, &item).await,
|
||||
WorkTaskType::Tag => process_tag(&state, &item).await,
|
||||
WorkTaskType::Organize => process_organize(&state, &item).await,
|
||||
};
|
||||
|
||||
match result {
|
||||
Ok(downstream) => {
|
||||
if let Err(e) =
|
||||
queries::work_queue::complete(state.db.conn(), item_id).await
|
||||
{
|
||||
tracing::error!(id = item_id, error = %e, "failed to mark work item complete");
|
||||
}
|
||||
// Enqueue downstream items
|
||||
for (task_type, payload) in downstream {
|
||||
if let Err(e) = queries::work_queue::enqueue(
|
||||
state.db.conn(),
|
||||
task_type,
|
||||
&payload,
|
||||
pipeline_id.as_deref(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(
|
||||
error = %e,
|
||||
"failed to enqueue downstream work item"
|
||||
);
|
||||
}
|
||||
state.workers.notify(task_type);
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
tracing::warn!(
|
||||
id = item_id,
|
||||
task_type = %item_task_type,
|
||||
error = %error,
|
||||
"work item failed"
|
||||
);
|
||||
if let Err(e) =
|
||||
queries::work_queue::fail(state.db.conn(), item_id, &error).await
|
||||
{
|
||||
tracing::error!(id = item_id, error = %e, "failed to mark work item failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
let _ = handle.await;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// --- Worker implementations ---
|
||||
|
||||
type WorkResult = Result<Vec<(WorkTaskType, String)>, String>;
|
||||
|
||||
async fn process_download(
|
||||
state: &web::Data<AppState>,
|
||||
item: &shanty_db::entities::work_queue::Model,
|
||||
) -> WorkResult {
|
||||
let payload: serde_json::Value =
|
||||
serde_json::from_str(&item.payload_json).map_err(|e| e.to_string())?;
|
||||
let dl_queue_id = payload
|
||||
.get("download_queue_id")
|
||||
.and_then(|v| v.as_i64())
|
||||
.map(|v| v as i32);
|
||||
|
||||
let conn = state.db.conn();
|
||||
|
||||
// Get the download queue item — either specific ID or next pending
|
||||
let dl_item = if let Some(id) = dl_queue_id {
|
||||
queries::downloads::list(conn, None)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?
|
||||
.into_iter()
|
||||
.find(|i| i.id == id)
|
||||
} else {
|
||||
queries::downloads::get_next_pending(conn)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?
|
||||
};
|
||||
|
||||
let dl_item = match dl_item {
|
||||
Some(item) => item,
|
||||
None => return Ok(vec![]), // Nothing to download
|
||||
};
|
||||
|
||||
// Mark as downloading
|
||||
queries::downloads::update_status(conn, dl_item.id, DownloadStatus::Downloading, None)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
// Build download backend from config
|
||||
let cfg = state.config.read().await.clone();
|
||||
let cookies = cfg.download.cookies_path.clone();
|
||||
let format: shanty_dl::AudioFormat = cfg
|
||||
.download
|
||||
.format
|
||||
.parse()
|
||||
.unwrap_or(shanty_dl::AudioFormat::Opus);
|
||||
let source: shanty_dl::SearchSource = cfg
|
||||
.download
|
||||
.search_source
|
||||
.parse()
|
||||
.unwrap_or(shanty_dl::SearchSource::YouTubeMusic);
|
||||
let rate = if cookies.is_some() {
|
||||
cfg.download.rate_limit_auth
|
||||
} else {
|
||||
cfg.download.rate_limit
|
||||
};
|
||||
|
||||
let backend = shanty_dl::YtDlpBackend::new(rate, source, cookies.clone());
|
||||
let backend_config = shanty_dl::BackendConfig {
|
||||
output_dir: cfg.download_path.clone(),
|
||||
format,
|
||||
cookies_path: cookies,
|
||||
};
|
||||
|
||||
// Determine download target
|
||||
let target = if let Some(ref url) = dl_item.source_url {
|
||||
shanty_dl::DownloadTarget::Url(url.clone())
|
||||
} else {
|
||||
shanty_dl::DownloadTarget::Query(dl_item.query.clone())
|
||||
};
|
||||
|
||||
// Execute download
|
||||
let result = backend
|
||||
.download(&target, &backend_config)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
// Mark download_queue entry as completed
|
||||
queries::downloads::update_status(conn, dl_item.id, DownloadStatus::Completed, None)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
// Create track record if linked to a wanted item (same logic as shanty-dl queue.rs)
|
||||
let mut downstream = Vec::new();
|
||||
if let Some(wanted_item_id) = dl_item.wanted_item_id {
|
||||
let wanted = queries::wanted::get_by_id(conn, wanted_item_id)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
let file_size = std::fs::metadata(&result.file_path)
|
||||
.map(|m| m.len() as i64)
|
||||
.unwrap_or(0);
|
||||
|
||||
let now = chrono::Utc::now().naive_utc();
|
||||
let track_active = shanty_db::entities::track::ActiveModel {
|
||||
file_path: Set(result.file_path.to_string_lossy().to_string()),
|
||||
title: Set(Some(result.title.clone())),
|
||||
artist: Set(result.artist.clone()),
|
||||
file_size: Set(file_size),
|
||||
musicbrainz_id: Set(wanted.musicbrainz_id.clone()),
|
||||
artist_id: Set(wanted.artist_id),
|
||||
added_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
..Default::default()
|
||||
};
|
||||
let track = queries::tracks::upsert(conn, track_active)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
queries::wanted::update_status(conn, wanted_item_id, WantedStatus::Downloaded)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
// Create Tag work item for this track
|
||||
let tag_payload = serde_json::json!({"track_id": track.id});
|
||||
downstream.push((WorkTaskType::Tag, tag_payload.to_string()));
|
||||
}
|
||||
|
||||
let _ = queries::cache::purge_prefix(conn, "artist_totals:").await;
|
||||
Ok(downstream)
|
||||
}
|
||||
|
||||
async fn process_index(
|
||||
state: &web::Data<AppState>,
|
||||
item: &shanty_db::entities::work_queue::Model,
|
||||
) -> WorkResult {
|
||||
let payload: serde_json::Value =
|
||||
serde_json::from_str(&item.payload_json).map_err(|e| e.to_string())?;
|
||||
let conn = state.db.conn();
|
||||
let cfg = state.config.read().await.clone();
|
||||
let mut downstream = Vec::new();
|
||||
|
||||
if payload.get("scan_all").and_then(|v| v.as_bool()).unwrap_or(false) {
|
||||
// Full library scan
|
||||
let scan_config = shanty_index::ScanConfig {
|
||||
root: cfg.library_path.clone(),
|
||||
dry_run: false,
|
||||
concurrency: cfg.indexing.concurrency,
|
||||
};
|
||||
shanty_index::run_scan(conn, &scan_config)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
// Create Tag work items for all untagged tracks
|
||||
let untagged = queries::tracks::get_needing_metadata(conn)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
for track in &untagged {
|
||||
let tag_payload = serde_json::json!({"track_id": track.id});
|
||||
downstream.push((WorkTaskType::Tag, tag_payload.to_string()));
|
||||
}
|
||||
} else if let Some(file_path) = payload.get("file_path").and_then(|v| v.as_str()) {
|
||||
// Single file index
|
||||
let path = std::path::PathBuf::from(file_path);
|
||||
let track_id = shanty_index::index_file(conn, &path, false)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
if let Some(id) = track_id {
|
||||
let tag_payload = serde_json::json!({"track_id": id});
|
||||
downstream.push((WorkTaskType::Tag, tag_payload.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(downstream)
|
||||
}
|
||||
|
||||
async fn process_tag(
|
||||
state: &web::Data<AppState>,
|
||||
item: &shanty_db::entities::work_queue::Model,
|
||||
) -> WorkResult {
|
||||
let payload: serde_json::Value =
|
||||
serde_json::from_str(&item.payload_json).map_err(|e| e.to_string())?;
|
||||
let track_id = payload
|
||||
.get("track_id")
|
||||
.and_then(|v| v.as_i64())
|
||||
.ok_or("missing track_id in payload")?
|
||||
as i32;
|
||||
|
||||
let conn = state.db.conn();
|
||||
let cfg = state.config.read().await.clone();
|
||||
|
||||
let track = queries::tracks::get_by_id(conn, track_id)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
let tag_config = shanty_tag::TagConfig {
|
||||
dry_run: false,
|
||||
write_tags: cfg.tagging.write_tags,
|
||||
confidence: cfg.tagging.confidence,
|
||||
};
|
||||
|
||||
shanty_tag::tag_track(conn, &state.mb_client, &track, &tag_config)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
// Create Organize work item
|
||||
let org_payload = serde_json::json!({"track_id": track_id});
|
||||
Ok(vec![(WorkTaskType::Organize, org_payload.to_string())])
|
||||
}
|
||||
|
||||
async fn process_organize(
|
||||
state: &web::Data<AppState>,
|
||||
item: &shanty_db::entities::work_queue::Model,
|
||||
) -> WorkResult {
|
||||
let payload: serde_json::Value =
|
||||
serde_json::from_str(&item.payload_json).map_err(|e| e.to_string())?;
|
||||
let track_id = payload
|
||||
.get("track_id")
|
||||
.and_then(|v| v.as_i64())
|
||||
.ok_or("missing track_id in payload")?
|
||||
as i32;
|
||||
|
||||
let conn = state.db.conn();
|
||||
let cfg = state.config.read().await.clone();
|
||||
|
||||
let org_config = shanty_org::OrgConfig {
|
||||
target_dir: cfg.library_path.clone(),
|
||||
format: cfg.organization_format.clone(),
|
||||
dry_run: false,
|
||||
copy: false,
|
||||
};
|
||||
|
||||
shanty_org::organize_track(conn, track_id, &org_config)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
// Promote this track's wanted item from Downloaded to Owned
|
||||
let _ = queries::wanted::promote_downloaded_to_owned(conn).await;
|
||||
|
||||
// Check if pipeline is complete and trigger enrichment
|
||||
if let Some(ref pipeline_id) = item.pipeline_id
|
||||
&& let Ok(true) = queries::work_queue::pipeline_is_complete(conn, pipeline_id).await
|
||||
{
|
||||
tracing::info!(pipeline_id = %pipeline_id, "pipeline complete, triggering enrichment");
|
||||
let state = state.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = crate::routes::artists::enrich_all_watched_artists(&state).await {
|
||||
tracing::error!(error = %e, "post-pipeline enrichment failed");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(vec![])
|
||||
}
|
||||
Reference in New Issue
Block a user