Initial commit
This commit is contained in:
212
src/queue.rs
Normal file
212
src/queue.rs
Normal file
@@ -0,0 +1,212 @@
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
|
||||
use sea_orm::DatabaseConnection;
|
||||
|
||||
use shanty_db::entities::download_queue::DownloadStatus;
|
||||
use shanty_db::entities::wanted_item::WantedStatus;
|
||||
use shanty_db::queries;
|
||||
|
||||
use crate::backend::{BackendConfig, DownloadBackend, DownloadTarget};
|
||||
use crate::error::{DlError, DlResult};
|
||||
|
||||
/// Statistics from a queue processing run.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct DlStats {
|
||||
pub downloads_attempted: u64,
|
||||
pub downloads_completed: u64,
|
||||
pub downloads_failed: u64,
|
||||
pub downloads_skipped: u64,
|
||||
}
|
||||
|
||||
impl fmt::Display for DlStats {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"attempted: {}, completed: {}, failed: {}, skipped: {}",
|
||||
self.downloads_attempted,
|
||||
self.downloads_completed,
|
||||
self.downloads_failed,
|
||||
self.downloads_skipped,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
const MAX_RETRIES: i32 = 3;
|
||||
const RETRY_DELAYS: [Duration; 3] = [
|
||||
Duration::from_secs(30),
|
||||
Duration::from_secs(120),
|
||||
Duration::from_secs(600),
|
||||
];
|
||||
|
||||
/// Process all pending items in the download queue.
|
||||
pub async fn run_queue(
|
||||
conn: &DatabaseConnection,
|
||||
backend: &impl DownloadBackend,
|
||||
config: &BackendConfig,
|
||||
dry_run: bool,
|
||||
) -> DlResult<DlStats> {
|
||||
let mut stats = DlStats::default();
|
||||
|
||||
loop {
|
||||
let item = match queries::downloads::get_next_pending(conn).await? {
|
||||
Some(item) => item,
|
||||
None => break,
|
||||
};
|
||||
|
||||
stats.downloads_attempted += 1;
|
||||
|
||||
tracing::info!(
|
||||
id = item.id,
|
||||
query = %item.query,
|
||||
retry = item.retry_count,
|
||||
"processing download"
|
||||
);
|
||||
|
||||
if dry_run {
|
||||
tracing::info!(id = item.id, query = %item.query, "DRY RUN: would download");
|
||||
stats.downloads_skipped += 1;
|
||||
// Mark as failed temporarily so we don't loop forever on the same item
|
||||
queries::downloads::update_status(
|
||||
conn,
|
||||
item.id,
|
||||
DownloadStatus::Failed,
|
||||
Some("dry run"),
|
||||
)
|
||||
.await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Mark as downloading
|
||||
queries::downloads::update_status(conn, item.id, DownloadStatus::Downloading, None)
|
||||
.await?;
|
||||
|
||||
// Determine download target
|
||||
let target = if let Some(ref url) = item.source_url {
|
||||
DownloadTarget::Url(url.clone())
|
||||
} else {
|
||||
DownloadTarget::Query(item.query.clone())
|
||||
};
|
||||
|
||||
// Attempt download
|
||||
match backend.download(&target, config).await {
|
||||
Ok(result) => {
|
||||
tracing::info!(
|
||||
id = item.id,
|
||||
path = %result.file_path.display(),
|
||||
title = %result.title,
|
||||
"download completed"
|
||||
);
|
||||
|
||||
queries::downloads::update_status(
|
||||
conn,
|
||||
item.id,
|
||||
DownloadStatus::Completed,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Update wanted item status if linked
|
||||
if let Some(wanted_id) = item.wanted_item_id {
|
||||
if let Err(e) = queries::wanted::update_status(
|
||||
conn,
|
||||
wanted_id,
|
||||
WantedStatus::Downloaded,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(
|
||||
wanted_id = wanted_id,
|
||||
error = %e,
|
||||
"failed to update wanted item status"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
stats.downloads_completed += 1;
|
||||
}
|
||||
Err(e) => {
|
||||
let error_msg = e.to_string();
|
||||
tracing::error!(
|
||||
id = item.id,
|
||||
query = %item.query,
|
||||
error = %error_msg,
|
||||
"download failed"
|
||||
);
|
||||
|
||||
queries::downloads::update_status(
|
||||
conn,
|
||||
item.id,
|
||||
DownloadStatus::Failed,
|
||||
Some(&error_msg),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Auto-retry transient errors
|
||||
if e.is_transient() && item.retry_count < MAX_RETRIES {
|
||||
let delay_idx = item.retry_count.min(RETRY_DELAYS.len() as i32 - 1) as usize;
|
||||
let delay = RETRY_DELAYS[delay_idx];
|
||||
tracing::info!(
|
||||
id = item.id,
|
||||
retry = item.retry_count + 1,
|
||||
delay_secs = delay.as_secs(),
|
||||
"scheduling retry"
|
||||
);
|
||||
queries::downloads::retry_failed(conn, item.id).await?;
|
||||
|
||||
// If rate limited, pause before continuing
|
||||
if matches!(e, DlError::RateLimited(_)) {
|
||||
tracing::warn!("rate limited — pausing queue processing");
|
||||
tokio::time::sleep(delay).await;
|
||||
}
|
||||
}
|
||||
|
||||
stats.downloads_failed += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(%stats, "queue processing complete");
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
/// Download a single item directly (not from queue).
|
||||
pub async fn download_single(
|
||||
backend: &impl DownloadBackend,
|
||||
target: DownloadTarget,
|
||||
config: &BackendConfig,
|
||||
dry_run: bool,
|
||||
) -> DlResult<()> {
|
||||
if dry_run {
|
||||
match &target {
|
||||
DownloadTarget::Url(url) => {
|
||||
tracing::info!(url = %url, "DRY RUN: would download URL");
|
||||
}
|
||||
DownloadTarget::Query(q) => {
|
||||
tracing::info!(query = %q, "DRY RUN: would search and download");
|
||||
|
||||
// Still run the search to show what would be downloaded
|
||||
let results = backend.search(q).await?;
|
||||
if let Some(best) = results.first() {
|
||||
tracing::info!(
|
||||
title = %best.title,
|
||||
artist = ?best.artist,
|
||||
url = %best.url,
|
||||
"would download"
|
||||
);
|
||||
} else {
|
||||
tracing::warn!(query = %q, "no results found");
|
||||
}
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let result = backend.download(&target, config).await?;
|
||||
println!(
|
||||
"Downloaded: {} → {}",
|
||||
result.title,
|
||||
result.file_path.display()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user