Initial commit
This commit is contained in:
83
src/queries/downloads.rs
Normal file
83
src/queries/downloads.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
use chrono::Utc;
|
||||
use sea_orm::*;
|
||||
|
||||
use crate::entities::download_queue::{
|
||||
self, ActiveModel, DownloadStatus, Entity as DownloadQueue, Model as DownloadQueueItem,
|
||||
};
|
||||
use crate::error::{DbError, DbResult};
|
||||
|
||||
pub async fn enqueue(
|
||||
db: &DatabaseConnection,
|
||||
query: &str,
|
||||
wanted_item_id: Option<i32>,
|
||||
source_backend: &str,
|
||||
) -> DbResult<DownloadQueueItem> {
|
||||
let now = Utc::now().naive_utc();
|
||||
let active = ActiveModel {
|
||||
wanted_item_id: Set(wanted_item_id),
|
||||
query: Set(query.to_string()),
|
||||
source_url: Set(None),
|
||||
source_backend: Set(source_backend.to_string()),
|
||||
status: Set(DownloadStatus::Pending),
|
||||
error_message: Set(None),
|
||||
retry_count: Set(0),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
..Default::default()
|
||||
};
|
||||
Ok(active.insert(db).await?)
|
||||
}
|
||||
|
||||
pub async fn get_next_pending(db: &DatabaseConnection) -> DbResult<Option<DownloadQueueItem>> {
|
||||
Ok(DownloadQueue::find()
|
||||
.filter(download_queue::Column::Status.eq(DownloadStatus::Pending))
|
||||
.order_by_asc(download_queue::Column::CreatedAt)
|
||||
.one(db)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn update_status(
|
||||
db: &DatabaseConnection,
|
||||
id: i32,
|
||||
status: DownloadStatus,
|
||||
error: Option<&str>,
|
||||
) -> DbResult<()> {
|
||||
let item = DownloadQueue::find_by_id(id)
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| DbError::NotFound(format!("download_queue id={id}")))?;
|
||||
let mut active: ActiveModel = item.into();
|
||||
active.status = Set(status);
|
||||
active.error_message = Set(error.map(String::from));
|
||||
active.updated_at = Set(Utc::now().naive_utc());
|
||||
active.update(db).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn list(
|
||||
db: &DatabaseConnection,
|
||||
status_filter: Option<DownloadStatus>,
|
||||
) -> DbResult<Vec<DownloadQueueItem>> {
|
||||
let mut query = DownloadQueue::find();
|
||||
if let Some(status) = status_filter {
|
||||
query = query.filter(download_queue::Column::Status.eq(status));
|
||||
}
|
||||
Ok(query
|
||||
.order_by_desc(download_queue::Column::CreatedAt)
|
||||
.all(db)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn retry_failed(db: &DatabaseConnection, id: i32) -> DbResult<()> {
|
||||
let item = DownloadQueue::find_by_id(id)
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| DbError::NotFound(format!("download_queue id={id}")))?;
|
||||
let mut active: ActiveModel = item.into();
|
||||
active.status = Set(DownloadStatus::Pending);
|
||||
active.error_message = Set(None);
|
||||
active.retry_count = Set(active.retry_count.unwrap() + 1);
|
||||
active.updated_at = Set(Utc::now().naive_utc());
|
||||
active.update(db).await?;
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user