From d2b21789d3de23935f37ef61bf93c8f3814658ef Mon Sep 17 00:00:00 2001
From: Connor Johnstone
Date: Mon, 23 Mar 2026 18:37:45 -0400
Subject: [PATCH] redux of the worker queue

---
 src/entities/mod.rs                           |   4 +
 src/entities/scheduler_state.rs               |  23 ++
 src/entities/work_queue.rs                    |  66 +++++
 ..._000017_create_work_queue_and_scheduler.rs | 141 ++++++++++
 src/migration/mod.rs                          |   2 +
 src/queries/mod.rs                            |   2 +
 src/queries/scheduler_state.rs                |  67 +++++
 src/queries/work_queue.rs                     | 242 ++++++++++++++++++
 8 files changed, 547 insertions(+)
 create mode 100644 src/entities/scheduler_state.rs
 create mode 100644 src/entities/work_queue.rs
 create mode 100644 src/migration/m20260323_000017_create_work_queue_and_scheduler.rs
 create mode 100644 src/queries/scheduler_state.rs
 create mode 100644 src/queries/work_queue.rs

diff --git a/src/entities/mod.rs b/src/entities/mod.rs
index 1cf652e..e430b80 100644
--- a/src/entities/mod.rs
+++ b/src/entities/mod.rs
@@ -3,17 +3,21 @@ pub mod artist;
 pub mod download_queue;
 pub mod playlist;
 pub mod playlist_track;
+pub mod scheduler_state;
 pub mod search_cache;
 pub mod track;
 pub mod user;
 pub mod wanted_item;
+pub mod work_queue;
 
 pub use album::Entity as Albums;
 pub use artist::Entity as Artists;
 pub use download_queue::Entity as DownloadQueue;
 pub use playlist::Entity as Playlists;
 pub use playlist_track::Entity as PlaylistTracks;
+pub use scheduler_state::Entity as SchedulerStates;
 pub use search_cache::Entity as SearchCache;
 pub use track::Entity as Tracks;
 pub use user::Entity as Users;
 pub use wanted_item::Entity as WantedItems;
+pub use work_queue::Entity as WorkQueues;
diff --git a/src/entities/scheduler_state.rs b/src/entities/scheduler_state.rs
new file mode 100644
index 0000000..5d4f333
--- /dev/null
+++ b/src/entities/scheduler_state.rs
@@ -0,0 +1,23 @@
+use sea_orm::entity::prelude::*;
+use serde::{Deserialize, Serialize};
+
+#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
+#[sea_orm(table_name = "scheduler_state")]
+pub struct Model {
+    #[sea_orm(primary_key)]
+    pub id: i32,
+    #[sea_orm(unique)]
+    pub job_name: String,
+    #[sea_orm(nullable)]
+    pub last_run_at: Option<chrono::NaiveDateTime>,
+    #[sea_orm(nullable)]
+    pub last_result: Option<String>,
+    #[sea_orm(nullable)]
+    pub next_run_at: Option<chrono::NaiveDateTime>,
+    pub enabled: bool,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {}
+
+impl ActiveModelBehavior for ActiveModel {}
diff --git a/src/entities/work_queue.rs b/src/entities/work_queue.rs
new file mode 100644
index 0000000..34c9a84
--- /dev/null
+++ b/src/entities/work_queue.rs
@@ -0,0 +1,66 @@
+use sea_orm::entity::prelude::*;
+use serde::{Deserialize, Serialize};
+
+#[derive(
+    Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, EnumIter, DeriveActiveEnum,
+)]
+#[sea_orm(rs_type = "String", db_type = "Text")]
+pub enum WorkTaskType {
+    #[sea_orm(string_value = "download")]
+    Download,
+    #[sea_orm(string_value = "index")]
+    Index,
+    #[sea_orm(string_value = "tag")]
+    Tag,
+    #[sea_orm(string_value = "organize")]
+    Organize,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumIter, DeriveActiveEnum)]
+#[sea_orm(rs_type = "String", db_type = "Text")]
+pub enum WorkQueueStatus {
+    #[sea_orm(string_value = "pending")]
+    Pending,
+    #[sea_orm(string_value = "running")]
+    Running,
+    #[sea_orm(string_value = "completed")]
+    Completed,
+    #[sea_orm(string_value = "failed")]
+    Failed,
+}
+
+impl std::fmt::Display for WorkTaskType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Download => write!(f, "download"),
+            Self::Index => write!(f, "index"),
+            Self::Tag => write!(f, "tag"),
+            Self::Organize => write!(f, "organize"),
+        }
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
+#[sea_orm(table_name = "work_queue")]
+pub struct Model {
+    #[sea_orm(primary_key)]
+    pub id: i32,
+    pub task_type: WorkTaskType,
+    pub status: WorkQueueStatus,
+    pub payload_json: String,
+    #[sea_orm(nullable)]
+    pub pipeline_id: Option<String>,
+    pub created_at: chrono::NaiveDateTime,
+    #[sea_orm(nullable)]
+    pub started_at: Option<chrono::NaiveDateTime>,
+    #[sea_orm(nullable)]
+    pub completed_at: Option<chrono::NaiveDateTime>,
+    #[sea_orm(nullable)]
+    pub error: Option<String>,
+    pub retry_count: i32,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {}
+
+impl ActiveModelBehavior for ActiveModel {}
diff --git a/src/migration/m20260323_000017_create_work_queue_and_scheduler.rs b/src/migration/m20260323_000017_create_work_queue_and_scheduler.rs
new file mode 100644
index 0000000..e8b89e0
--- /dev/null
+++ b/src/migration/m20260323_000017_create_work_queue_and_scheduler.rs
@@ -0,0 +1,141 @@
+use sea_orm_migration::prelude::*;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        // Work queue table
+        manager
+            .create_table(
+                Table::create()
+                    .table(WorkQueue::Table)
+                    .if_not_exists()
+                    .col(
+                        ColumnDef::new(WorkQueue::Id)
+                            .integer()
+                            .not_null()
+                            .auto_increment()
+                            .primary_key(),
+                    )
+                    .col(ColumnDef::new(WorkQueue::TaskType).text().not_null())
+                    .col(
+                        ColumnDef::new(WorkQueue::Status)
+                            .text()
+                            .not_null()
+                            .default("pending"),
+                    )
+                    .col(ColumnDef::new(WorkQueue::PayloadJson).text().not_null())
+                    .col(ColumnDef::new(WorkQueue::PipelineId).text())
+                    .col(
+                        ColumnDef::new(WorkQueue::CreatedAt)
+                            .date_time()
+                            .not_null()
+                            .default(Expr::current_timestamp()),
+                    )
+                    .col(ColumnDef::new(WorkQueue::StartedAt).date_time())
+                    .col(ColumnDef::new(WorkQueue::CompletedAt).date_time())
+                    .col(ColumnDef::new(WorkQueue::Error).text())
+                    .col(
+                        ColumnDef::new(WorkQueue::RetryCount)
+                            .integer()
+                            .not_null()
+                            .default(0),
+                    )
+                    .to_owned(),
+            )
+            .await?;
+
+        manager
+            .create_index(
+                Index::create()
+                    .name("idx_work_queue_status_type")
+                    .table(WorkQueue::Table)
+                    .col(WorkQueue::Status)
+                    .col(WorkQueue::TaskType)
+                    .col(WorkQueue::CreatedAt)
+                    .to_owned(),
+            )
+            .await?;
+
+        manager
+            .create_index(
+                Index::create()
+                    .name("idx_work_queue_pipeline_id")
+                    .table(WorkQueue::Table)
+                    .col(WorkQueue::PipelineId)
+                    .to_owned(),
+            )
+            .await?;
+
+        // Scheduler state table
+        manager
+            .create_table(
+                Table::create()
+                    .table(SchedulerState::Table)
+                    .if_not_exists()
+                    .col(
+                        ColumnDef::new(SchedulerState::Id)
+                            .integer()
+                            .not_null()
+                            .auto_increment()
+                            .primary_key(),
+                    )
+                    .col(
+                        ColumnDef::new(SchedulerState::JobName)
+                            .text()
+                            .not_null()
+                            .unique_key(),
+                    )
+                    .col(ColumnDef::new(SchedulerState::LastRunAt).date_time())
+                    .col(ColumnDef::new(SchedulerState::LastResult).text())
+                    .col(ColumnDef::new(SchedulerState::NextRunAt).date_time())
+                    .col(
+                        ColumnDef::new(SchedulerState::Enabled)
+                            .boolean()
+                            .not_null()
+                            .default(true),
+                    )
+                    .to_owned(),
+            )
+            .await?;
+
+        Ok(())
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .drop_table(Table::drop().table(SchedulerState::Table).to_owned())
+            .await?;
+        manager
+            .drop_table(Table::drop().table(WorkQueue::Table).to_owned())
+            .await
+    }
+}
+
+#[derive(DeriveIden)]
+enum WorkQueue {
+    Table,
+    Id,
+    TaskType,
+    Status,
+    PayloadJson,
+    PipelineId,
+    CreatedAt,
+    StartedAt,
+    CompletedAt,
+    Error,
+    RetryCount,
+}
+
+#[derive(DeriveIden)]
+enum SchedulerState {
+    Table,
+    Id,
+    JobName,
+    LastRunAt,
+    LastResult,
+    NextRunAt,
+    Enabled,
+}
diff --git a/src/migration/mod.rs b/src/migration/mod.rs
index dcca8e5..1173423 100644
--- a/src/migration/mod.rs
+++ b/src/migration/mod.rs
@@ -15,6 +15,7 @@ mod m20260320_000013_add_artist_monitoring;
 mod m20260320_000014_create_playlists;
 mod m20260320_000015_add_subsonic_password;
 mod m20260323_000016_remove_orphaned_artists;
+mod m20260323_000017_create_work_queue_and_scheduler;
 
 pub struct Migrator;
 
@@ -37,6 +38,7 @@ impl MigratorTrait for Migrator {
             Box::new(m20260320_000014_create_playlists::Migration),
             Box::new(m20260320_000015_add_subsonic_password::Migration),
             Box::new(m20260323_000016_remove_orphaned_artists::Migration),
+            Box::new(m20260323_000017_create_work_queue_and_scheduler::Migration),
         ]
     }
 }
diff --git a/src/queries/mod.rs b/src/queries/mod.rs
index 6a806cd..c1a41be 100644
--- a/src/queries/mod.rs
+++ b/src/queries/mod.rs
@@ -3,6 +3,8 @@ pub mod artists;
 pub mod cache;
 pub mod downloads;
 pub mod playlists;
+pub mod scheduler_state;
 pub mod tracks;
 pub mod users;
 pub mod wanted;
+pub mod work_queue;
diff --git a/src/queries/scheduler_state.rs b/src/queries/scheduler_state.rs
new file mode 100644
index 0000000..e63b6fa
--- /dev/null
+++ b/src/queries/scheduler_state.rs
@@ -0,0 +1,67 @@
+use chrono::Utc;
+use sea_orm::*;
+
+use crate::entities::scheduler_state::{self, ActiveModel, Entity as SchedulerState, Model as Job};
+use crate::error::DbResult;
+
+/// Get a scheduler job by name, creating it with defaults if it does not exist.
+pub async fn get_or_create(db: &DatabaseConnection, job_name: &str) -> DbResult<Job> {
+    if let Some(existing) = SchedulerState::find()
+        .filter(scheduler_state::Column::JobName.eq(job_name))
+        .one(db)
+        .await?
+    {
+        return Ok(existing);
+    }
+
+    let active = ActiveModel {
+        job_name: Set(job_name.to_string()),
+        last_run_at: Set(None),
+        last_result: Set(None),
+        next_run_at: Set(None),
+        enabled: Set(true),
+        ..Default::default()
+    };
+    Ok(active.insert(db).await?)
+}
+
+/// Update the last run time and result for a job.
+pub async fn update_last_run(
+    db: &DatabaseConnection,
+    job_name: &str,
+    result: &str,
+) -> DbResult<Job> {
+    let job = get_or_create(db, job_name).await?;
+    let mut active: ActiveModel = job.into();
+    active.last_run_at = Set(Some(Utc::now().naive_utc()));
+    active.last_result = Set(Some(result.to_string()));
+    Ok(active.update(db).await?)
+}
+
+/// Update the next scheduled run time for a job.
+pub async fn update_next_run(
+    db: &DatabaseConnection,
+    job_name: &str,
+    next_at: Option<chrono::NaiveDateTime>,
+) -> DbResult<Job> {
+    let job = get_or_create(db, job_name).await?;
+    let mut active: ActiveModel = job.into();
+    active.next_run_at = Set(next_at);
+    Ok(active.update(db).await?)
+}
+
+/// Set the enabled state for a job.
+pub async fn set_enabled(db: &DatabaseConnection, job_name: &str, enabled: bool) -> DbResult<Job> {
+    let job = get_or_create(db, job_name).await?;
+    let mut active: ActiveModel = job.into();
+    active.enabled = Set(enabled);
+    Ok(active.update(db).await?)
+}
+
+/// List all scheduler jobs.
+pub async fn list_all(db: &DatabaseConnection) -> DbResult<Vec<Job>> {
+    Ok(SchedulerState::find()
+        .order_by_asc(scheduler_state::Column::JobName)
+        .all(db)
+        .await?)
+}
diff --git a/src/queries/work_queue.rs b/src/queries/work_queue.rs
new file mode 100644
index 0000000..21d5bb9
--- /dev/null
+++ b/src/queries/work_queue.rs
@@ -0,0 +1,242 @@
+use chrono::Utc;
+use sea_orm::*;
+use sea_orm::sea_query::Expr;
+use serde::Serialize;
+
+use crate::entities::work_queue::{
+    self, ActiveModel, Entity as WorkQueue, Model as WorkItem, WorkQueueStatus, WorkTaskType,
+};
+use crate::error::{DbError, DbResult};
+
+pub async fn enqueue(
+    db: &DatabaseConnection,
+    task_type: WorkTaskType,
+    payload_json: &str,
+    pipeline_id: Option<&str>,
+) -> DbResult<WorkItem> {
+    let now = Utc::now().naive_utc();
+    let active = ActiveModel {
+        task_type: Set(task_type),
+        status: Set(WorkQueueStatus::Pending),
+        payload_json: Set(payload_json.to_string()),
+        pipeline_id: Set(pipeline_id.map(String::from)),
+        created_at: Set(now),
+        started_at: Set(None),
+        completed_at: Set(None),
+        error: Set(None),
+        retry_count: Set(0),
+        ..Default::default()
+    };
+    Ok(active.insert(db).await?)
+}
+
+pub async fn enqueue_batch(
+    db: &DatabaseConnection,
+    items: Vec<(WorkTaskType, String, Option<String>)>,
+) -> DbResult<Vec<WorkItem>> {
+    let now = Utc::now().naive_utc();
+    let mut results = Vec::with_capacity(items.len());
+    for (task_type, payload_json, pipeline_id) in items {
+        let active = ActiveModel {
+            task_type: Set(task_type),
+            status: Set(WorkQueueStatus::Pending),
+            payload_json: Set(payload_json),
+            pipeline_id: Set(pipeline_id),
+            created_at: Set(now),
+            started_at: Set(None),
+            completed_at: Set(None),
+            error: Set(None),
+            retry_count: Set(0),
+            ..Default::default()
+        };
+        results.push(active.insert(db).await?);
+    }
+    Ok(results)
+}
+
+/// Claim up to `limit` pending items of the given type by setting their status
+/// to Running (find-then-update, so this assumes a single claimer per type).
+pub async fn claim_next(
+    db: &DatabaseConnection,
+    task_type: WorkTaskType,
+    limit: u64,
+) -> DbResult<Vec<WorkItem>> {
+    let pending = WorkQueue::find()
+        .filter(work_queue::Column::Status.eq(WorkQueueStatus::Pending))
+        .filter(work_queue::Column::TaskType.eq(task_type))
+        .order_by_asc(work_queue::Column::CreatedAt)
+        .limit(limit)
+        .all(db)
+        .await?;
+
+    let now = Utc::now().naive_utc();
+    let mut claimed = Vec::with_capacity(pending.len());
+    for item in pending {
+        let mut active: ActiveModel = item.into();
+        active.status = Set(WorkQueueStatus::Running);
+        active.started_at = Set(Some(now));
+        let updated = active.update(db).await?;
+        claimed.push(updated);
+    }
+    Ok(claimed)
+}
+
+pub async fn complete(db: &DatabaseConnection, id: i32) -> DbResult<()> {
+    let item = WorkQueue::find_by_id(id)
+        .one(db)
+        .await?
+        .ok_or_else(|| DbError::NotFound(format!("work_queue id={id}")))?;
+    let mut active: ActiveModel = item.into();
+    active.status = Set(WorkQueueStatus::Completed);
+    active.completed_at = Set(Some(Utc::now().naive_utc()));
+    active.error = Set(None);
+    active.update(db).await?;
+    Ok(())
+}
+
+pub async fn fail(db: &DatabaseConnection, id: i32, error: &str) -> DbResult<()> {
+    let item = WorkQueue::find_by_id(id)
+        .one(db)
+        .await?
+        .ok_or_else(|| DbError::NotFound(format!("work_queue id={id}")))?;
+    let retry_count = item.retry_count;
+    let mut active: ActiveModel = item.into();
+
+    if retry_count < 3 {
+        // Auto-retry: reset to pending with incremented count
+        active.status = Set(WorkQueueStatus::Pending);
+        active.started_at = Set(None);
+        active.retry_count = Set(retry_count + 1);
+        active.error = Set(Some(error.to_string()));
+    } else {
+        // Max retries exceeded: mark as permanently failed
+        active.status = Set(WorkQueueStatus::Failed);
+        active.completed_at = Set(Some(Utc::now().naive_utc()));
+        active.error = Set(Some(error.to_string()));
+    }
+    active.update(db).await?;
+    Ok(())
+}
+
+/// Reset any items stuck in Running state back to Pending (for startup recovery).
+pub async fn reset_stale_running(db: &DatabaseConnection) -> DbResult<u64> {
+    let result = WorkQueue::update_many()
+        .col_expr(
+            work_queue::Column::Status,
+            Expr::value(WorkQueueStatus::Pending),
+        )
+        .col_expr(work_queue::Column::StartedAt, Expr::value(Option::<chrono::NaiveDateTime>::None))
+        .filter(work_queue::Column::Status.eq(WorkQueueStatus::Running))
+        .exec(db)
+        .await?;
+    Ok(result.rows_affected)
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct TypeCounts {
+    pub pending: u64,
+    pub running: u64,
+    pub completed: u64,
+    pub failed: u64,
+}
+
+/// Get counts grouped by status for a specific task type.
+pub async fn counts_for_type(
+    db: &DatabaseConnection,
+    task_type: WorkTaskType,
+) -> DbResult<TypeCounts> {
+    let items = WorkQueue::find()
+        .filter(work_queue::Column::TaskType.eq(task_type))
+        .all(db)
+        .await?;
+
+    let mut counts = TypeCounts {
+        pending: 0,
+        running: 0,
+        completed: 0,
+        failed: 0,
+    };
+    for item in items {
+        match item.status {
+            WorkQueueStatus::Pending => counts.pending += 1,
+            WorkQueueStatus::Running => counts.running += 1,
+            WorkQueueStatus::Completed => counts.completed += 1,
+            WorkQueueStatus::Failed => counts.failed += 1,
+        }
+    }
+    Ok(counts)
+}
+
+/// Get counts for all task types at once.
+pub async fn counts_all(db: &DatabaseConnection) -> DbResult<AllCounts> {
+    let items = WorkQueue::find().all(db).await?;
+
+    let mut result = AllCounts {
+        download: TypeCounts { pending: 0, running: 0, completed: 0, failed: 0 },
+        index: TypeCounts { pending: 0, running: 0, completed: 0, failed: 0 },
+        tag: TypeCounts { pending: 0, running: 0, completed: 0, failed: 0 },
+        organize: TypeCounts { pending: 0, running: 0, completed: 0, failed: 0 },
+    };
+
+    for item in items {
+        let counts = match item.task_type {
+            WorkTaskType::Download => &mut result.download,
+            WorkTaskType::Index => &mut result.index,
+            WorkTaskType::Tag => &mut result.tag,
+            WorkTaskType::Organize => &mut result.organize,
+        };
+        match item.status {
+            WorkQueueStatus::Pending => counts.pending += 1,
+            WorkQueueStatus::Running => counts.running += 1,
+            WorkQueueStatus::Completed => counts.completed += 1,
+            WorkQueueStatus::Failed => counts.failed += 1,
+        }
+    }
+    Ok(result)
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct AllCounts {
+    pub download: TypeCounts,
+    pub index: TypeCounts,
+    pub tag: TypeCounts,
+    pub organize: TypeCounts,
+}
+
+/// Check if all items for a pipeline are completed.
+pub async fn pipeline_is_complete(
+    db: &DatabaseConnection,
+    pipeline_id: &str,
+) -> DbResult<bool> {
+    let incomplete = WorkQueue::find()
+        .filter(work_queue::Column::PipelineId.eq(pipeline_id))
+        .filter(work_queue::Column::Status.ne(WorkQueueStatus::Completed))
+        .filter(work_queue::Column::Status.ne(WorkQueueStatus::Failed))
+        .count(db)
+        .await?;
+    Ok(incomplete == 0)
+}
+
+/// Delete completed items older than the given number of days.
+pub async fn cleanup_completed(db: &DatabaseConnection, older_than_days: i64) -> DbResult<u64> {
+    let cutoff = (Utc::now() - chrono::Duration::days(older_than_days)).naive_utc();
+    let result = WorkQueue::delete_many()
+        .filter(work_queue::Column::Status.eq(WorkQueueStatus::Completed))
+        .filter(work_queue::Column::CompletedAt.lt(cutoff))
+        .exec(db)
+        .await?;
+    Ok(result.rows_affected)
+}
+
+/// Check if there are any pending or running items.
+pub async fn has_active_work(db: &DatabaseConnection) -> DbResult<bool> {
+    let count = WorkQueue::find()
+        .filter(
+            work_queue::Column::Status
+                .eq(WorkQueueStatus::Pending)
+                .or(work_queue::Column::Status.eq(WorkQueueStatus::Running)),
+        )
+        .count(db)
+        .await?;
+    Ok(count > 0)
+}
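
Reviewer note, not part of the patch: a minimal sketch of how a worker loop
might drive these helpers, under stated assumptions. It presumes a tokio
runtime; `handle_download`, `run_download_worker`, `record_index_pass`, and
the "library_index" job name are all hypothetical, and only the `work_queue`
and `scheduler_state` functions come from this diff.

    use sea_orm::DatabaseConnection;

    use crate::entities::work_queue::WorkTaskType;
    use crate::error::DbResult;
    use crate::queries::{scheduler_state, work_queue};

    /// Hypothetical handler; real code would deserialize `payload_json`
    /// and perform the download.
    async fn handle_download(payload_json: &str) -> Result<(), Box<dyn std::error::Error>> {
        let _ = payload_json;
        Ok(())
    }

    /// Single-claimer loop over pending downloads, matching the
    /// one-claimer-per-task-type assumption documented on `claim_next`.
    pub async fn run_download_worker(db: &DatabaseConnection) -> DbResult<()> {
        // Startup recovery: re-queue anything left Running by a crash.
        work_queue::reset_stale_running(db).await?;
        loop {
            // Claim up to five of the oldest pending download items.
            let batch = work_queue::claim_next(db, WorkTaskType::Download, 5).await?;
            if batch.is_empty() {
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                continue;
            }
            for item in batch {
                match handle_download(&item.payload_json).await {
                    // complete() stamps completed_at and clears the stored error.
                    Ok(()) => work_queue::complete(db, item.id).await?,
                    // fail() re-queues the item with retry_count + 1, then marks
                    // it Failed once three retries have been used.
                    Err(e) => work_queue::fail(db, item.id, &e.to_string()).await?,
                }
            }
        }
    }

    /// Record one scheduled pass: last result now, next run in six hours.
    pub async fn record_index_pass(db: &DatabaseConnection) -> DbResult<()> {
        scheduler_state::update_last_run(db, "library_index", "ok").await?;
        let next = (chrono::Utc::now() + chrono::Duration::hours(6)).naive_utc();
        scheduler_state::update_next_run(db, "library_index", Some(next)).await?;
        Ok(())
    }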