Files
dl/src/queue.rs
2026-03-18 14:34:26 -04:00

339 lines
11 KiB
Rust

use std::fmt;
use std::time::Duration;
use sea_orm::{ActiveValue::Set, DatabaseConnection};
use shanty_db::entities::download_queue::DownloadStatus;
use shanty_db::entities::wanted_item::WantedStatus;
use shanty_db::queries;
use crate::backend::{BackendConfig, DownloadBackend, DownloadTarget};
use crate::error::{DlError, DlResult};
/// Statistics from a queue processing run.
#[derive(Debug, Default, Clone)]
pub struct DlStats {
    /// Items pulled from the queue this run (a retried item counts again
    /// on each pass through the loop).
    pub downloads_attempted: u64,
    /// Downloads whose backend call succeeded.
    pub downloads_completed: u64,
    /// Downloads whose backend call returned an error (including ones that
    /// were subsequently scheduled for retry).
    pub downloads_failed: u64,
    /// Items not downloaded at all (dry-run mode).
    pub downloads_skipped: u64,
}
impl fmt::Display for DlStats {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"attempted: {}, completed: {}, failed: {}, skipped: {}",
self.downloads_attempted,
self.downloads_completed,
self.downloads_failed,
self.downloads_skipped,
)
}
}
/// Maximum number of automatic retries for a transient download failure.
const MAX_RETRIES: i32 = 3;
/// Backoff schedule indexed by the item's retry count (clamped to the last
/// entry): 30 s, 2 min, 10 min.
const RETRY_DELAYS: [Duration; 3] = [
    Duration::from_secs(30),
    Duration::from_secs(120),
    Duration::from_secs(600),
];
/// Progress callback: (current, total, message).
pub type ProgressFn = Box<dyn Fn(u64, u64, &str) + Send + Sync>;
/// Process all pending items in the download queue.
///
/// Convenience wrapper around [`run_queue_with_progress`] with progress
/// reporting disabled; see that function for the full contract.
pub async fn run_queue(
    conn: &DatabaseConnection,
    backend: &impl DownloadBackend,
    config: &BackendConfig,
    dry_run: bool,
) -> DlResult<DlStats> {
    run_queue_with_progress(conn, backend, config, dry_run, None).await
}
/// Process all pending items in the download queue, with optional progress reporting.
///
/// Repeatedly pulls the next pending item and hands it to `backend` until the
/// queue is drained, recording per-item status transitions in the database
/// (Pending → Downloading → Completed/Failed). Returns aggregate counters.
///
/// * `dry_run` — nothing is downloaded; each item is logged and marked
///   `Failed` (reason "dry run") so the loop cannot spin on the same item.
/// * `on_progress` — invoked as `(current, total, message)`. `total` is the
///   pending count sampled once at startup, so retried items can push
///   `current` past `total`.
///
/// # Errors
/// Propagates database errors from the queue queries; backend download
/// errors are handled per item (logged, recorded, possibly retried) and do
/// not abort the run.
pub async fn run_queue_with_progress(
    conn: &DatabaseConnection,
    backend: &impl DownloadBackend,
    config: &BackendConfig,
    dry_run: bool,
    on_progress: Option<ProgressFn>,
) -> DlResult<DlStats> {
    let mut stats = DlStats::default();
    // Count total for progress reporting. A listing failure degrades to
    // total = 0 instead of aborting the run.
    let total = queries::downloads::list(conn, Some(DownloadStatus::Pending))
        .await
        .map(|v| v.len() as u64)
        .unwrap_or(0);
    if let Some(ref cb) = on_progress {
        cb(0, total, "Starting downloads...");
    }
    loop {
        // Drain one item at a time; stop when nothing is pending.
        let item = match queries::downloads::get_next_pending(conn).await? {
            Some(item) => item,
            None => break,
        };
        stats.downloads_attempted += 1;
        if let Some(ref cb) = on_progress {
            cb(stats.downloads_attempted, total, &format!("Downloading: {}", item.query));
        }
        tracing::info!(
            id = item.id,
            query = %item.query,
            retry = item.retry_count,
            "processing download"
        );
        if dry_run {
            tracing::info!(id = item.id, query = %item.query, "DRY RUN: would download");
            stats.downloads_skipped += 1;
            // Mark as failed temporarily so we don't loop forever on the same item
            queries::downloads::update_status(
                conn,
                item.id,
                DownloadStatus::Failed,
                Some("dry run"),
            )
            .await?;
            continue;
        }
        // Mark as downloading
        queries::downloads::update_status(conn, item.id, DownloadStatus::Downloading, None)
            .await?;
        // Determine download target: an explicit source URL takes precedence
        // over the free-text search query.
        let target = if let Some(ref url) = item.source_url {
            DownloadTarget::Url(url.clone())
        } else {
            DownloadTarget::Query(item.query.clone())
        };
        // Attempt download
        match backend.download(&target, config).await {
            Ok(result) => {
                tracing::info!(
                    id = item.id,
                    path = %result.file_path.display(),
                    title = %result.title,
                    "download completed"
                );
                queries::downloads::update_status(
                    conn,
                    item.id,
                    DownloadStatus::Completed,
                    None,
                )
                .await?;
                // Update wanted item status and create track record with MBID
                if let Some(wanted_id) = item.wanted_item_id {
                    if let Ok(wanted) = queries::wanted::get_by_id(conn, wanted_id).await {
                        // Create a track record with the MBID so the tagger
                        // can skip searching and go straight to the right recording
                        let now = chrono::Utc::now().naive_utc();
                        let file_path = result.file_path.to_string_lossy().to_string();
                        // NOTE(review): blocking fs call inside an async fn;
                        // consider tokio::fs::metadata — confirm this is
                        // acceptable on this runtime. Missing metadata
                        // degrades to size 0 rather than failing.
                        let file_size = std::fs::metadata(&result.file_path)
                            .map(|m| m.len() as i64)
                            .unwrap_or(0);
                        let track_active = shanty_db::entities::track::ActiveModel {
                            file_path: Set(file_path),
                            title: Set(Some(result.title.clone())),
                            artist: Set(result.artist.clone()),
                            file_size: Set(file_size),
                            musicbrainz_id: Set(wanted.musicbrainz_id.clone()),
                            artist_id: Set(wanted.artist_id),
                            added_at: Set(now),
                            updated_at: Set(now),
                            ..Default::default()
                        };
                        // Bookkeeping failures are logged but do not fail the
                        // run — the download itself already succeeded.
                        if let Err(e) = queries::tracks::upsert(conn, track_active).await {
                            tracing::warn!(error = %e, "failed to create track record after download");
                        }
                        if let Err(e) = queries::wanted::update_status(
                            conn,
                            wanted_id,
                            WantedStatus::Downloaded,
                        )
                        .await
                        {
                            tracing::warn!(
                                wanted_id = wanted_id,
                                error = %e,
                                "failed to update wanted item status"
                            );
                        }
                    }
                }
                stats.downloads_completed += 1;
            }
            Err(e) => {
                let error_msg = e.to_string();
                tracing::error!(
                    id = item.id,
                    query = %item.query,
                    error = %error_msg,
                    "download failed"
                );
                queries::downloads::update_status(
                    conn,
                    item.id,
                    DownloadStatus::Failed,
                    Some(&error_msg),
                )
                .await?;
                // Auto-retry transient errors
                if e.is_transient() && item.retry_count < MAX_RETRIES {
                    // Clamp into RETRY_DELAYS so retry counts past the table
                    // reuse the final (longest) delay.
                    let delay_idx = item.retry_count.min(RETRY_DELAYS.len() as i32 - 1) as usize;
                    let delay = RETRY_DELAYS[delay_idx];
                    tracing::info!(
                        id = item.id,
                        retry = item.retry_count + 1,
                        delay_secs = delay.as_secs(),
                        "scheduling retry"
                    );
                    // Presumably re-queues the item as Pending so this loop
                    // picks it up again — verify against queries::downloads.
                    queries::downloads::retry_failed(conn, item.id).await?;
                    // If rate limited, pause before continuing
                    if matches!(e, DlError::RateLimited(_)) {
                        tracing::warn!("rate limited — pausing queue processing");
                        tokio::time::sleep(delay).await;
                    }
                }
                // Counted as failed even when a retry was scheduled; a later
                // successful retry is counted separately.
                stats.downloads_failed += 1;
            }
        }
    }
    tracing::info!(%stats, "queue processing complete");
    Ok(stats)
}
/// Sync stats from a queue sync operation.
///
/// Derives `Clone` for consistency with [`DlStats`], so callers can keep a
/// copy of the counters after reporting them.
#[derive(Debug, Default, Clone)]
pub struct SyncStats {
    /// Wanted items inspected during the sync.
    pub found: u64,
    /// Items newly enqueued (or that would be enqueued, in dry-run mode).
    pub enqueued: u64,
    /// Items skipped because they already have a queue entry.
    pub skipped: u64,
}
impl fmt::Display for SyncStats {
    /// Renders the counters as a single human-readable summary line,
    /// e.g. `found: 5, enqueued: 3, skipped (already queued): 2`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "found: {found}, enqueued: {enqueued}, skipped (already queued): {skipped}",
            found = self.found,
            enqueued = self.enqueued,
            skipped = self.skipped,
        )
    }
}
/// Sync wanted items to the download queue.
/// Finds all Track-type Wanted items and enqueues them for download,
/// skipping any that already have a queue entry.
///
/// `dry_run` prints what would be enqueued (still counting it as enqueued
/// in the returned stats) without touching the queue.
///
/// # Errors
/// Propagates database errors from listing, duplicate lookup, and enqueue;
/// a failed artist lookup is tolerated (treated as "no artist").
pub async fn sync_wanted_to_queue(
    conn: &DatabaseConnection,
    dry_run: bool,
) -> DlResult<SyncStats> {
    let wanted = queries::wanted::list(conn, Some(WantedStatus::Wanted)).await?;
    let mut stats = SyncStats::default();
    for item in &wanted {
        stats.found += 1;
        // Build search query from the name + artist. An artist lookup
        // failure collapses to None, same as having no artist id.
        let artist_name = if let Some(id) = item.artist_id {
            queries::artists::get_by_id(conn, id)
                .await
                .map(|a| a.name)
                .ok()
        } else {
            None
        };
        // Fallback chain: "artist name" > name alone > artist alone;
        // an item with neither is skipped with a warning.
        let query = match artist_name {
            Some(ref artist) if !item.name.is_empty() => format!("{} {}", artist, item.name),
            _ if !item.name.is_empty() => item.name.clone(),
            Some(ref artist) => artist.clone(),
            None => {
                tracing::warn!(id = item.id, "cannot build query — no name or artist");
                continue;
            }
        };
        // Check if already queued
        if let Some(_existing) = queries::downloads::find_by_wanted_item_id(conn, item.id).await? {
            tracing::debug!(id = item.id, name = %item.name, "already in queue, skipping");
            stats.skipped += 1;
            continue;
        }
        if dry_run {
            // Dry-run reports via stdout and still bumps the enqueued count.
            println!("Would enqueue: {query}");
            stats.enqueued += 1;
            continue;
        }
        queries::downloads::enqueue(conn, &query, Some(item.id), "ytdlp").await?;
        tracing::info!(id = item.id, query = %query, "enqueued for download");
        stats.enqueued += 1;
    }
    tracing::info!(%stats, "sync complete");
    Ok(stats)
}
/// Download a single item directly (not from queue).
///
/// In dry-run mode nothing is downloaded: a URL target is only logged,
/// while a query target still performs the backend search so the user can
/// see what *would* be fetched.
///
/// # Errors
/// Propagates backend search/download errors.
pub async fn download_single(
    backend: &impl DownloadBackend,
    target: DownloadTarget,
    config: &BackendConfig,
    dry_run: bool,
) -> DlResult<()> {
    if dry_run {
        match &target {
            DownloadTarget::Url(url) => {
                tracing::info!(url = %url, "DRY RUN: would download URL");
            }
            DownloadTarget::Query(q) => {
                tracing::info!(query = %q, "DRY RUN: would search and download");
                // Still run the search to show what would be downloaded
                let results = backend.search(q).await?;
                if let Some(best) = results.first() {
                    tracing::info!(
                        title = %best.title,
                        artist = ?best.artist,
                        url = %best.url,
                        "would download"
                    );
                } else {
                    tracing::warn!(query = %q, "no results found");
                }
            }
        }
        return Ok(());
    }
    let result = backend.download(&target, config).await?;
    // Fix: the title and path were previously concatenated with no separator
    // ("Downloaded: {}{}"), printing e.g. "Downloaded: Song/tmp/song.mp3".
    println!(
        "Downloaded: {} -> {}",
        result.title,
        result.file_path.display()
    );
    Ok(())
}