Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d9c6a7759e |
@@ -43,6 +43,7 @@ pub struct Model {
|
||||
pub file_mtime: Option<chrono::NaiveDateTime>,
|
||||
pub added_at: chrono::NaiveDateTime,
|
||||
pub updated_at: chrono::NaiveDateTime,
|
||||
pub tagged: bool,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
|
||||
@@ -14,6 +14,8 @@ pub enum WorkTaskType {
|
||||
Tag,
|
||||
#[sea_orm(string_value = "organize")]
|
||||
Organize,
|
||||
#[sea_orm(string_value = "enrich")]
|
||||
Enrich,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumIter, DeriveActiveEnum)]
|
||||
@@ -36,6 +38,7 @@ impl std::fmt::Display for WorkTaskType {
|
||||
Self::Index => write!(f, "index"),
|
||||
Self::Tag => write!(f, "tag"),
|
||||
Self::Organize => write!(f, "organize"),
|
||||
Self::Enrich => write!(f, "enrich"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
use sea_orm_migration::prelude::*;
|
||||
|
||||
#[derive(DeriveMigrationName)]
|
||||
pub struct Migration;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MigrationTrait for Migration {
|
||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager
|
||||
.alter_table(
|
||||
Table::alter()
|
||||
.table(Tracks::Table)
|
||||
.add_column(
|
||||
ColumnDef::new(Tracks::Tagged)
|
||||
.boolean()
|
||||
.not_null()
|
||||
.default(false),
|
||||
)
|
||||
.to_owned(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager
|
||||
.alter_table(
|
||||
Table::alter()
|
||||
.table(Tracks::Table)
|
||||
.drop_column(Tracks::Tagged)
|
||||
.to_owned(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(DeriveIden)]
|
||||
enum Tracks {
|
||||
Table,
|
||||
Tagged,
|
||||
}
|
||||
@@ -16,6 +16,7 @@ mod m20260320_000014_create_playlists;
|
||||
mod m20260320_000015_add_subsonic_password;
|
||||
mod m20260323_000016_remove_orphaned_artists;
|
||||
mod m20260323_000017_create_work_queue_and_scheduler;
|
||||
mod m20260324_000018_add_track_tagged;
|
||||
|
||||
pub struct Migrator;
|
||||
|
||||
@@ -39,6 +40,7 @@ impl MigratorTrait for Migrator {
|
||||
Box::new(m20260320_000015_add_subsonic_password::Migration),
|
||||
Box::new(m20260323_000016_remove_orphaned_artists::Migration),
|
||||
Box::new(m20260323_000017_create_work_queue_and_scheduler::Migration),
|
||||
Box::new(m20260324_000018_add_track_tagged::Migration),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,13 +87,6 @@ pub async fn get_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult<
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn get_untagged(db: &DatabaseConnection) -> DbResult<Vec<Track>> {
|
||||
Ok(Tracks::find()
|
||||
.filter(track::Column::MusicbrainzId.is_null())
|
||||
.all(db)
|
||||
.await?)
|
||||
}
|
||||
|
||||
/// Get tracks that need metadata enrichment — either no MBID at all,
|
||||
/// or have an MBID but are missing album info (e.g., freshly downloaded).
|
||||
pub async fn get_needing_metadata(db: &DatabaseConnection) -> DbResult<Vec<Track>> {
|
||||
@@ -175,6 +168,14 @@ pub async fn get_recent(db: &DatabaseConnection, days: u32, limit: u64) -> DbRes
|
||||
.await?)
|
||||
}
|
||||
|
||||
/// Get tracks that haven't been processed by the tagger yet.
|
||||
pub async fn get_untagged(db: &DatabaseConnection) -> DbResult<Vec<Track>> {
|
||||
Ok(Tracks::find()
|
||||
.filter(track::Column::Tagged.eq(false))
|
||||
.all(db)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn delete_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult<u64> {
|
||||
let result = Tracks::delete_many()
|
||||
.filter(track::Column::ArtistId.eq(artist_id))
|
||||
|
||||
@@ -199,6 +199,12 @@ pub async fn counts_all(db: &DatabaseConnection) -> DbResult<AllCounts> {
|
||||
completed: 0,
|
||||
failed: 0,
|
||||
},
|
||||
enrich: TypeCounts {
|
||||
pending: 0,
|
||||
running: 0,
|
||||
completed: 0,
|
||||
failed: 0,
|
||||
},
|
||||
};
|
||||
|
||||
for item in items {
|
||||
@@ -207,6 +213,7 @@ pub async fn counts_all(db: &DatabaseConnection) -> DbResult<AllCounts> {
|
||||
WorkTaskType::Index => &mut result.index,
|
||||
WorkTaskType::Tag => &mut result.tag,
|
||||
WorkTaskType::Organize => &mut result.organize,
|
||||
WorkTaskType::Enrich => &mut result.enrich,
|
||||
};
|
||||
match item.status {
|
||||
WorkQueueStatus::Pending => counts.pending += 1,
|
||||
@@ -224,15 +231,24 @@ pub struct AllCounts {
|
||||
pub index: TypeCounts,
|
||||
pub tag: TypeCounts,
|
||||
pub organize: TypeCounts,
|
||||
pub enrich: TypeCounts,
|
||||
}
|
||||
|
||||
/// Check if all items for a pipeline are completed.
|
||||
pub async fn pipeline_is_complete(db: &DatabaseConnection, pipeline_id: &str) -> DbResult<bool> {
|
||||
let incomplete = WorkQueue::find()
|
||||
/// Check if all items for a pipeline are completed (excluding a specific item ID,
|
||||
/// typically the currently-running item that hasn't been marked complete yet).
|
||||
pub async fn pipeline_is_complete(
|
||||
db: &DatabaseConnection,
|
||||
pipeline_id: &str,
|
||||
exclude_id: Option<i32>,
|
||||
) -> DbResult<bool> {
|
||||
let mut query = WorkQueue::find()
|
||||
.filter(work_queue::Column::PipelineId.eq(pipeline_id))
|
||||
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Completed))
|
||||
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Failed))
|
||||
.count(db)
|
||||
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Failed));
|
||||
if let Some(id) = exclude_id {
|
||||
query = query.filter(work_queue::Column::Id.ne(id));
|
||||
}
|
||||
let incomplete = query.count(db)
|
||||
.await?;
|
||||
Ok(incomplete == 0)
|
||||
}
|
||||
@@ -248,6 +264,35 @@ pub async fn cleanup_completed(db: &DatabaseConnection, older_than_days: i64) ->
|
||||
Ok(result.rows_affected)
|
||||
}
|
||||
|
||||
/// Delete all completed and failed items for a specific pipeline.
|
||||
pub async fn clear_pipeline(db: &DatabaseConnection, pipeline_id: &str) -> DbResult<u64> {
|
||||
let result = WorkQueue::delete_many()
|
||||
.filter(work_queue::Column::PipelineId.eq(pipeline_id))
|
||||
.filter(
|
||||
work_queue::Column::Status
|
||||
.eq(WorkQueueStatus::Completed)
|
||||
.or(work_queue::Column::Status.eq(WorkQueueStatus::Failed)),
|
||||
)
|
||||
.exec(db)
|
||||
.await?;
|
||||
Ok(result.rows_affected)
|
||||
}
|
||||
|
||||
/// Delete all completed/failed items that belong to a pipeline (have a pipeline_id).
|
||||
/// Does not affect standalone (non-pipeline) work items.
|
||||
pub async fn clear_all_pipelines(db: &DatabaseConnection) -> DbResult<u64> {
|
||||
let result = WorkQueue::delete_many()
|
||||
.filter(work_queue::Column::PipelineId.is_not_null())
|
||||
.filter(
|
||||
work_queue::Column::Status
|
||||
.eq(WorkQueueStatus::Completed)
|
||||
.or(work_queue::Column::Status.eq(WorkQueueStatus::Failed)),
|
||||
)
|
||||
.exec(db)
|
||||
.await?;
|
||||
Ok(result.rows_affected)
|
||||
}
|
||||
|
||||
/// Check if there are any pending or running items.
|
||||
pub async fn has_active_work(db: &DatabaseConnection) -> DbResult<bool> {
|
||||
let count = WorkQueue::find()
|
||||
|
||||
Reference in New Issue
Block a user