fixed some unwatch cleanup stuff

This commit is contained in:
Connor Johnstone
2026-03-24 20:41:13 -04:00
parent cc42f8ecbb
commit d9c6a7759e
6 changed files with 104 additions and 12 deletions
+1
View File
@@ -43,6 +43,7 @@ pub struct Model {
pub file_mtime: Option<chrono::NaiveDateTime>,
pub added_at: chrono::NaiveDateTime,
pub updated_at: chrono::NaiveDateTime,
pub tagged: bool,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+3
View File
@@ -14,6 +14,8 @@ pub enum WorkTaskType {
Tag,
#[sea_orm(string_value = "organize")]
Organize,
#[sea_orm(string_value = "enrich")]
Enrich,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumIter, DeriveActiveEnum)]
@@ -36,6 +38,7 @@ impl std::fmt::Display for WorkTaskType {
Self::Index => write!(f, "index"),
Self::Tag => write!(f, "tag"),
Self::Organize => write!(f, "organize"),
Self::Enrich => write!(f, "enrich"),
}
}
}
@@ -0,0 +1,40 @@
use sea_orm_migration::prelude::*;
/// Migration marker type; its name is derived from the enclosing file's
/// module name via `DeriveMigrationName` (m20260324_000018_add_track_tagged).
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
    /// Adds a `tagged` boolean flag to `tracks`, defaulting every existing
    /// row to `false` (i.e. "not yet processed by the tagger").
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let tagged_column = ColumnDef::new(Tracks::Tagged)
            .boolean()
            .not_null()
            .default(false)
            .to_owned();
        let add_tagged = Table::alter()
            .table(Tracks::Table)
            .add_column(tagged_column)
            .to_owned();
        manager.alter_table(add_tagged).await
    }

    /// Reverses `up` by dropping the `tagged` column again.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let drop_tagged = Table::alter()
            .table(Tracks::Table)
            .drop_column(Tracks::Tagged)
            .to_owned();
        manager.alter_table(drop_tagged).await
    }
}
// Identifiers for the `tracks` table used by this migration only; kept
// local so the migration stays stable even if the entity model changes.
#[derive(DeriveIden)]
enum Tracks {
Table,
// The new boolean column added/dropped by this migration.
Tagged,
}
+2
View File
@@ -16,6 +16,7 @@ mod m20260320_000014_create_playlists;
mod m20260320_000015_add_subsonic_password;
mod m20260323_000016_remove_orphaned_artists;
mod m20260323_000017_create_work_queue_and_scheduler;
mod m20260324_000018_add_track_tagged;
pub struct Migrator;
@@ -39,6 +40,7 @@ impl MigratorTrait for Migrator {
Box::new(m20260320_000015_add_subsonic_password::Migration),
Box::new(m20260323_000016_remove_orphaned_artists::Migration),
Box::new(m20260323_000017_create_work_queue_and_scheduler::Migration),
Box::new(m20260324_000018_add_track_tagged::Migration),
]
}
}
+8 -7
View File
@@ -87,13 +87,6 @@ pub async fn get_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult<
.await?)
}
pub async fn get_untagged(db: &DatabaseConnection) -> DbResult<Vec<Track>> {
Ok(Tracks::find()
.filter(track::Column::MusicbrainzId.is_null())
.all(db)
.await?)
}
/// Get tracks that need metadata enrichment — either no MBID at all,
/// or have an MBID but are missing album info (e.g., freshly downloaded).
pub async fn get_needing_metadata(db: &DatabaseConnection) -> DbResult<Vec<Track>> {
@@ -175,6 +168,14 @@ pub async fn get_recent(db: &DatabaseConnection, days: u32, limit: u64) -> DbRes
.await?)
}
/// Get tracks that haven't been processed by the tagger yet.
///
/// A track counts as untagged while its `tagged` flag is still `false`.
pub async fn get_untagged(db: &DatabaseConnection) -> DbResult<Vec<Track>> {
    let untagged = Tracks::find()
        .filter(track::Column::Tagged.eq(false))
        .all(db)
        .await?;
    Ok(untagged)
}
pub async fn delete_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult<u64> {
let result = Tracks::delete_many()
.filter(track::Column::ArtistId.eq(artist_id))
+50 -5
View File
@@ -199,6 +199,12 @@ pub async fn counts_all(db: &DatabaseConnection) -> DbResult<AllCounts> {
completed: 0,
failed: 0,
},
enrich: TypeCounts {
pending: 0,
running: 0,
completed: 0,
failed: 0,
},
};
for item in items {
@@ -207,6 +213,7 @@ pub async fn counts_all(db: &DatabaseConnection) -> DbResult<AllCounts> {
WorkTaskType::Index => &mut result.index,
WorkTaskType::Tag => &mut result.tag,
WorkTaskType::Organize => &mut result.organize,
WorkTaskType::Enrich => &mut result.enrich,
};
match item.status {
WorkQueueStatus::Pending => counts.pending += 1,
@@ -224,15 +231,24 @@ pub struct AllCounts {
pub index: TypeCounts,
pub tag: TypeCounts,
pub organize: TypeCounts,
pub enrich: TypeCounts,
}
/// Check if all items for a pipeline are completed.
pub async fn pipeline_is_complete(db: &DatabaseConnection, pipeline_id: &str) -> DbResult<bool> {
let incomplete = WorkQueue::find()
/// Check if all items for a pipeline are completed (excluding a specific item ID,
/// typically the currently-running item that hasn't been marked complete yet).
pub async fn pipeline_is_complete(
db: &DatabaseConnection,
pipeline_id: &str,
exclude_id: Option<i32>,
) -> DbResult<bool> {
let mut query = WorkQueue::find()
.filter(work_queue::Column::PipelineId.eq(pipeline_id))
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Completed))
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Failed))
.count(db)
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Failed));
if let Some(id) = exclude_id {
query = query.filter(work_queue::Column::Id.ne(id));
}
let incomplete = query.count(db)
.await?;
Ok(incomplete == 0)
}
@@ -248,6 +264,35 @@ pub async fn cleanup_completed(db: &DatabaseConnection, older_than_days: i64) ->
Ok(result.rows_affected)
}
/// Delete all completed and failed items for a specific pipeline.
///
/// Only terminal-status rows (`Completed` / `Failed`) are removed, so any
/// pending or running work for the pipeline is left untouched.
///
/// Returns the number of rows deleted.
pub async fn clear_pipeline(db: &DatabaseConnection, pipeline_id: &str) -> DbResult<u64> {
    let result = WorkQueue::delete_many()
        .filter(work_queue::Column::PipelineId.eq(pipeline_id))
        // `is_in` states the terminal-status set directly (SQL `IN`) instead
        // of chaining `.eq(..).or(.eq(..))`; same rows matched either way.
        .filter(work_queue::Column::Status.is_in([
            WorkQueueStatus::Completed,
            WorkQueueStatus::Failed,
        ]))
        .exec(db)
        .await?;
    Ok(result.rows_affected)
}
/// Delete all completed/failed items that belong to a pipeline (have a pipeline_id).
/// Does not affect standalone (non-pipeline) work items.
///
/// Only terminal-status rows (`Completed` / `Failed`) are removed; pending
/// and running pipeline items survive.
///
/// Returns the number of rows deleted.
pub async fn clear_all_pipelines(db: &DatabaseConnection) -> DbResult<u64> {
    let result = WorkQueue::delete_many()
        .filter(work_queue::Column::PipelineId.is_not_null())
        // `is_in` states the terminal-status set directly (SQL `IN`) instead
        // of chaining `.eq(..).or(.eq(..))`; same rows matched either way.
        .filter(work_queue::Column::Status.is_in([
            WorkQueueStatus::Completed,
            WorkQueueStatus::Failed,
        ]))
        .exec(db)
        .await?;
    Ok(result.rows_affected)
}
/// Check if there are any pending or running items.
pub async fn has_active_work(db: &DatabaseConnection) -> DbResult<bool> {
let count = WorkQueue::find()