redux of the worker queue
This commit is contained in:
@@ -0,0 +1,242 @@
|
||||
use chrono::Utc;
|
||||
use sea_orm::*;
|
||||
use sea_orm::sea_query::Expr;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::entities::work_queue::{
|
||||
self, ActiveModel, Entity as WorkQueue, Model as WorkItem, WorkQueueStatus, WorkTaskType,
|
||||
};
|
||||
use crate::error::{DbError, DbResult};
|
||||
|
||||
pub async fn enqueue(
|
||||
db: &DatabaseConnection,
|
||||
task_type: WorkTaskType,
|
||||
payload_json: &str,
|
||||
pipeline_id: Option<&str>,
|
||||
) -> DbResult<WorkItem> {
|
||||
let now = Utc::now().naive_utc();
|
||||
let active = ActiveModel {
|
||||
task_type: Set(task_type),
|
||||
status: Set(WorkQueueStatus::Pending),
|
||||
payload_json: Set(payload_json.to_string()),
|
||||
pipeline_id: Set(pipeline_id.map(String::from)),
|
||||
created_at: Set(now),
|
||||
started_at: Set(None),
|
||||
completed_at: Set(None),
|
||||
error: Set(None),
|
||||
retry_count: Set(0),
|
||||
..Default::default()
|
||||
};
|
||||
Ok(active.insert(db).await?)
|
||||
}
|
||||
|
||||
pub async fn enqueue_batch(
|
||||
db: &DatabaseConnection,
|
||||
items: Vec<(WorkTaskType, String, Option<String>)>,
|
||||
) -> DbResult<Vec<WorkItem>> {
|
||||
let now = Utc::now().naive_utc();
|
||||
let mut results = Vec::with_capacity(items.len());
|
||||
for (task_type, payload_json, pipeline_id) in items {
|
||||
let active = ActiveModel {
|
||||
task_type: Set(task_type),
|
||||
status: Set(WorkQueueStatus::Pending),
|
||||
payload_json: Set(payload_json),
|
||||
pipeline_id: Set(pipeline_id),
|
||||
created_at: Set(now),
|
||||
started_at: Set(None),
|
||||
completed_at: Set(None),
|
||||
error: Set(None),
|
||||
retry_count: Set(0),
|
||||
..Default::default()
|
||||
};
|
||||
results.push(active.insert(db).await?);
|
||||
}
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Atomically claim up to `limit` pending items of the given type by setting
|
||||
/// their status to Running. Returns the claimed items.
|
||||
pub async fn claim_next(
|
||||
db: &DatabaseConnection,
|
||||
task_type: WorkTaskType,
|
||||
limit: u64,
|
||||
) -> DbResult<Vec<WorkItem>> {
|
||||
let pending = WorkQueue::find()
|
||||
.filter(work_queue::Column::Status.eq(WorkQueueStatus::Pending))
|
||||
.filter(work_queue::Column::TaskType.eq(task_type))
|
||||
.order_by_asc(work_queue::Column::CreatedAt)
|
||||
.limit(limit)
|
||||
.all(db)
|
||||
.await?;
|
||||
|
||||
let now = Utc::now().naive_utc();
|
||||
let mut claimed = Vec::with_capacity(pending.len());
|
||||
for item in pending {
|
||||
let mut active: ActiveModel = item.into();
|
||||
active.status = Set(WorkQueueStatus::Running);
|
||||
active.started_at = Set(Some(now));
|
||||
let updated = active.update(db).await?;
|
||||
claimed.push(updated);
|
||||
}
|
||||
Ok(claimed)
|
||||
}
|
||||
|
||||
pub async fn complete(db: &DatabaseConnection, id: i32) -> DbResult<()> {
|
||||
let item = WorkQueue::find_by_id(id)
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| DbError::NotFound(format!("work_queue id={id}")))?;
|
||||
let mut active: ActiveModel = item.into();
|
||||
active.status = Set(WorkQueueStatus::Completed);
|
||||
active.completed_at = Set(Some(Utc::now().naive_utc()));
|
||||
active.error = Set(None);
|
||||
active.update(db).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn fail(db: &DatabaseConnection, id: i32, error: &str) -> DbResult<()> {
|
||||
let item = WorkQueue::find_by_id(id)
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| DbError::NotFound(format!("work_queue id={id}")))?;
|
||||
let retry_count = item.retry_count;
|
||||
let mut active: ActiveModel = item.into();
|
||||
|
||||
if retry_count < 3 {
|
||||
// Auto-retry: reset to pending with incremented count
|
||||
active.status = Set(WorkQueueStatus::Pending);
|
||||
active.started_at = Set(None);
|
||||
active.retry_count = Set(retry_count + 1);
|
||||
active.error = Set(Some(error.to_string()));
|
||||
} else {
|
||||
// Max retries exceeded: mark as permanently failed
|
||||
active.status = Set(WorkQueueStatus::Failed);
|
||||
active.completed_at = Set(Some(Utc::now().naive_utc()));
|
||||
active.error = Set(Some(error.to_string()));
|
||||
}
|
||||
active.update(db).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reset any items stuck in Running state back to Pending (for startup recovery).
|
||||
pub async fn reset_stale_running(db: &DatabaseConnection) -> DbResult<u64> {
|
||||
let result = WorkQueue::update_many()
|
||||
.col_expr(
|
||||
work_queue::Column::Status,
|
||||
Expr::value(WorkQueueStatus::Pending),
|
||||
)
|
||||
.col_expr(work_queue::Column::StartedAt, Expr::value(Option::<chrono::NaiveDateTime>::None))
|
||||
.filter(work_queue::Column::Status.eq(WorkQueueStatus::Running))
|
||||
.exec(db)
|
||||
.await?;
|
||||
Ok(result.rows_affected)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct TypeCounts {
|
||||
pub pending: u64,
|
||||
pub running: u64,
|
||||
pub completed: u64,
|
||||
pub failed: u64,
|
||||
}
|
||||
|
||||
/// Get counts grouped by status for a specific task type.
|
||||
pub async fn counts_for_type(
|
||||
db: &DatabaseConnection,
|
||||
task_type: WorkTaskType,
|
||||
) -> DbResult<TypeCounts> {
|
||||
let items = WorkQueue::find()
|
||||
.filter(work_queue::Column::TaskType.eq(task_type))
|
||||
.all(db)
|
||||
.await?;
|
||||
|
||||
let mut counts = TypeCounts {
|
||||
pending: 0,
|
||||
running: 0,
|
||||
completed: 0,
|
||||
failed: 0,
|
||||
};
|
||||
for item in items {
|
||||
match item.status {
|
||||
WorkQueueStatus::Pending => counts.pending += 1,
|
||||
WorkQueueStatus::Running => counts.running += 1,
|
||||
WorkQueueStatus::Completed => counts.completed += 1,
|
||||
WorkQueueStatus::Failed => counts.failed += 1,
|
||||
}
|
||||
}
|
||||
Ok(counts)
|
||||
}
|
||||
|
||||
/// Get counts for all task types at once.
|
||||
pub async fn counts_all(db: &DatabaseConnection) -> DbResult<AllCounts> {
|
||||
let items = WorkQueue::find().all(db).await?;
|
||||
|
||||
let mut result = AllCounts {
|
||||
download: TypeCounts { pending: 0, running: 0, completed: 0, failed: 0 },
|
||||
index: TypeCounts { pending: 0, running: 0, completed: 0, failed: 0 },
|
||||
tag: TypeCounts { pending: 0, running: 0, completed: 0, failed: 0 },
|
||||
organize: TypeCounts { pending: 0, running: 0, completed: 0, failed: 0 },
|
||||
};
|
||||
|
||||
for item in items {
|
||||
let counts = match item.task_type {
|
||||
WorkTaskType::Download => &mut result.download,
|
||||
WorkTaskType::Index => &mut result.index,
|
||||
WorkTaskType::Tag => &mut result.tag,
|
||||
WorkTaskType::Organize => &mut result.organize,
|
||||
};
|
||||
match item.status {
|
||||
WorkQueueStatus::Pending => counts.pending += 1,
|
||||
WorkQueueStatus::Running => counts.running += 1,
|
||||
WorkQueueStatus::Completed => counts.completed += 1,
|
||||
WorkQueueStatus::Failed => counts.failed += 1,
|
||||
}
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct AllCounts {
|
||||
pub download: TypeCounts,
|
||||
pub index: TypeCounts,
|
||||
pub tag: TypeCounts,
|
||||
pub organize: TypeCounts,
|
||||
}
|
||||
|
||||
/// Check if all items for a pipeline are completed.
|
||||
pub async fn pipeline_is_complete(
|
||||
db: &DatabaseConnection,
|
||||
pipeline_id: &str,
|
||||
) -> DbResult<bool> {
|
||||
let incomplete = WorkQueue::find()
|
||||
.filter(work_queue::Column::PipelineId.eq(pipeline_id))
|
||||
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Completed))
|
||||
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Failed))
|
||||
.count(db)
|
||||
.await?;
|
||||
Ok(incomplete == 0)
|
||||
}
|
||||
|
||||
/// Delete completed items older than the given number of days.
|
||||
pub async fn cleanup_completed(db: &DatabaseConnection, older_than_days: i64) -> DbResult<u64> {
|
||||
let cutoff = (Utc::now() - chrono::Duration::days(older_than_days)).naive_utc();
|
||||
let result = WorkQueue::delete_many()
|
||||
.filter(work_queue::Column::Status.eq(WorkQueueStatus::Completed))
|
||||
.filter(work_queue::Column::CompletedAt.lt(cutoff))
|
||||
.exec(db)
|
||||
.await?;
|
||||
Ok(result.rows_affected)
|
||||
}
|
||||
|
||||
/// Check if there are any pending or running items.
|
||||
pub async fn has_active_work(db: &DatabaseConnection) -> DbResult<bool> {
|
||||
let count = WorkQueue::find()
|
||||
.filter(
|
||||
work_queue::Column::Status
|
||||
.eq(WorkQueueStatus::Pending)
|
||||
.or(work_queue::Column::Status.eq(WorkQueueStatus::Running)),
|
||||
)
|
||||
.count(db)
|
||||
.await?;
|
||||
Ok(count > 0)
|
||||
}
|
||||
Reference in New Issue
Block a user