Compare commits

...

1 Commit

Author SHA1 Message Date
Connor Johnstone d2b21789d3 redux of the worker queue 2026-03-23 18:37:45 -04:00
8 changed files with 547 additions and 0 deletions
+4
@@ -3,17 +3,21 @@ pub mod artist;
pub mod download_queue;
pub mod playlist;
pub mod playlist_track;
+pub mod scheduler_state;
pub mod search_cache;
pub mod track;
pub mod user;
pub mod wanted_item;
+pub mod work_queue;
pub use album::Entity as Albums;
pub use artist::Entity as Artists;
pub use download_queue::Entity as DownloadQueue;
pub use playlist::Entity as Playlists;
pub use playlist_track::Entity as PlaylistTracks;
+pub use scheduler_state::Entity as SchedulerStates;
pub use search_cache::Entity as SearchCache;
pub use track::Entity as Tracks;
pub use user::Entity as Users;
pub use wanted_item::Entity as WantedItems;
+pub use work_queue::Entity as WorkQueues;
+23
@@ -0,0 +1,23 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "scheduler_state")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
#[sea_orm(unique)]
pub job_name: String,
#[sea_orm(nullable)]
pub last_run_at: Option<chrono::NaiveDateTime>,
#[sea_orm(nullable)]
pub last_result: Option<String>,
#[sea_orm(nullable)]
pub next_run_at: Option<chrono::NaiveDateTime>,
pub enabled: bool,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
+66
@@ -0,0 +1,66 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(
Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, EnumIter, DeriveActiveEnum,
)]
#[sea_orm(rs_type = "String", db_type = "Text")]
pub enum WorkTaskType {
#[sea_orm(string_value = "download")]
Download,
#[sea_orm(string_value = "index")]
Index,
#[sea_orm(string_value = "tag")]
Tag,
#[sea_orm(string_value = "organize")]
Organize,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "Text")]
pub enum WorkQueueStatus {
#[sea_orm(string_value = "pending")]
Pending,
#[sea_orm(string_value = "running")]
Running,
#[sea_orm(string_value = "completed")]
Completed,
#[sea_orm(string_value = "failed")]
Failed,
}
impl std::fmt::Display for WorkTaskType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Download => write!(f, "download"),
Self::Index => write!(f, "index"),
Self::Tag => write!(f, "tag"),
Self::Organize => write!(f, "organize"),
}
}
}
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "work_queue")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub task_type: WorkTaskType,
pub status: WorkQueueStatus,
pub payload_json: String,
#[sea_orm(nullable)]
pub pipeline_id: Option<String>,
pub created_at: chrono::NaiveDateTime,
#[sea_orm(nullable)]
pub started_at: Option<chrono::NaiveDateTime>,
#[sea_orm(nullable)]
pub completed_at: Option<chrono::NaiveDateTime>,
#[sea_orm(nullable)]
pub error: Option<String>,
pub retry_count: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
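The queue stores task parameters as an opaque JSON string in `payload_json`, so each task type can define its own typed payload and convert at the queue boundary. A minimal sketch, assuming a hypothetical `DownloadPayload` shape that is not part of this commit:

use serde::{Deserialize, Serialize};

// Hypothetical payload for WorkTaskType::Download; the real shape is
// defined by whatever code enqueues the task.
#[derive(Debug, Serialize, Deserialize)]
struct DownloadPayload {
    track_id: i32,
    source_url: String,
}

fn encode(payload: &DownloadPayload) -> serde_json::Result<String> {
    serde_json::to_string(payload)
}

fn decode(payload_json: &str) -> serde_json::Result<DownloadPayload> {
    serde_json::from_str(payload_json)
}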
+141
@@ -0,0 +1,141 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Work queue table
manager
.create_table(
Table::create()
.table(WorkQueue::Table)
.if_not_exists()
.col(
ColumnDef::new(WorkQueue::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(WorkQueue::TaskType).text().not_null())
.col(
ColumnDef::new(WorkQueue::Status)
.text()
.not_null()
.default("pending"),
)
.col(ColumnDef::new(WorkQueue::PayloadJson).text().not_null())
.col(ColumnDef::new(WorkQueue::PipelineId).text())
.col(
ColumnDef::new(WorkQueue::CreatedAt)
.date_time()
.not_null()
.default(Expr::current_timestamp()),
)
.col(ColumnDef::new(WorkQueue::StartedAt).date_time())
.col(ColumnDef::new(WorkQueue::CompletedAt).date_time())
.col(ColumnDef::new(WorkQueue::Error).text())
.col(
ColumnDef::new(WorkQueue::RetryCount)
.integer()
.not_null()
.default(0),
)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.name("idx_work_queue_status_type")
.table(WorkQueue::Table)
.col(WorkQueue::Status)
.col(WorkQueue::TaskType)
.col(WorkQueue::CreatedAt)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.name("idx_work_queue_pipeline_id")
.table(WorkQueue::Table)
.col(WorkQueue::PipelineId)
.to_owned(),
)
.await?;
// Scheduler state table
manager
.create_table(
Table::create()
.table(SchedulerState::Table)
.if_not_exists()
.col(
ColumnDef::new(SchedulerState::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(
ColumnDef::new(SchedulerState::JobName)
.text()
.not_null()
.unique_key(),
)
.col(ColumnDef::new(SchedulerState::LastRunAt).date_time())
.col(ColumnDef::new(SchedulerState::LastResult).text())
.col(ColumnDef::new(SchedulerState::NextRunAt).date_time())
.col(
ColumnDef::new(SchedulerState::Enabled)
.boolean()
.not_null()
.default(true),
)
.to_owned(),
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(SchedulerState::Table).to_owned())
.await?;
manager
.drop_table(Table::drop().table(WorkQueue::Table).to_owned())
.await
}
}
#[derive(DeriveIden)]
enum WorkQueue {
Table,
Id,
TaskType,
Status,
PayloadJson,
PipelineId,
CreatedAt,
StartedAt,
CompletedAt,
Error,
RetryCount,
}
#[derive(DeriveIden)]
enum SchedulerState {
Table,
Id,
JobName,
LastRunAt,
LastResult,
NextRunAt,
Enabled,
}
+2
@@ -15,6 +15,7 @@ mod m20260320_000013_add_artist_monitoring;
mod m20260320_000014_create_playlists;
mod m20260320_000015_add_subsonic_password;
mod m20260323_000016_remove_orphaned_artists;
+mod m20260323_000017_create_work_queue_and_scheduler;
pub struct Migrator;
@@ -37,6 +38,7 @@ impl MigratorTrait for Migrator {
Box::new(m20260320_000014_create_playlists::Migration),
Box::new(m20260320_000015_add_subsonic_password::Migration),
Box::new(m20260323_000016_remove_orphaned_artists::Migration),
+Box::new(m20260323_000017_create_work_queue_and_scheduler::Migration),
]
}
}
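Once registered, the new migration runs with the rest of the chain. A minimal sketch of applying pending migrations at startup, assuming the standard sea-orm-migration API and that `Migrator` is in scope:

use sea_orm::Database;
use sea_orm_migration::MigratorTrait;

// Applies every pending migration, including
// m20260323_000017_create_work_queue_and_scheduler.
async fn migrate(database_url: &str) -> Result<(), sea_orm::DbErr> {
    let db = Database::connect(database_url).await?;
    Migrator::up(&db, None).await
}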
+2
@@ -3,6 +3,8 @@ pub mod artists;
pub mod cache;
pub mod downloads;
pub mod playlists;
+pub mod scheduler_state;
pub mod tracks;
pub mod users;
pub mod wanted;
+pub mod work_queue;
+67
@@ -0,0 +1,67 @@
use chrono::Utc;
use sea_orm::*;
use crate::entities::scheduler_state::{self, ActiveModel, Entity as SchedulerState, Model as Job};
use crate::error::DbResult;
/// Get a scheduler job by name, creating it with defaults if it doesn't exist.
pub async fn get_or_create(db: &DatabaseConnection, job_name: &str) -> DbResult<Job> {
if let Some(existing) = SchedulerState::find()
.filter(scheduler_state::Column::JobName.eq(job_name))
.one(db)
.await?
{
return Ok(existing);
}
let active = ActiveModel {
job_name: Set(job_name.to_string()),
last_run_at: Set(None),
last_result: Set(None),
next_run_at: Set(None),
enabled: Set(true),
..Default::default()
};
Ok(active.insert(db).await?)
}
/// Update the last run time and result for a job.
pub async fn update_last_run(
db: &DatabaseConnection,
job_name: &str,
result: &str,
) -> DbResult<Job> {
let job = get_or_create(db, job_name).await?;
let mut active: ActiveModel = job.into();
active.last_run_at = Set(Some(Utc::now().naive_utc()));
active.last_result = Set(Some(result.to_string()));
Ok(active.update(db).await?)
}
/// Update the next scheduled run time for a job.
pub async fn update_next_run(
db: &DatabaseConnection,
job_name: &str,
next_at: Option<chrono::NaiveDateTime>,
) -> DbResult<Job> {
let job = get_or_create(db, job_name).await?;
let mut active: ActiveModel = job.into();
active.next_run_at = Set(next_at);
Ok(active.update(db).await?)
}
/// Set the enabled state for a job.
pub async fn set_enabled(db: &DatabaseConnection, job_name: &str, enabled: bool) -> DbResult<Job> {
let job = get_or_create(db, job_name).await?;
let mut active: ActiveModel = job.into();
active.enabled = Set(enabled);
Ok(active.update(db).await?)
}
/// List all scheduler jobs.
pub async fn list_all(db: &DatabaseConnection) -> DbResult<Vec<Job>> {
Ok(SchedulerState::find()
.order_by_asc(scheduler_state::Column::JobName)
.all(db)
.await?)
}
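A sketch of how a scheduled job might use these helpers around one run, assuming a hypothetical `run_job` body and an hourly cadence (neither is part of this commit), with `DbResult` in scope as in this module:

use chrono::{Duration, Utc};
use sea_orm::DatabaseConnection;

// Hypothetical stand-in for the actual job body.
async fn run_job(_job_name: &str) -> Result<(), Box<dyn std::error::Error>> {
    Ok(())
}

async fn tick(db: &DatabaseConnection, job_name: &str) -> DbResult<()> {
    let job = get_or_create(db, job_name).await?;
    if !job.enabled {
        return Ok(());
    }
    // Record the outcome, then schedule the next run an hour out.
    let outcome = match run_job(job_name).await {
        Ok(()) => "ok".to_string(),
        Err(e) => format!("error: {e}"),
    };
    update_last_run(db, job_name, &outcome).await?;
    let next = (Utc::now() + Duration::hours(1)).naive_utc();
    update_next_run(db, job_name, Some(next)).await?;
    Ok(())
}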
+242
@@ -0,0 +1,242 @@
use chrono::Utc;
use sea_orm::*;
use sea_orm::sea_query::Expr;
use serde::Serialize;
use crate::entities::work_queue::{
self, ActiveModel, Entity as WorkQueue, Model as WorkItem, WorkQueueStatus, WorkTaskType,
};
use crate::error::{DbError, DbResult};
pub async fn enqueue(
db: &DatabaseConnection,
task_type: WorkTaskType,
payload_json: &str,
pipeline_id: Option<&str>,
) -> DbResult<WorkItem> {
let now = Utc::now().naive_utc();
let active = ActiveModel {
task_type: Set(task_type),
status: Set(WorkQueueStatus::Pending),
payload_json: Set(payload_json.to_string()),
pipeline_id: Set(pipeline_id.map(String::from)),
created_at: Set(now),
started_at: Set(None),
completed_at: Set(None),
error: Set(None),
retry_count: Set(0),
..Default::default()
};
Ok(active.insert(db).await?)
}
pub async fn enqueue_batch(
db: &DatabaseConnection,
items: Vec<(WorkTaskType, String, Option<String>)>,
) -> DbResult<Vec<WorkItem>> {
let now = Utc::now().naive_utc();
let mut results = Vec::with_capacity(items.len());
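    // Insert rows one at a time: each `insert` returns the full Model with
    // its generated id, which a bulk `insert_many` would not.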
for (task_type, payload_json, pipeline_id) in items {
let active = ActiveModel {
task_type: Set(task_type),
status: Set(WorkQueueStatus::Pending),
payload_json: Set(payload_json),
pipeline_id: Set(pipeline_id),
created_at: Set(now),
started_at: Set(None),
completed_at: Set(None),
error: Set(None),
retry_count: Set(0),
..Default::default()
};
results.push(active.insert(db).await?);
}
Ok(results)
}
/// Claim up to `limit` pending items of the given type, oldest first, by
/// setting their status to Running. Returns the claimed items. Note: the
/// select-then-update here is two steps rather than one atomic statement,
/// so it assumes a single claiming process per task type.
pub async fn claim_next(
db: &DatabaseConnection,
task_type: WorkTaskType,
limit: u64,
) -> DbResult<Vec<WorkItem>> {
let pending = WorkQueue::find()
.filter(work_queue::Column::Status.eq(WorkQueueStatus::Pending))
.filter(work_queue::Column::TaskType.eq(task_type))
.order_by_asc(work_queue::Column::CreatedAt)
.limit(limit)
.all(db)
.await?;
let now = Utc::now().naive_utc();
let mut claimed = Vec::with_capacity(pending.len());
for item in pending {
let mut active: ActiveModel = item.into();
active.status = Set(WorkQueueStatus::Running);
active.started_at = Set(Some(now));
let updated = active.update(db).await?;
claimed.push(updated);
}
Ok(claimed)
}
pub async fn complete(db: &DatabaseConnection, id: i32) -> DbResult<()> {
let item = WorkQueue::find_by_id(id)
.one(db)
.await?
.ok_or_else(|| DbError::NotFound(format!("work_queue id={id}")))?;
let mut active: ActiveModel = item.into();
active.status = Set(WorkQueueStatus::Completed);
active.completed_at = Set(Some(Utc::now().naive_utc()));
active.error = Set(None);
active.update(db).await?;
Ok(())
}
pub async fn fail(db: &DatabaseConnection, id: i32, error: &str) -> DbResult<()> {
let item = WorkQueue::find_by_id(id)
.one(db)
.await?
.ok_or_else(|| DbError::NotFound(format!("work_queue id={id}")))?;
let retry_count = item.retry_count;
let mut active: ActiveModel = item.into();
if retry_count < 3 {
// Auto-retry: reset to pending with incremented count
active.status = Set(WorkQueueStatus::Pending);
active.started_at = Set(None);
active.retry_count = Set(retry_count + 1);
active.error = Set(Some(error.to_string()));
} else {
// Max retries exceeded: mark as permanently failed
active.status = Set(WorkQueueStatus::Failed);
active.completed_at = Set(Some(Utc::now().naive_utc()));
active.error = Set(Some(error.to_string()));
}
active.update(db).await?;
Ok(())
}
/// Reset any items stuck in Running state back to Pending (for startup recovery).
pub async fn reset_stale_running(db: &DatabaseConnection) -> DbResult<u64> {
let result = WorkQueue::update_many()
.col_expr(
work_queue::Column::Status,
Expr::value(WorkQueueStatus::Pending),
)
.col_expr(
work_queue::Column::StartedAt,
Expr::value(Option::<chrono::NaiveDateTime>::None),
)
.filter(work_queue::Column::Status.eq(WorkQueueStatus::Running))
.exec(db)
.await?;
Ok(result.rows_affected)
}
#[derive(Debug, Clone, Serialize)]
pub struct TypeCounts {
pub pending: u64,
pub running: u64,
pub completed: u64,
pub failed: u64,
}
/// Get counts grouped by status for a specific task type.
pub async fn counts_for_type(
db: &DatabaseConnection,
task_type: WorkTaskType,
) -> DbResult<TypeCounts> {
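    // Loads the matching rows and counts in application code; a SQL
    // GROUP BY would avoid materializing every row if the table grows large.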
let items = WorkQueue::find()
.filter(work_queue::Column::TaskType.eq(task_type))
.all(db)
.await?;
let mut counts = TypeCounts {
pending: 0,
running: 0,
completed: 0,
failed: 0,
};
for item in items {
match item.status {
WorkQueueStatus::Pending => counts.pending += 1,
WorkQueueStatus::Running => counts.running += 1,
WorkQueueStatus::Completed => counts.completed += 1,
WorkQueueStatus::Failed => counts.failed += 1,
}
}
Ok(counts)
}
/// Get counts for all task types at once.
pub async fn counts_all(db: &DatabaseConnection) -> DbResult<AllCounts> {
let items = WorkQueue::find().all(db).await?;
let mut result = AllCounts {
download: TypeCounts { pending: 0, running: 0, completed: 0, failed: 0 },
index: TypeCounts { pending: 0, running: 0, completed: 0, failed: 0 },
tag: TypeCounts { pending: 0, running: 0, completed: 0, failed: 0 },
organize: TypeCounts { pending: 0, running: 0, completed: 0, failed: 0 },
};
for item in items {
let counts = match item.task_type {
WorkTaskType::Download => &mut result.download,
WorkTaskType::Index => &mut result.index,
WorkTaskType::Tag => &mut result.tag,
WorkTaskType::Organize => &mut result.organize,
};
match item.status {
WorkQueueStatus::Pending => counts.pending += 1,
WorkQueueStatus::Running => counts.running += 1,
WorkQueueStatus::Completed => counts.completed += 1,
WorkQueueStatus::Failed => counts.failed += 1,
}
}
Ok(result)
}
#[derive(Debug, Clone, Serialize)]
pub struct AllCounts {
pub download: TypeCounts,
pub index: TypeCounts,
pub tag: TypeCounts,
pub organize: TypeCounts,
}
/// Check if all items for a pipeline are completed.
pub async fn pipeline_is_complete(
db: &DatabaseConnection,
pipeline_id: &str,
) -> DbResult<bool> {
let incomplete = WorkQueue::find()
.filter(work_queue::Column::PipelineId.eq(pipeline_id))
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Completed))
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Failed))
.count(db)
.await?;
Ok(incomplete == 0)
}
/// Delete completed items older than the given number of days.
pub async fn cleanup_completed(db: &DatabaseConnection, older_than_days: i64) -> DbResult<u64> {
let cutoff = (Utc::now() - chrono::Duration::days(older_than_days)).naive_utc();
let result = WorkQueue::delete_many()
.filter(work_queue::Column::Status.eq(WorkQueueStatus::Completed))
.filter(work_queue::Column::CompletedAt.lt(cutoff))
.exec(db)
.await?;
Ok(result.rows_affected)
}
/// Check if there are any pending or running items.
pub async fn has_active_work(db: &DatabaseConnection) -> DbResult<bool> {
let count = WorkQueue::find()
.filter(
work_queue::Column::Status
.eq(WorkQueueStatus::Pending)
.or(work_queue::Column::Status.eq(WorkQueueStatus::Running)),
)
.count(db)
.await?;
Ok(count > 0)
}
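Taken together, these helpers support a simple polling worker: recover anything left Running by a previous crash, then repeatedly claim, execute, and settle items. A minimal sketch, assuming a single worker process on a Tokio runtime and a hypothetical `process` handler (not part of this commit), with `WorkItem`, `WorkTaskType`, and `DbResult` in scope as in this module:

use std::time::Duration;
use sea_orm::DatabaseConnection;

// Hypothetical stand-in for the real task handler.
async fn process(_item: &WorkItem) -> Result<(), Box<dyn std::error::Error>> {
    Ok(())
}

async fn download_worker(db: DatabaseConnection) -> DbResult<()> {
    // One-time startup recovery for items orphaned mid-run.
    reset_stale_running(&db).await?;
    loop {
        let batch = claim_next(&db, WorkTaskType::Download, 5).await?;
        if batch.is_empty() {
            tokio::time::sleep(Duration::from_secs(5)).await;
            continue;
        }
        for item in batch {
            let id = item.id;
            match process(&item).await {
                Ok(()) => complete(&db, id).await?,
                // `fail` re-queues up to 3 retries, then marks Failed.
                Err(e) => fail(&db, id, &e.to_string()).await?,
            }
        }
    }
}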