95 lines
2.9 KiB
Rust
95 lines
2.9 KiB
Rust
use chrono::Utc;
|
|
use sea_orm::*;
|
|
|
|
use crate::entities::download_queue::{
|
|
self, ActiveModel, DownloadStatus, Entity as DownloadQueue, Model as DownloadQueueItem,
|
|
};
|
|
use crate::error::{DbError, DbResult};
|
|
|
|
pub async fn enqueue(
|
|
db: &DatabaseConnection,
|
|
query: &str,
|
|
wanted_item_id: Option<i32>,
|
|
source_backend: &str,
|
|
) -> DbResult<DownloadQueueItem> {
|
|
let now = Utc::now().naive_utc();
|
|
let active = ActiveModel {
|
|
wanted_item_id: Set(wanted_item_id),
|
|
query: Set(query.to_string()),
|
|
source_url: Set(None),
|
|
source_backend: Set(source_backend.to_string()),
|
|
status: Set(DownloadStatus::Pending),
|
|
error_message: Set(None),
|
|
retry_count: Set(0),
|
|
created_at: Set(now),
|
|
updated_at: Set(now),
|
|
..Default::default()
|
|
};
|
|
Ok(active.insert(db).await?)
|
|
}
|
|
|
|
pub async fn get_next_pending(db: &DatabaseConnection) -> DbResult<Option<DownloadQueueItem>> {
|
|
Ok(DownloadQueue::find()
|
|
.filter(download_queue::Column::Status.eq(DownloadStatus::Pending))
|
|
.order_by_asc(download_queue::Column::CreatedAt)
|
|
.one(db)
|
|
.await?)
|
|
}
|
|
|
|
pub async fn update_status(
|
|
db: &DatabaseConnection,
|
|
id: i32,
|
|
status: DownloadStatus,
|
|
error: Option<&str>,
|
|
) -> DbResult<()> {
|
|
let item = DownloadQueue::find_by_id(id)
|
|
.one(db)
|
|
.await?
|
|
.ok_or_else(|| DbError::NotFound(format!("download_queue id={id}")))?;
|
|
let mut active: ActiveModel = item.into();
|
|
active.status = Set(status);
|
|
active.error_message = Set(error.map(String::from));
|
|
active.updated_at = Set(Utc::now().naive_utc());
|
|
active.update(db).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn list(
|
|
db: &DatabaseConnection,
|
|
status_filter: Option<DownloadStatus>,
|
|
) -> DbResult<Vec<DownloadQueueItem>> {
|
|
let mut query = DownloadQueue::find();
|
|
if let Some(status) = status_filter {
|
|
query = query.filter(download_queue::Column::Status.eq(status));
|
|
}
|
|
Ok(query
|
|
.order_by_desc(download_queue::Column::CreatedAt)
|
|
.all(db)
|
|
.await?)
|
|
}
|
|
|
|
pub async fn find_by_wanted_item_id(
|
|
db: &DatabaseConnection,
|
|
wanted_item_id: i32,
|
|
) -> DbResult<Option<DownloadQueueItem>> {
|
|
Ok(DownloadQueue::find()
|
|
.filter(download_queue::Column::WantedItemId.eq(wanted_item_id))
|
|
.one(db)
|
|
.await?)
|
|
}
|
|
|
|
pub async fn retry_failed(db: &DatabaseConnection, id: i32) -> DbResult<()> {
|
|
let item = DownloadQueue::find_by_id(id)
|
|
.one(db)
|
|
.await?
|
|
.ok_or_else(|| DbError::NotFound(format!("download_queue id={id}")))?;
|
|
let mut active: ActiveModel = item.into();
|
|
active.status = Set(DownloadStatus::Pending);
|
|
active.error_message = Set(None);
|
|
let current_retries = active.retry_count.take().unwrap_or(0);
|
|
active.retry_count = Set(current_retries + 1);
|
|
active.updated_at = Set(Utc::now().naive_utc());
|
|
active.update(db).await?;
|
|
Ok(())
|
|
}
|