Compare commits
1 Commits
cc42f8ecbb
...
d9c6a7759e
| Author | SHA1 | Date | |
|---|---|---|---|
| d9c6a7759e |
@@ -43,6 +43,7 @@ pub struct Model {
|
|||||||
pub file_mtime: Option<chrono::NaiveDateTime>,
|
pub file_mtime: Option<chrono::NaiveDateTime>,
|
||||||
pub added_at: chrono::NaiveDateTime,
|
pub added_at: chrono::NaiveDateTime,
|
||||||
pub updated_at: chrono::NaiveDateTime,
|
pub updated_at: chrono::NaiveDateTime,
|
||||||
|
pub tagged: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
|
|||||||
@@ -14,6 +14,8 @@ pub enum WorkTaskType {
|
|||||||
Tag,
|
Tag,
|
||||||
#[sea_orm(string_value = "organize")]
|
#[sea_orm(string_value = "organize")]
|
||||||
Organize,
|
Organize,
|
||||||
|
#[sea_orm(string_value = "enrich")]
|
||||||
|
Enrich,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumIter, DeriveActiveEnum)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumIter, DeriveActiveEnum)]
|
||||||
@@ -36,6 +38,7 @@ impl std::fmt::Display for WorkTaskType {
|
|||||||
Self::Index => write!(f, "index"),
|
Self::Index => write!(f, "index"),
|
||||||
Self::Tag => write!(f, "tag"),
|
Self::Tag => write!(f, "tag"),
|
||||||
Self::Organize => write!(f, "organize"),
|
Self::Organize => write!(f, "organize"),
|
||||||
|
Self::Enrich => write!(f, "enrich"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,40 @@
|
|||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
#[derive(DeriveMigrationName)]
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.alter_table(
|
||||||
|
Table::alter()
|
||||||
|
.table(Tracks::Table)
|
||||||
|
.add_column(
|
||||||
|
ColumnDef::new(Tracks::Tagged)
|
||||||
|
.boolean()
|
||||||
|
.not_null()
|
||||||
|
.default(false),
|
||||||
|
)
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.alter_table(
|
||||||
|
Table::alter()
|
||||||
|
.table(Tracks::Table)
|
||||||
|
.drop_column(Tracks::Tagged)
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(DeriveIden)]
|
||||||
|
enum Tracks {
|
||||||
|
Table,
|
||||||
|
Tagged,
|
||||||
|
}
|
||||||
@@ -16,6 +16,7 @@ mod m20260320_000014_create_playlists;
|
|||||||
mod m20260320_000015_add_subsonic_password;
|
mod m20260320_000015_add_subsonic_password;
|
||||||
mod m20260323_000016_remove_orphaned_artists;
|
mod m20260323_000016_remove_orphaned_artists;
|
||||||
mod m20260323_000017_create_work_queue_and_scheduler;
|
mod m20260323_000017_create_work_queue_and_scheduler;
|
||||||
|
mod m20260324_000018_add_track_tagged;
|
||||||
|
|
||||||
pub struct Migrator;
|
pub struct Migrator;
|
||||||
|
|
||||||
@@ -39,6 +40,7 @@ impl MigratorTrait for Migrator {
|
|||||||
Box::new(m20260320_000015_add_subsonic_password::Migration),
|
Box::new(m20260320_000015_add_subsonic_password::Migration),
|
||||||
Box::new(m20260323_000016_remove_orphaned_artists::Migration),
|
Box::new(m20260323_000016_remove_orphaned_artists::Migration),
|
||||||
Box::new(m20260323_000017_create_work_queue_and_scheduler::Migration),
|
Box::new(m20260323_000017_create_work_queue_and_scheduler::Migration),
|
||||||
|
Box::new(m20260324_000018_add_track_tagged::Migration),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -87,13 +87,6 @@ pub async fn get_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult<
|
|||||||
.await?)
|
.await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_untagged(db: &DatabaseConnection) -> DbResult<Vec<Track>> {
|
|
||||||
Ok(Tracks::find()
|
|
||||||
.filter(track::Column::MusicbrainzId.is_null())
|
|
||||||
.all(db)
|
|
||||||
.await?)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get tracks that need metadata enrichment — either no MBID at all,
|
/// Get tracks that need metadata enrichment — either no MBID at all,
|
||||||
/// or have an MBID but are missing album info (e.g., freshly downloaded).
|
/// or have an MBID but are missing album info (e.g., freshly downloaded).
|
||||||
pub async fn get_needing_metadata(db: &DatabaseConnection) -> DbResult<Vec<Track>> {
|
pub async fn get_needing_metadata(db: &DatabaseConnection) -> DbResult<Vec<Track>> {
|
||||||
@@ -175,6 +168,14 @@ pub async fn get_recent(db: &DatabaseConnection, days: u32, limit: u64) -> DbRes
|
|||||||
.await?)
|
.await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get tracks that haven't been processed by the tagger yet.
|
||||||
|
pub async fn get_untagged(db: &DatabaseConnection) -> DbResult<Vec<Track>> {
|
||||||
|
Ok(Tracks::find()
|
||||||
|
.filter(track::Column::Tagged.eq(false))
|
||||||
|
.all(db)
|
||||||
|
.await?)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn delete_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult<u64> {
|
pub async fn delete_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult<u64> {
|
||||||
let result = Tracks::delete_many()
|
let result = Tracks::delete_many()
|
||||||
.filter(track::Column::ArtistId.eq(artist_id))
|
.filter(track::Column::ArtistId.eq(artist_id))
|
||||||
|
|||||||
@@ -199,6 +199,12 @@ pub async fn counts_all(db: &DatabaseConnection) -> DbResult<AllCounts> {
|
|||||||
completed: 0,
|
completed: 0,
|
||||||
failed: 0,
|
failed: 0,
|
||||||
},
|
},
|
||||||
|
enrich: TypeCounts {
|
||||||
|
pending: 0,
|
||||||
|
running: 0,
|
||||||
|
completed: 0,
|
||||||
|
failed: 0,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
for item in items {
|
for item in items {
|
||||||
@@ -207,6 +213,7 @@ pub async fn counts_all(db: &DatabaseConnection) -> DbResult<AllCounts> {
|
|||||||
WorkTaskType::Index => &mut result.index,
|
WorkTaskType::Index => &mut result.index,
|
||||||
WorkTaskType::Tag => &mut result.tag,
|
WorkTaskType::Tag => &mut result.tag,
|
||||||
WorkTaskType::Organize => &mut result.organize,
|
WorkTaskType::Organize => &mut result.organize,
|
||||||
|
WorkTaskType::Enrich => &mut result.enrich,
|
||||||
};
|
};
|
||||||
match item.status {
|
match item.status {
|
||||||
WorkQueueStatus::Pending => counts.pending += 1,
|
WorkQueueStatus::Pending => counts.pending += 1,
|
||||||
@@ -224,15 +231,24 @@ pub struct AllCounts {
|
|||||||
pub index: TypeCounts,
|
pub index: TypeCounts,
|
||||||
pub tag: TypeCounts,
|
pub tag: TypeCounts,
|
||||||
pub organize: TypeCounts,
|
pub organize: TypeCounts,
|
||||||
|
pub enrich: TypeCounts,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if all items for a pipeline are completed.
|
/// Check if all items for a pipeline are completed (excluding a specific item ID,
|
||||||
pub async fn pipeline_is_complete(db: &DatabaseConnection, pipeline_id: &str) -> DbResult<bool> {
|
/// typically the currently-running item that hasn't been marked complete yet).
|
||||||
let incomplete = WorkQueue::find()
|
pub async fn pipeline_is_complete(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
pipeline_id: &str,
|
||||||
|
exclude_id: Option<i32>,
|
||||||
|
) -> DbResult<bool> {
|
||||||
|
let mut query = WorkQueue::find()
|
||||||
.filter(work_queue::Column::PipelineId.eq(pipeline_id))
|
.filter(work_queue::Column::PipelineId.eq(pipeline_id))
|
||||||
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Completed))
|
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Completed))
|
||||||
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Failed))
|
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Failed));
|
||||||
.count(db)
|
if let Some(id) = exclude_id {
|
||||||
|
query = query.filter(work_queue::Column::Id.ne(id));
|
||||||
|
}
|
||||||
|
let incomplete = query.count(db)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(incomplete == 0)
|
Ok(incomplete == 0)
|
||||||
}
|
}
|
||||||
@@ -248,6 +264,35 @@ pub async fn cleanup_completed(db: &DatabaseConnection, older_than_days: i64) ->
|
|||||||
Ok(result.rows_affected)
|
Ok(result.rows_affected)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete all completed and failed items for a specific pipeline.
|
||||||
|
pub async fn clear_pipeline(db: &DatabaseConnection, pipeline_id: &str) -> DbResult<u64> {
|
||||||
|
let result = WorkQueue::delete_many()
|
||||||
|
.filter(work_queue::Column::PipelineId.eq(pipeline_id))
|
||||||
|
.filter(
|
||||||
|
work_queue::Column::Status
|
||||||
|
.eq(WorkQueueStatus::Completed)
|
||||||
|
.or(work_queue::Column::Status.eq(WorkQueueStatus::Failed)),
|
||||||
|
)
|
||||||
|
.exec(db)
|
||||||
|
.await?;
|
||||||
|
Ok(result.rows_affected)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete all completed/failed items that belong to a pipeline (have a pipeline_id).
|
||||||
|
/// Does not affect standalone (non-pipeline) work items.
|
||||||
|
pub async fn clear_all_pipelines(db: &DatabaseConnection) -> DbResult<u64> {
|
||||||
|
let result = WorkQueue::delete_many()
|
||||||
|
.filter(work_queue::Column::PipelineId.is_not_null())
|
||||||
|
.filter(
|
||||||
|
work_queue::Column::Status
|
||||||
|
.eq(WorkQueueStatus::Completed)
|
||||||
|
.or(work_queue::Column::Status.eq(WorkQueueStatus::Failed)),
|
||||||
|
)
|
||||||
|
.exec(db)
|
||||||
|
.await?;
|
||||||
|
Ok(result.rows_affected)
|
||||||
|
}
|
||||||
|
|
||||||
/// Check if there are any pending or running items.
|
/// Check if there are any pending or running items.
|
||||||
pub async fn has_active_work(db: &DatabaseConnection) -> DbResult<bool> {
|
pub async fn has_active_work(db: &DatabaseConnection) -> DbResult<bool> {
|
||||||
let count = WorkQueue::find()
|
let count = WorkQueue::find()
|
||||||
|
|||||||
Reference in New Issue
Block a user