diff --git a/src/entities/track.rs b/src/entities/track.rs index 2973b31..dc015c2 100644 --- a/src/entities/track.rs +++ b/src/entities/track.rs @@ -43,6 +43,7 @@ pub struct Model { pub file_mtime: Option, pub added_at: chrono::NaiveDateTime, pub updated_at: chrono::NaiveDateTime, + pub tagged: bool, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/src/entities/work_queue.rs b/src/entities/work_queue.rs index 34c9a84..0b949c0 100644 --- a/src/entities/work_queue.rs +++ b/src/entities/work_queue.rs @@ -14,6 +14,8 @@ pub enum WorkTaskType { Tag, #[sea_orm(string_value = "organize")] Organize, + #[sea_orm(string_value = "enrich")] + Enrich, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumIter, DeriveActiveEnum)] @@ -36,6 +38,7 @@ impl std::fmt::Display for WorkTaskType { Self::Index => write!(f, "index"), Self::Tag => write!(f, "tag"), Self::Organize => write!(f, "organize"), + Self::Enrich => write!(f, "enrich"), } } } diff --git a/src/migration/m20260324_000018_add_track_tagged.rs b/src/migration/m20260324_000018_add_track_tagged.rs new file mode 100644 index 0000000..b3dbcb5 --- /dev/null +++ b/src/migration/m20260324_000018_add_track_tagged.rs @@ -0,0 +1,40 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Tracks::Table) + .add_column( + ColumnDef::new(Tracks::Tagged) + .boolean() + .not_null() + .default(false), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Tracks::Table) + .drop_column(Tracks::Tagged) + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum Tracks { + Table, + Tagged, +} diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 1173423..9866cf0 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -16,6 +16,7 @@ mod m20260320_000014_create_playlists; mod m20260320_000015_add_subsonic_password; mod m20260323_000016_remove_orphaned_artists; mod m20260323_000017_create_work_queue_and_scheduler; +mod m20260324_000018_add_track_tagged; pub struct Migrator; @@ -39,6 +40,7 @@ impl MigratorTrait for Migrator { Box::new(m20260320_000015_add_subsonic_password::Migration), Box::new(m20260323_000016_remove_orphaned_artists::Migration), Box::new(m20260323_000017_create_work_queue_and_scheduler::Migration), + Box::new(m20260324_000018_add_track_tagged::Migration), ] } } diff --git a/src/queries/tracks.rs b/src/queries/tracks.rs index ac14057..bb37cca 100644 --- a/src/queries/tracks.rs +++ b/src/queries/tracks.rs @@ -87,13 +87,6 @@ pub async fn get_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult< .await?) } -pub async fn get_untagged(db: &DatabaseConnection) -> DbResult> { - Ok(Tracks::find() - .filter(track::Column::MusicbrainzId.is_null()) - .all(db) - .await?) -} - /// Get tracks that need metadata enrichment — either no MBID at all, /// or have an MBID but are missing album info (e.g., freshly downloaded). pub async fn get_needing_metadata(db: &DatabaseConnection) -> DbResult> { @@ -175,6 +168,14 @@ pub async fn get_recent(db: &DatabaseConnection, days: u32, limit: u64) -> DbRes .await?) } +/// Get tracks that haven't been processed by the tagger yet. +pub async fn get_untagged(db: &DatabaseConnection) -> DbResult> { + Ok(Tracks::find() + .filter(track::Column::Tagged.eq(false)) + .all(db) + .await?) +} + pub async fn delete_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult { let result = Tracks::delete_many() .filter(track::Column::ArtistId.eq(artist_id)) diff --git a/src/queries/work_queue.rs b/src/queries/work_queue.rs index 03a6e18..61a91bb 100644 --- a/src/queries/work_queue.rs +++ b/src/queries/work_queue.rs @@ -199,6 +199,12 @@ pub async fn counts_all(db: &DatabaseConnection) -> DbResult { completed: 0, failed: 0, }, + enrich: TypeCounts { + pending: 0, + running: 0, + completed: 0, + failed: 0, + }, }; for item in items { @@ -207,6 +213,7 @@ pub async fn counts_all(db: &DatabaseConnection) -> DbResult { WorkTaskType::Index => &mut result.index, WorkTaskType::Tag => &mut result.tag, WorkTaskType::Organize => &mut result.organize, + WorkTaskType::Enrich => &mut result.enrich, }; match item.status { WorkQueueStatus::Pending => counts.pending += 1, @@ -224,15 +231,24 @@ pub struct AllCounts { pub index: TypeCounts, pub tag: TypeCounts, pub organize: TypeCounts, + pub enrich: TypeCounts, } -/// Check if all items for a pipeline are completed. -pub async fn pipeline_is_complete(db: &DatabaseConnection, pipeline_id: &str) -> DbResult { - let incomplete = WorkQueue::find() +/// Check if all items for a pipeline are completed (excluding a specific item ID, +/// typically the currently-running item that hasn't been marked complete yet). +pub async fn pipeline_is_complete( + db: &DatabaseConnection, + pipeline_id: &str, + exclude_id: Option, +) -> DbResult { + let mut query = WorkQueue::find() .filter(work_queue::Column::PipelineId.eq(pipeline_id)) .filter(work_queue::Column::Status.ne(WorkQueueStatus::Completed)) - .filter(work_queue::Column::Status.ne(WorkQueueStatus::Failed)) - .count(db) + .filter(work_queue::Column::Status.ne(WorkQueueStatus::Failed)); + if let Some(id) = exclude_id { + query = query.filter(work_queue::Column::Id.ne(id)); + } + let incomplete = query.count(db) .await?; Ok(incomplete == 0) } @@ -248,6 +264,35 @@ pub async fn cleanup_completed(db: &DatabaseConnection, older_than_days: i64) -> Ok(result.rows_affected) } +/// Delete all completed and failed items for a specific pipeline. +pub async fn clear_pipeline(db: &DatabaseConnection, pipeline_id: &str) -> DbResult { + let result = WorkQueue::delete_many() + .filter(work_queue::Column::PipelineId.eq(pipeline_id)) + .filter( + work_queue::Column::Status + .eq(WorkQueueStatus::Completed) + .or(work_queue::Column::Status.eq(WorkQueueStatus::Failed)), + ) + .exec(db) + .await?; + Ok(result.rows_affected) +} + +/// Delete all completed/failed items that belong to a pipeline (have a pipeline_id). +/// Does not affect standalone (non-pipeline) work items. +pub async fn clear_all_pipelines(db: &DatabaseConnection) -> DbResult { + let result = WorkQueue::delete_many() + .filter(work_queue::Column::PipelineId.is_not_null()) + .filter( + work_queue::Column::Status + .eq(WorkQueueStatus::Completed) + .or(work_queue::Column::Status.eq(WorkQueueStatus::Failed)), + ) + .exec(db) + .await?; + Ok(result.rows_affected) +} + /// Check if there are any pending or running items. pub async fn has_active_work(db: &DatabaseConnection) -> DbResult { let count = WorkQueue::find()