Compare commits
14 Commits
e76b2fc575
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 181f736f25 | |||
| 8045dadc57 | |||
| dcf4993f68 | |||
| cf5a38a376 | |||
| e4947191d0 | |||
| b4e0756a90 | |||
| c425402857 | |||
| 64e20136f0 | |||
| 820d37262d | |||
| d9c6a7759e | |||
| cc42f8ecbb | |||
| 1ff385c826 | |||
| a942c229ae | |||
| d2b21789d3 |
@@ -3,17 +3,21 @@ pub mod artist;
|
|||||||
pub mod download_queue;
|
pub mod download_queue;
|
||||||
pub mod playlist;
|
pub mod playlist;
|
||||||
pub mod playlist_track;
|
pub mod playlist_track;
|
||||||
|
pub mod scheduler_state;
|
||||||
pub mod search_cache;
|
pub mod search_cache;
|
||||||
pub mod track;
|
pub mod track;
|
||||||
pub mod user;
|
pub mod user;
|
||||||
pub mod wanted_item;
|
pub mod wanted_item;
|
||||||
|
pub mod work_queue;
|
||||||
|
|
||||||
pub use album::Entity as Albums;
|
pub use album::Entity as Albums;
|
||||||
pub use artist::Entity as Artists;
|
pub use artist::Entity as Artists;
|
||||||
pub use download_queue::Entity as DownloadQueue;
|
pub use download_queue::Entity as DownloadQueue;
|
||||||
pub use playlist::Entity as Playlists;
|
pub use playlist::Entity as Playlists;
|
||||||
pub use playlist_track::Entity as PlaylistTracks;
|
pub use playlist_track::Entity as PlaylistTracks;
|
||||||
|
pub use scheduler_state::Entity as SchedulerStates;
|
||||||
pub use search_cache::Entity as SearchCache;
|
pub use search_cache::Entity as SearchCache;
|
||||||
pub use track::Entity as Tracks;
|
pub use track::Entity as Tracks;
|
||||||
pub use user::Entity as Users;
|
pub use user::Entity as Users;
|
||||||
pub use wanted_item::Entity as WantedItems;
|
pub use wanted_item::Entity as WantedItems;
|
||||||
|
pub use work_queue::Entity as WorkQueues;
|
||||||
|
|||||||
@@ -0,0 +1,23 @@
|
|||||||
|
use sea_orm::entity::prelude::*;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
|
||||||
|
#[sea_orm(table_name = "scheduler_state")]
|
||||||
|
pub struct Model {
|
||||||
|
#[sea_orm(primary_key)]
|
||||||
|
pub id: i32,
|
||||||
|
#[sea_orm(unique)]
|
||||||
|
pub job_name: String,
|
||||||
|
#[sea_orm(nullable)]
|
||||||
|
pub last_run_at: Option<chrono::NaiveDateTime>,
|
||||||
|
#[sea_orm(nullable)]
|
||||||
|
pub last_result: Option<String>,
|
||||||
|
#[sea_orm(nullable)]
|
||||||
|
pub next_run_at: Option<chrono::NaiveDateTime>,
|
||||||
|
pub enabled: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
|
pub enum Relation {}
|
||||||
|
|
||||||
|
impl ActiveModelBehavior for ActiveModel {}
|
||||||
@@ -43,6 +43,7 @@ pub struct Model {
|
|||||||
pub file_mtime: Option<chrono::NaiveDateTime>,
|
pub file_mtime: Option<chrono::NaiveDateTime>,
|
||||||
pub added_at: chrono::NaiveDateTime,
|
pub added_at: chrono::NaiveDateTime,
|
||||||
pub updated_at: chrono::NaiveDateTime,
|
pub updated_at: chrono::NaiveDateTime,
|
||||||
|
pub tagged: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
|
|||||||
@@ -0,0 +1,69 @@
|
|||||||
|
use sea_orm::entity::prelude::*;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(
|
||||||
|
Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, EnumIter, DeriveActiveEnum,
|
||||||
|
)]
|
||||||
|
#[sea_orm(rs_type = "String", db_type = "Text")]
|
||||||
|
pub enum WorkTaskType {
|
||||||
|
#[sea_orm(string_value = "download")]
|
||||||
|
Download,
|
||||||
|
#[sea_orm(string_value = "index")]
|
||||||
|
Index,
|
||||||
|
#[sea_orm(string_value = "tag")]
|
||||||
|
Tag,
|
||||||
|
#[sea_orm(string_value = "organize")]
|
||||||
|
Organize,
|
||||||
|
#[sea_orm(string_value = "enrich")]
|
||||||
|
Enrich,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumIter, DeriveActiveEnum)]
|
||||||
|
#[sea_orm(rs_type = "String", db_type = "Text")]
|
||||||
|
pub enum WorkQueueStatus {
|
||||||
|
#[sea_orm(string_value = "pending")]
|
||||||
|
Pending,
|
||||||
|
#[sea_orm(string_value = "running")]
|
||||||
|
Running,
|
||||||
|
#[sea_orm(string_value = "completed")]
|
||||||
|
Completed,
|
||||||
|
#[sea_orm(string_value = "failed")]
|
||||||
|
Failed,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for WorkTaskType {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::Download => write!(f, "download"),
|
||||||
|
Self::Index => write!(f, "index"),
|
||||||
|
Self::Tag => write!(f, "tag"),
|
||||||
|
Self::Organize => write!(f, "organize"),
|
||||||
|
Self::Enrich => write!(f, "enrich"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
|
||||||
|
#[sea_orm(table_name = "work_queue")]
|
||||||
|
pub struct Model {
|
||||||
|
#[sea_orm(primary_key)]
|
||||||
|
pub id: i32,
|
||||||
|
pub task_type: WorkTaskType,
|
||||||
|
pub status: WorkQueueStatus,
|
||||||
|
pub payload_json: String,
|
||||||
|
#[sea_orm(nullable)]
|
||||||
|
pub pipeline_id: Option<String>,
|
||||||
|
pub created_at: chrono::NaiveDateTime,
|
||||||
|
#[sea_orm(nullable)]
|
||||||
|
pub started_at: Option<chrono::NaiveDateTime>,
|
||||||
|
#[sea_orm(nullable)]
|
||||||
|
pub completed_at: Option<chrono::NaiveDateTime>,
|
||||||
|
#[sea_orm(nullable)]
|
||||||
|
pub error: Option<String>,
|
||||||
|
pub retry_count: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
|
pub enum Relation {}
|
||||||
|
|
||||||
|
impl ActiveModelBehavior for ActiveModel {}
|
||||||
@@ -0,0 +1,141 @@
|
|||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
#[derive(DeriveMigrationName)]
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
// Work queue table
|
||||||
|
manager
|
||||||
|
.create_table(
|
||||||
|
Table::create()
|
||||||
|
.table(WorkQueue::Table)
|
||||||
|
.if_not_exists()
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(WorkQueue::Id)
|
||||||
|
.integer()
|
||||||
|
.not_null()
|
||||||
|
.auto_increment()
|
||||||
|
.primary_key(),
|
||||||
|
)
|
||||||
|
.col(ColumnDef::new(WorkQueue::TaskType).text().not_null())
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(WorkQueue::Status)
|
||||||
|
.text()
|
||||||
|
.not_null()
|
||||||
|
.default("pending"),
|
||||||
|
)
|
||||||
|
.col(ColumnDef::new(WorkQueue::PayloadJson).text().not_null())
|
||||||
|
.col(ColumnDef::new(WorkQueue::PipelineId).text())
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(WorkQueue::CreatedAt)
|
||||||
|
.date_time()
|
||||||
|
.not_null()
|
||||||
|
.default(Expr::current_timestamp()),
|
||||||
|
)
|
||||||
|
.col(ColumnDef::new(WorkQueue::StartedAt).date_time())
|
||||||
|
.col(ColumnDef::new(WorkQueue::CompletedAt).date_time())
|
||||||
|
.col(ColumnDef::new(WorkQueue::Error).text())
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(WorkQueue::RetryCount)
|
||||||
|
.integer()
|
||||||
|
.not_null()
|
||||||
|
.default(0),
|
||||||
|
)
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
manager
|
||||||
|
.create_index(
|
||||||
|
Index::create()
|
||||||
|
.name("idx_work_queue_status_type")
|
||||||
|
.table(WorkQueue::Table)
|
||||||
|
.col(WorkQueue::Status)
|
||||||
|
.col(WorkQueue::TaskType)
|
||||||
|
.col(WorkQueue::CreatedAt)
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
manager
|
||||||
|
.create_index(
|
||||||
|
Index::create()
|
||||||
|
.name("idx_work_queue_pipeline_id")
|
||||||
|
.table(WorkQueue::Table)
|
||||||
|
.col(WorkQueue::PipelineId)
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Scheduler state table
|
||||||
|
manager
|
||||||
|
.create_table(
|
||||||
|
Table::create()
|
||||||
|
.table(SchedulerState::Table)
|
||||||
|
.if_not_exists()
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(SchedulerState::Id)
|
||||||
|
.integer()
|
||||||
|
.not_null()
|
||||||
|
.auto_increment()
|
||||||
|
.primary_key(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(SchedulerState::JobName)
|
||||||
|
.text()
|
||||||
|
.not_null()
|
||||||
|
.unique_key(),
|
||||||
|
)
|
||||||
|
.col(ColumnDef::new(SchedulerState::LastRunAt).date_time())
|
||||||
|
.col(ColumnDef::new(SchedulerState::LastResult).text())
|
||||||
|
.col(ColumnDef::new(SchedulerState::NextRunAt).date_time())
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(SchedulerState::Enabled)
|
||||||
|
.boolean()
|
||||||
|
.not_null()
|
||||||
|
.default(true),
|
||||||
|
)
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.drop_table(Table::drop().table(SchedulerState::Table).to_owned())
|
||||||
|
.await?;
|
||||||
|
manager
|
||||||
|
.drop_table(Table::drop().table(WorkQueue::Table).to_owned())
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(DeriveIden)]
|
||||||
|
enum WorkQueue {
|
||||||
|
Table,
|
||||||
|
Id,
|
||||||
|
TaskType,
|
||||||
|
Status,
|
||||||
|
PayloadJson,
|
||||||
|
PipelineId,
|
||||||
|
CreatedAt,
|
||||||
|
StartedAt,
|
||||||
|
CompletedAt,
|
||||||
|
Error,
|
||||||
|
RetryCount,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(DeriveIden)]
|
||||||
|
enum SchedulerState {
|
||||||
|
Table,
|
||||||
|
Id,
|
||||||
|
JobName,
|
||||||
|
LastRunAt,
|
||||||
|
LastResult,
|
||||||
|
NextRunAt,
|
||||||
|
Enabled,
|
||||||
|
}
|
||||||
@@ -0,0 +1,40 @@
|
|||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
#[derive(DeriveMigrationName)]
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.alter_table(
|
||||||
|
Table::alter()
|
||||||
|
.table(Tracks::Table)
|
||||||
|
.add_column(
|
||||||
|
ColumnDef::new(Tracks::Tagged)
|
||||||
|
.boolean()
|
||||||
|
.not_null()
|
||||||
|
.default(false),
|
||||||
|
)
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.alter_table(
|
||||||
|
Table::alter()
|
||||||
|
.table(Tracks::Table)
|
||||||
|
.drop_column(Tracks::Tagged)
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(DeriveIden)]
|
||||||
|
enum Tracks {
|
||||||
|
Table,
|
||||||
|
Tagged,
|
||||||
|
}
|
||||||
@@ -0,0 +1,59 @@
|
|||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
#[derive(DeriveMigrationName)]
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
// Drop the unique index on artist name — different artists can share a name
|
||||||
|
// (e.g., "Clara" the Italian singer and "Clara" the Brazilian singer)
|
||||||
|
manager
|
||||||
|
.drop_index(
|
||||||
|
Index::drop()
|
||||||
|
.name("idx_artists_name_unique")
|
||||||
|
.table(Artists::Table)
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Replace with a non-unique index for lookup performance
|
||||||
|
manager
|
||||||
|
.create_index(
|
||||||
|
Index::create()
|
||||||
|
.name("idx_artists_name")
|
||||||
|
.table(Artists::Table)
|
||||||
|
.col(Artists::Name)
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.drop_index(
|
||||||
|
Index::drop()
|
||||||
|
.name("idx_artists_name")
|
||||||
|
.table(Artists::Table)
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
manager
|
||||||
|
.create_index(
|
||||||
|
Index::create()
|
||||||
|
.name("idx_artists_name_unique")
|
||||||
|
.table(Artists::Table)
|
||||||
|
.col(Artists::Name)
|
||||||
|
.unique()
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(DeriveIden)]
|
||||||
|
enum Artists {
|
||||||
|
Table,
|
||||||
|
Name,
|
||||||
|
}
|
||||||
@@ -15,6 +15,9 @@ mod m20260320_000013_add_artist_monitoring;
|
|||||||
mod m20260320_000014_create_playlists;
|
mod m20260320_000014_create_playlists;
|
||||||
mod m20260320_000015_add_subsonic_password;
|
mod m20260320_000015_add_subsonic_password;
|
||||||
mod m20260323_000016_remove_orphaned_artists;
|
mod m20260323_000016_remove_orphaned_artists;
|
||||||
|
mod m20260323_000017_create_work_queue_and_scheduler;
|
||||||
|
mod m20260324_000018_add_track_tagged;
|
||||||
|
mod m20260325_000019_allow_duplicate_artist_names;
|
||||||
|
|
||||||
pub struct Migrator;
|
pub struct Migrator;
|
||||||
|
|
||||||
@@ -37,6 +40,9 @@ impl MigratorTrait for Migrator {
|
|||||||
Box::new(m20260320_000014_create_playlists::Migration),
|
Box::new(m20260320_000014_create_playlists::Migration),
|
||||||
Box::new(m20260320_000015_add_subsonic_password::Migration),
|
Box::new(m20260320_000015_add_subsonic_password::Migration),
|
||||||
Box::new(m20260323_000016_remove_orphaned_artists::Migration),
|
Box::new(m20260323_000016_remove_orphaned_artists::Migration),
|
||||||
|
Box::new(m20260323_000017_create_work_queue_and_scheduler::Migration),
|
||||||
|
Box::new(m20260324_000018_add_track_tagged::Migration),
|
||||||
|
Box::new(m20260325_000019_allow_duplicate_artist_names::Migration),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use sea_orm::sea_query::Expr;
|
||||||
use sea_orm::*;
|
use sea_orm::*;
|
||||||
|
|
||||||
use crate::entities::album::{self, ActiveModel, Entity as Albums, Model as Album};
|
use crate::entities::album::{self, ActiveModel, Entity as Albums, Model as Album};
|
||||||
@@ -84,6 +85,131 @@ pub async fn get_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult<
|
|||||||
.await?)
|
.await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_random(db: &DatabaseConnection, count: u64) -> DbResult<Vec<Album>> {
|
||||||
|
Ok(Albums::find()
|
||||||
|
.order_by(Expr::cust("RANDOM()"), Order::Asc)
|
||||||
|
.limit(count)
|
||||||
|
.all(db)
|
||||||
|
.await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list_newest(db: &DatabaseConnection, limit: u64, offset: u64) -> DbResult<Vec<Album>> {
|
||||||
|
Ok(Albums::find()
|
||||||
|
.order_by_desc(Expr::cust("COALESCE(year, 0)"))
|
||||||
|
.order_by_asc(album::Column::Name)
|
||||||
|
.limit(limit)
|
||||||
|
.offset(offset)
|
||||||
|
.all(db)
|
||||||
|
.await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list_by_year_range(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
from: i32,
|
||||||
|
to: i32,
|
||||||
|
limit: u64,
|
||||||
|
offset: u64,
|
||||||
|
) -> DbResult<Vec<Album>> {
|
||||||
|
let (lo, hi) = if from <= to { (from, to) } else { (to, from) };
|
||||||
|
Ok(Albums::find()
|
||||||
|
.filter(album::Column::Year.gte(lo))
|
||||||
|
.filter(album::Column::Year.lte(hi))
|
||||||
|
.order_by_asc(album::Column::Year)
|
||||||
|
.limit(limit)
|
||||||
|
.offset(offset)
|
||||||
|
.all(db)
|
||||||
|
.await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list_by_genre(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
genre: &str,
|
||||||
|
limit: u64,
|
||||||
|
offset: u64,
|
||||||
|
) -> DbResult<Vec<Album>> {
|
||||||
|
use crate::entities::track;
|
||||||
|
|
||||||
|
// Find album IDs that have tracks matching this genre
|
||||||
|
let pattern = format!("%{genre}%");
|
||||||
|
let album_ids: Vec<i32> = track::Entity::find()
|
||||||
|
.filter(Expr::cust_with_values(
|
||||||
|
"LOWER(genre) LIKE LOWER(?)",
|
||||||
|
[pattern],
|
||||||
|
))
|
||||||
|
.filter(track::Column::AlbumId.is_not_null())
|
||||||
|
.all(db)
|
||||||
|
.await?
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|t| t.album_id)
|
||||||
|
.collect::<std::collections::HashSet<_>>()
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if album_ids.is_empty() {
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Albums::find()
|
||||||
|
.filter(album::Column::Id.is_in(album_ids))
|
||||||
|
.order_by_asc(album::Column::Name)
|
||||||
|
.limit(limit)
|
||||||
|
.offset(offset)
|
||||||
|
.all(db)
|
||||||
|
.await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list_alphabetical_by_artist(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
limit: u64,
|
||||||
|
offset: u64,
|
||||||
|
) -> DbResult<Vec<Album>> {
|
||||||
|
Ok(Albums::find()
|
||||||
|
.order_by_asc(Expr::cust("LOWER(album_artist)"))
|
||||||
|
.order_by_asc(Expr::cust("LOWER(name)"))
|
||||||
|
.limit(limit)
|
||||||
|
.offset(offset)
|
||||||
|
.all(db)
|
||||||
|
.await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list_recent(db: &DatabaseConnection, limit: u64, offset: u64) -> DbResult<Vec<Album>> {
|
||||||
|
use crate::entities::track;
|
||||||
|
|
||||||
|
// Find albums ordered by their most recently added track
|
||||||
|
let tracks = track::Entity::find()
|
||||||
|
.filter(track::Column::AlbumId.is_not_null())
|
||||||
|
.order_by_desc(track::Column::AddedAt)
|
||||||
|
.all(db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Collect unique album IDs in order of most recent track
|
||||||
|
let mut seen = std::collections::HashSet::new();
|
||||||
|
let album_ids: Vec<i32> = tracks
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|t| t.album_id)
|
||||||
|
.filter(|id| seen.insert(*id))
|
||||||
|
.skip(offset as usize)
|
||||||
|
.take(limit as usize)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if album_ids.is_empty() {
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch albums and preserve the ordering
|
||||||
|
let albums = Albums::find()
|
||||||
|
.filter(album::Column::Id.is_in(album_ids.clone()))
|
||||||
|
.all(db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let album_map: std::collections::HashMap<i32, Album> =
|
||||||
|
albums.into_iter().map(|a| (a.id, a)).collect();
|
||||||
|
Ok(album_ids
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|id| album_map.get(&id).cloned())
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn update(db: &DatabaseConnection, id: i32, model: ActiveModel) -> DbResult<Album> {
|
pub async fn update(db: &DatabaseConnection, id: i32, model: ActiveModel) -> DbResult<Album> {
|
||||||
let mut active = model;
|
let mut active = model;
|
||||||
active.id = Set(id);
|
active.id = Set(id);
|
||||||
@@ -94,3 +220,29 @@ pub async fn delete(db: &DatabaseConnection, id: i32) -> DbResult<()> {
|
|||||||
Albums::delete_by_id(id).exec(db).await?;
|
Albums::delete_by_id(id).exec(db).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn delete_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult<u64> {
|
||||||
|
let result = Albums::delete_many()
|
||||||
|
.filter(album::Column::ArtistId.eq(artist_id))
|
||||||
|
.exec(db)
|
||||||
|
.await?;
|
||||||
|
Ok(result.rows_affected)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete albums that have no tracks referencing them.
|
||||||
|
pub async fn delete_empty(db: &DatabaseConnection) -> DbResult<u64> {
|
||||||
|
let all = Albums::find().all(db).await?;
|
||||||
|
let mut deleted = 0u64;
|
||||||
|
for a in all {
|
||||||
|
let has_tracks = crate::entities::track::Entity::find()
|
||||||
|
.filter(crate::entities::track::Column::AlbumId.eq(a.id))
|
||||||
|
.count(db)
|
||||||
|
.await?
|
||||||
|
> 0;
|
||||||
|
if !has_tracks {
|
||||||
|
Albums::delete_by_id(a.id).exec(db).await?;
|
||||||
|
deleted += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(deleted)
|
||||||
|
}
|
||||||
|
|||||||
+63
-3
@@ -1,4 +1,5 @@
|
|||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
|
use sea_orm::sea_query::Expr;
|
||||||
use sea_orm::*;
|
use sea_orm::*;
|
||||||
|
|
||||||
use crate::entities::artist::{self, ActiveModel, Entity as Artists, Model as Artist};
|
use crate::entities::artist::{self, ActiveModel, Entity as Artists, Model as Artist};
|
||||||
@@ -20,14 +21,19 @@ pub async fn upsert(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if let Some(existing) = find_by_name(db, name).await? {
|
if let Some(existing) = find_by_name(db, name).await? {
|
||||||
// Update musicbrainz_id if we have one now and didn't before
|
|
||||||
if musicbrainz_id.is_some() && existing.musicbrainz_id.is_none() {
|
if musicbrainz_id.is_some() && existing.musicbrainz_id.is_none() {
|
||||||
|
// We have an MBID now and the existing record doesn't — update it
|
||||||
let mut active: ActiveModel = existing.into();
|
let mut active: ActiveModel = existing.into();
|
||||||
active.musicbrainz_id = Set(musicbrainz_id.map(String::from));
|
active.musicbrainz_id = Set(musicbrainz_id.map(String::from));
|
||||||
return Ok(active.update(db).await?);
|
return Ok(active.update(db).await?);
|
||||||
}
|
}
|
||||||
|
if musicbrainz_id.is_none() || existing.musicbrainz_id.as_deref() == musicbrainz_id {
|
||||||
|
// No MBID provided, or MBIDs match — return existing
|
||||||
return Ok(existing);
|
return Ok(existing);
|
||||||
}
|
}
|
||||||
|
// MBIDs differ — this is a different artist with the same name.
|
||||||
|
// Fall through to insert a new record.
|
||||||
|
}
|
||||||
|
|
||||||
// Try to insert — if we race with another task, catch the unique constraint
|
// Try to insert — if we race with another task, catch the unique constraint
|
||||||
// violation and fall back to a lookup.
|
// violation and fall back to a lookup.
|
||||||
@@ -47,7 +53,15 @@ pub async fn upsert(
|
|||||||
Err(DbErr::Exec(RuntimeErr::SqlxError(sqlx_err)))
|
Err(DbErr::Exec(RuntimeErr::SqlxError(sqlx_err)))
|
||||||
if sqlx_err.to_string().contains("UNIQUE constraint failed") =>
|
if sqlx_err.to_string().contains("UNIQUE constraint failed") =>
|
||||||
{
|
{
|
||||||
// Lost the race — another task inserted first, just look it up
|
// Lost the race on MBID unique constraint — look up by MBID first, then name
|
||||||
|
if let Some(mbid) = musicbrainz_id
|
||||||
|
&& let Some(existing) = Artists::find()
|
||||||
|
.filter(artist::Column::MusicbrainzId.eq(mbid))
|
||||||
|
.one(db)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
return Ok(existing);
|
||||||
|
}
|
||||||
find_by_name(db, name)
|
find_by_name(db, name)
|
||||||
.await?
|
.await?
|
||||||
.ok_or_else(|| DbError::Other(format!("artist '{name}' vanished after conflict")))
|
.ok_or_else(|| DbError::Other(format!("artist '{name}' vanished after conflict")))
|
||||||
@@ -72,13 +86,20 @@ pub async fn find_by_name(db: &DatabaseConnection, name: &str) -> DbResult<Optio
|
|||||||
|
|
||||||
pub async fn list(db: &DatabaseConnection, limit: u64, offset: u64) -> DbResult<Vec<Artist>> {
|
pub async fn list(db: &DatabaseConnection, limit: u64, offset: u64) -> DbResult<Vec<Artist>> {
|
||||||
Ok(Artists::find()
|
Ok(Artists::find()
|
||||||
.order_by_asc(artist::Column::Name)
|
.order_by_asc(Expr::cust("LOWER(name)"))
|
||||||
.limit(limit)
|
.limit(limit)
|
||||||
.offset(offset)
|
.offset(offset)
|
||||||
.all(db)
|
.all(db)
|
||||||
.await?)
|
.await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn list_all(db: &DatabaseConnection) -> DbResult<Vec<Artist>> {
|
||||||
|
Ok(Artists::find()
|
||||||
|
.order_by_asc(Expr::cust("LOWER(name)"))
|
||||||
|
.all(db)
|
||||||
|
.await?)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn update(db: &DatabaseConnection, id: i32, model: ActiveModel) -> DbResult<Artist> {
|
pub async fn update(db: &DatabaseConnection, id: i32, model: ActiveModel) -> DbResult<Artist> {
|
||||||
let mut active = model;
|
let mut active = model;
|
||||||
active.id = Set(id);
|
active.id = Set(id);
|
||||||
@@ -127,6 +148,45 @@ pub async fn list_monitored(db: &DatabaseConnection) -> DbResult<Vec<Artist>> {
|
|||||||
.await?)
|
.await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete artists that have no tracks, no wanted items, no albums, and are not monitored.
|
||||||
|
pub async fn delete_unused(db: &DatabaseConnection) -> DbResult<u64> {
|
||||||
|
let conn = db;
|
||||||
|
let all = Artists::find().all(conn).await?;
|
||||||
|
let mut deleted = 0u64;
|
||||||
|
for a in all {
|
||||||
|
if a.monitored {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let has_tracks = crate::entities::track::Entity::find()
|
||||||
|
.filter(crate::entities::track::Column::ArtistId.eq(a.id))
|
||||||
|
.count(conn)
|
||||||
|
.await?
|
||||||
|
> 0;
|
||||||
|
if has_tracks {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let has_wanted = crate::entities::wanted_item::Entity::find()
|
||||||
|
.filter(crate::entities::wanted_item::Column::ArtistId.eq(a.id))
|
||||||
|
.count(conn)
|
||||||
|
.await?
|
||||||
|
> 0;
|
||||||
|
if has_wanted {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let has_albums = crate::entities::album::Entity::find()
|
||||||
|
.filter(crate::entities::album::Column::ArtistId.eq(a.id))
|
||||||
|
.count(conn)
|
||||||
|
.await?
|
||||||
|
> 0;
|
||||||
|
if has_albums {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Artists::delete_by_id(a.id).exec(conn).await?;
|
||||||
|
deleted += 1;
|
||||||
|
}
|
||||||
|
Ok(deleted)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn update_last_checked(db: &DatabaseConnection, id: i32) -> DbResult<Artist> {
|
pub async fn update_last_checked(db: &DatabaseConnection, id: i32) -> DbResult<Artist> {
|
||||||
let existing = get_by_id(db, id).await?;
|
let existing = get_by_id(db, id).await?;
|
||||||
let mut active: ActiveModel = existing.into();
|
let mut active: ActiveModel = existing.into();
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ pub mod artists;
|
|||||||
pub mod cache;
|
pub mod cache;
|
||||||
pub mod downloads;
|
pub mod downloads;
|
||||||
pub mod playlists;
|
pub mod playlists;
|
||||||
|
pub mod scheduler_state;
|
||||||
pub mod tracks;
|
pub mod tracks;
|
||||||
pub mod users;
|
pub mod users;
|
||||||
pub mod wanted;
|
pub mod wanted;
|
||||||
|
pub mod work_queue;
|
||||||
|
|||||||
@@ -0,0 +1,67 @@
|
|||||||
|
use chrono::Utc;
|
||||||
|
use sea_orm::*;
|
||||||
|
|
||||||
|
use crate::entities::scheduler_state::{self, ActiveModel, Entity as SchedulerState, Model as Job};
|
||||||
|
use crate::error::DbResult;
|
||||||
|
|
||||||
|
/// Get or create a scheduler job by name. If it doesn't exist, creates it with defaults.
|
||||||
|
pub async fn get_or_create(db: &DatabaseConnection, job_name: &str) -> DbResult<Job> {
|
||||||
|
if let Some(existing) = SchedulerState::find()
|
||||||
|
.filter(scheduler_state::Column::JobName.eq(job_name))
|
||||||
|
.one(db)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
return Ok(existing);
|
||||||
|
}
|
||||||
|
|
||||||
|
let active = ActiveModel {
|
||||||
|
job_name: Set(job_name.to_string()),
|
||||||
|
last_run_at: Set(None),
|
||||||
|
last_result: Set(None),
|
||||||
|
next_run_at: Set(None),
|
||||||
|
enabled: Set(true),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
Ok(active.insert(db).await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Update the last run time and result for a job.
|
||||||
|
pub async fn update_last_run(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
job_name: &str,
|
||||||
|
result: &str,
|
||||||
|
) -> DbResult<Job> {
|
||||||
|
let job = get_or_create(db, job_name).await?;
|
||||||
|
let mut active: ActiveModel = job.into();
|
||||||
|
active.last_run_at = Set(Some(Utc::now().naive_utc()));
|
||||||
|
active.last_result = Set(Some(result.to_string()));
|
||||||
|
Ok(active.update(db).await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Update the next scheduled run time for a job.
|
||||||
|
pub async fn update_next_run(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
job_name: &str,
|
||||||
|
next_at: Option<chrono::NaiveDateTime>,
|
||||||
|
) -> DbResult<Job> {
|
||||||
|
let job = get_or_create(db, job_name).await?;
|
||||||
|
let mut active: ActiveModel = job.into();
|
||||||
|
active.next_run_at = Set(next_at);
|
||||||
|
Ok(active.update(db).await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the enabled state for a job.
|
||||||
|
pub async fn set_enabled(db: &DatabaseConnection, job_name: &str, enabled: bool) -> DbResult<Job> {
|
||||||
|
let job = get_or_create(db, job_name).await?;
|
||||||
|
let mut active: ActiveModel = job.into();
|
||||||
|
active.enabled = Set(enabled);
|
||||||
|
Ok(active.update(db).await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List all scheduler jobs.
|
||||||
|
pub async fn list_all(db: &DatabaseConnection) -> DbResult<Vec<Job>> {
|
||||||
|
Ok(SchedulerState::find()
|
||||||
|
.order_by_asc(scheduler_state::Column::JobName)
|
||||||
|
.all(db)
|
||||||
|
.await?)
|
||||||
|
}
|
||||||
+116
-3
@@ -45,6 +45,13 @@ pub async fn get_by_path(db: &DatabaseConnection, file_path: &str) -> DbResult<O
|
|||||||
.await?)
|
.await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_by_mbid(db: &DatabaseConnection, mbid: &str) -> DbResult<Vec<Track>> {
|
||||||
|
Ok(Tracks::find()
|
||||||
|
.filter(track::Column::MusicbrainzId.eq(mbid))
|
||||||
|
.all(db)
|
||||||
|
.await?)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn list(db: &DatabaseConnection, limit: u64, offset: u64) -> DbResult<Vec<Track>> {
|
pub async fn list(db: &DatabaseConnection, limit: u64, offset: u64) -> DbResult<Vec<Track>> {
|
||||||
Ok(Tracks::find()
|
Ok(Tracks::find()
|
||||||
.order_by_asc(track::Column::Artist)
|
.order_by_asc(track::Column::Artist)
|
||||||
@@ -87,10 +94,10 @@ pub async fn get_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult<
|
|||||||
.await?)
|
.await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_untagged(db: &DatabaseConnection) -> DbResult<Vec<Track>> {
|
pub async fn count_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult<u64> {
|
||||||
Ok(Tracks::find()
|
Ok(Tracks::find()
|
||||||
.filter(track::Column::MusicbrainzId.is_null())
|
.filter(track::Column::ArtistId.eq(artist_id))
|
||||||
.all(db)
|
.count(db)
|
||||||
.await?)
|
.await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -164,6 +171,54 @@ pub async fn get_random(db: &DatabaseConnection, count: u64) -> DbResult<Vec<Tra
|
|||||||
.await?)
|
.await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get random tracks with optional genre and year range filters.
|
||||||
|
pub async fn get_random_filtered(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
count: u64,
|
||||||
|
genre: Option<&str>,
|
||||||
|
from_year: Option<i32>,
|
||||||
|
to_year: Option<i32>,
|
||||||
|
) -> DbResult<Vec<Track>> {
|
||||||
|
let mut query = Tracks::find();
|
||||||
|
if let Some(g) = genre {
|
||||||
|
let pattern = format!("%{g}%");
|
||||||
|
query = query.filter(Expr::cust_with_values(
|
||||||
|
"LOWER(genre) LIKE LOWER(?)",
|
||||||
|
[pattern],
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if let Some(y) = from_year {
|
||||||
|
query = query.filter(track::Column::Year.gte(y));
|
||||||
|
}
|
||||||
|
if let Some(y) = to_year {
|
||||||
|
query = query.filter(track::Column::Year.lte(y));
|
||||||
|
}
|
||||||
|
Ok(query
|
||||||
|
.order_by(Expr::cust("RANDOM()"), Order::Asc)
|
||||||
|
.limit(count)
|
||||||
|
.all(db)
|
||||||
|
.await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get tracks matching a genre with pagination.
|
||||||
|
pub async fn get_by_genre_paginated(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
genre: &str,
|
||||||
|
limit: u64,
|
||||||
|
offset: u64,
|
||||||
|
) -> DbResult<Vec<Track>> {
|
||||||
|
let pattern = format!("%{genre}%");
|
||||||
|
Ok(Tracks::find()
|
||||||
|
.filter(Expr::cust_with_values(
|
||||||
|
"LOWER(genre) LIKE LOWER(?)",
|
||||||
|
[pattern],
|
||||||
|
))
|
||||||
|
.limit(limit)
|
||||||
|
.offset(offset)
|
||||||
|
.all(db)
|
||||||
|
.await?)
|
||||||
|
}
|
||||||
|
|
||||||
/// Get tracks added within the last N days.
|
/// Get tracks added within the last N days.
|
||||||
pub async fn get_recent(db: &DatabaseConnection, days: u32, limit: u64) -> DbResult<Vec<Track>> {
|
pub async fn get_recent(db: &DatabaseConnection, days: u32, limit: u64) -> DbResult<Vec<Track>> {
|
||||||
let cutoff = Utc::now().naive_utc() - chrono::Duration::days(i64::from(days));
|
let cutoff = Utc::now().naive_utc() - chrono::Duration::days(i64::from(days));
|
||||||
@@ -175,6 +230,64 @@ pub async fn get_recent(db: &DatabaseConnection, days: u32, limit: u64) -> DbRes
|
|||||||
.await?)
|
.await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get tracks that haven't been processed by the tagger yet.
|
||||||
|
pub async fn get_untagged(db: &DatabaseConnection) -> DbResult<Vec<Track>> {
|
||||||
|
Ok(Tracks::find()
|
||||||
|
.filter(track::Column::Tagged.eq(false))
|
||||||
|
.all(db)
|
||||||
|
.await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn delete_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult<u64> {
|
||||||
|
let result = Tracks::delete_many()
|
||||||
|
.filter(track::Column::ArtistId.eq(artist_id))
|
||||||
|
.exec(db)
|
||||||
|
.await?;
|
||||||
|
Ok(result.rows_affected)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get tracks that have been tagged but have no corresponding wanted_item.
|
||||||
|
/// These are files that went through the pipeline but aren't part of any watched content.
|
||||||
|
pub async fn get_unwanted(db: &DatabaseConnection) -> DbResult<Vec<Track>> {
|
||||||
|
let all_tagged = Tracks::find()
|
||||||
|
.filter(track::Column::Tagged.eq(true))
|
||||||
|
.all(db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let all_wanted = crate::entities::wanted_item::Entity::find().all(db).await?;
|
||||||
|
|
||||||
|
let wanted_mbids: std::collections::HashSet<&str> = all_wanted
|
||||||
|
.iter()
|
||||||
|
.filter_map(|w| w.musicbrainz_id.as_deref())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let wanted_track_ids: std::collections::HashSet<i32> =
|
||||||
|
all_wanted.iter().filter_map(|w| w.track_id).collect();
|
||||||
|
|
||||||
|
Ok(all_tagged
|
||||||
|
.into_iter()
|
||||||
|
.filter(|t| {
|
||||||
|
// Not linked by track_id
|
||||||
|
!wanted_track_ids.contains(&t.id)
|
||||||
|
// Not linked by MBID
|
||||||
|
&& !t.musicbrainz_id.as_deref().is_some_and(|mbid| wanted_mbids.contains(mbid))
|
||||||
|
})
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete tracks whose files no longer exist on disk.
|
||||||
|
pub async fn delete_orphaned(db: &DatabaseConnection) -> DbResult<u64> {
|
||||||
|
let all = Tracks::find().all(db).await?;
|
||||||
|
let mut deleted = 0u64;
|
||||||
|
for t in all {
|
||||||
|
if !std::path::Path::new(&t.file_path).exists() {
|
||||||
|
Tracks::delete_by_id(t.id).exec(db).await?;
|
||||||
|
deleted += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(deleted)
|
||||||
|
}
|
||||||
|
|
||||||
/// Get tracks by artist name (case-insensitive match).
|
/// Get tracks by artist name (case-insensitive match).
|
||||||
pub async fn get_by_artist_name(db: &DatabaseConnection, name: &str) -> DbResult<Vec<Track>> {
|
pub async fn get_by_artist_name(db: &DatabaseConnection, name: &str) -> DbResult<Vec<Track>> {
|
||||||
Ok(Tracks::find()
|
Ok(Tracks::find()
|
||||||
|
|||||||
@@ -69,11 +69,61 @@ pub async fn update_status(
|
|||||||
Ok(active.update(db).await?)
|
Ok(active.update(db).await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn update_mbid(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
id: i32,
|
||||||
|
musicbrainz_id: &str,
|
||||||
|
) -> DbResult<WantedItem> {
|
||||||
|
let existing = get_by_id(db, id).await?;
|
||||||
|
let mut active: ActiveModel = existing.into();
|
||||||
|
active.musicbrainz_id = Set(Some(musicbrainz_id.to_string()));
|
||||||
|
active.updated_at = Set(Utc::now().naive_utc());
|
||||||
|
Ok(active.update(db).await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_track_id(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
id: i32,
|
||||||
|
track_id: i32,
|
||||||
|
) -> DbResult<WantedItem> {
|
||||||
|
let existing = get_by_id(db, id).await?;
|
||||||
|
let mut active: ActiveModel = existing.into();
|
||||||
|
active.track_id = Set(Some(track_id));
|
||||||
|
active.updated_at = Set(Utc::now().naive_utc());
|
||||||
|
Ok(active.update(db).await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn find_by_mbid(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
musicbrainz_id: &str,
|
||||||
|
) -> DbResult<Option<WantedItem>> {
|
||||||
|
Ok(WantedItems::find()
|
||||||
|
.filter(wanted_item::Column::MusicbrainzId.eq(musicbrainz_id))
|
||||||
|
.one(db)
|
||||||
|
.await?)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn remove(db: &DatabaseConnection, id: i32) -> DbResult<()> {
|
pub async fn remove(db: &DatabaseConnection, id: i32) -> DbResult<()> {
|
||||||
WantedItems::delete_by_id(id).exec(db).await?;
|
WantedItems::delete_by_id(id).exec(db).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn remove_by_artist(db: &DatabaseConnection, artist_id: i32) -> DbResult<u64> {
|
||||||
|
let result = WantedItems::delete_many()
|
||||||
|
.filter(wanted_item::Column::ArtistId.eq(artist_id))
|
||||||
|
.exec(db)
|
||||||
|
.await?;
|
||||||
|
Ok(result.rows_affected)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn remove_by_mbid(db: &DatabaseConnection, musicbrainz_id: &str) -> DbResult<u64> {
|
||||||
|
let result = WantedItems::delete_many()
|
||||||
|
.filter(wanted_item::Column::MusicbrainzId.eq(musicbrainz_id))
|
||||||
|
.exec(db)
|
||||||
|
.await?;
|
||||||
|
Ok(result.rows_affected)
|
||||||
|
}
|
||||||
|
|
||||||
/// Promote all Downloaded items to Owned status. Returns the count updated.
|
/// Promote all Downloaded items to Owned status. Returns the count updated.
|
||||||
pub async fn promote_downloaded_to_owned(db: &DatabaseConnection) -> DbResult<u64> {
|
pub async fn promote_downloaded_to_owned(db: &DatabaseConnection) -> DbResult<u64> {
|
||||||
let now = Utc::now().naive_utc();
|
let now = Utc::now().naive_utc();
|
||||||
|
|||||||
@@ -0,0 +1,306 @@
|
|||||||
|
use chrono::Utc;
|
||||||
|
use sea_orm::sea_query::Expr;
|
||||||
|
use sea_orm::*;
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
|
use crate::entities::work_queue::{
|
||||||
|
self, ActiveModel, Entity as WorkQueue, Model as WorkItem, WorkQueueStatus, WorkTaskType,
|
||||||
|
};
|
||||||
|
use crate::error::{DbError, DbResult};
|
||||||
|
|
||||||
|
pub async fn enqueue(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
task_type: WorkTaskType,
|
||||||
|
payload_json: &str,
|
||||||
|
pipeline_id: Option<&str>,
|
||||||
|
) -> DbResult<WorkItem> {
|
||||||
|
let now = Utc::now().naive_utc();
|
||||||
|
let active = ActiveModel {
|
||||||
|
task_type: Set(task_type),
|
||||||
|
status: Set(WorkQueueStatus::Pending),
|
||||||
|
payload_json: Set(payload_json.to_string()),
|
||||||
|
pipeline_id: Set(pipeline_id.map(String::from)),
|
||||||
|
created_at: Set(now),
|
||||||
|
started_at: Set(None),
|
||||||
|
completed_at: Set(None),
|
||||||
|
error: Set(None),
|
||||||
|
retry_count: Set(0),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
Ok(active.insert(db).await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn enqueue_batch(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
items: Vec<(WorkTaskType, String, Option<String>)>,
|
||||||
|
) -> DbResult<Vec<WorkItem>> {
|
||||||
|
let now = Utc::now().naive_utc();
|
||||||
|
let mut results = Vec::with_capacity(items.len());
|
||||||
|
for (task_type, payload_json, pipeline_id) in items {
|
||||||
|
let active = ActiveModel {
|
||||||
|
task_type: Set(task_type),
|
||||||
|
status: Set(WorkQueueStatus::Pending),
|
||||||
|
payload_json: Set(payload_json),
|
||||||
|
pipeline_id: Set(pipeline_id),
|
||||||
|
created_at: Set(now),
|
||||||
|
started_at: Set(None),
|
||||||
|
completed_at: Set(None),
|
||||||
|
error: Set(None),
|
||||||
|
retry_count: Set(0),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
results.push(active.insert(db).await?);
|
||||||
|
}
|
||||||
|
Ok(results)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Atomically claim up to `limit` pending items of the given type by setting
|
||||||
|
/// their status to Running. Returns the claimed items.
|
||||||
|
pub async fn claim_next(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
task_type: WorkTaskType,
|
||||||
|
limit: u64,
|
||||||
|
) -> DbResult<Vec<WorkItem>> {
|
||||||
|
let pending = WorkQueue::find()
|
||||||
|
.filter(work_queue::Column::Status.eq(WorkQueueStatus::Pending))
|
||||||
|
.filter(work_queue::Column::TaskType.eq(task_type))
|
||||||
|
.order_by_asc(work_queue::Column::CreatedAt)
|
||||||
|
.limit(limit)
|
||||||
|
.all(db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let now = Utc::now().naive_utc();
|
||||||
|
let mut claimed = Vec::with_capacity(pending.len());
|
||||||
|
for item in pending {
|
||||||
|
let mut active: ActiveModel = item.into();
|
||||||
|
active.status = Set(WorkQueueStatus::Running);
|
||||||
|
active.started_at = Set(Some(now));
|
||||||
|
let updated = active.update(db).await?;
|
||||||
|
claimed.push(updated);
|
||||||
|
}
|
||||||
|
Ok(claimed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn complete(db: &DatabaseConnection, id: i32) -> DbResult<()> {
|
||||||
|
let item = WorkQueue::find_by_id(id)
|
||||||
|
.one(db)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| DbError::NotFound(format!("work_queue id={id}")))?;
|
||||||
|
let mut active: ActiveModel = item.into();
|
||||||
|
active.status = Set(WorkQueueStatus::Completed);
|
||||||
|
active.completed_at = Set(Some(Utc::now().naive_utc()));
|
||||||
|
active.error = Set(None);
|
||||||
|
active.update(db).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn fail(db: &DatabaseConnection, id: i32, error: &str) -> DbResult<()> {
|
||||||
|
let item = WorkQueue::find_by_id(id)
|
||||||
|
.one(db)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| DbError::NotFound(format!("work_queue id={id}")))?;
|
||||||
|
let retry_count = item.retry_count;
|
||||||
|
let mut active: ActiveModel = item.into();
|
||||||
|
|
||||||
|
if retry_count < 3 {
|
||||||
|
// Auto-retry: reset to pending with incremented count
|
||||||
|
active.status = Set(WorkQueueStatus::Pending);
|
||||||
|
active.started_at = Set(None);
|
||||||
|
active.retry_count = Set(retry_count + 1);
|
||||||
|
active.error = Set(Some(error.to_string()));
|
||||||
|
} else {
|
||||||
|
// Max retries exceeded: mark as permanently failed
|
||||||
|
active.status = Set(WorkQueueStatus::Failed);
|
||||||
|
active.completed_at = Set(Some(Utc::now().naive_utc()));
|
||||||
|
active.error = Set(Some(error.to_string()));
|
||||||
|
}
|
||||||
|
active.update(db).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reset any items stuck in Running state back to Pending (for startup recovery).
|
||||||
|
pub async fn reset_stale_running(db: &DatabaseConnection) -> DbResult<u64> {
|
||||||
|
let result = WorkQueue::update_many()
|
||||||
|
.col_expr(
|
||||||
|
work_queue::Column::Status,
|
||||||
|
Expr::value(WorkQueueStatus::Pending),
|
||||||
|
)
|
||||||
|
.col_expr(
|
||||||
|
work_queue::Column::StartedAt,
|
||||||
|
Expr::value(Option::<chrono::NaiveDateTime>::None),
|
||||||
|
)
|
||||||
|
.filter(work_queue::Column::Status.eq(WorkQueueStatus::Running))
|
||||||
|
.exec(db)
|
||||||
|
.await?;
|
||||||
|
Ok(result.rows_affected)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct TypeCounts {
|
||||||
|
pub pending: u64,
|
||||||
|
pub running: u64,
|
||||||
|
pub completed: u64,
|
||||||
|
pub failed: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get counts grouped by status for a specific task type.
|
||||||
|
pub async fn counts_for_type(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
task_type: WorkTaskType,
|
||||||
|
) -> DbResult<TypeCounts> {
|
||||||
|
let items = WorkQueue::find()
|
||||||
|
.filter(work_queue::Column::TaskType.eq(task_type))
|
||||||
|
.all(db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut counts = TypeCounts {
|
||||||
|
pending: 0,
|
||||||
|
running: 0,
|
||||||
|
completed: 0,
|
||||||
|
failed: 0,
|
||||||
|
};
|
||||||
|
for item in items {
|
||||||
|
match item.status {
|
||||||
|
WorkQueueStatus::Pending => counts.pending += 1,
|
||||||
|
WorkQueueStatus::Running => counts.running += 1,
|
||||||
|
WorkQueueStatus::Completed => counts.completed += 1,
|
||||||
|
WorkQueueStatus::Failed => counts.failed += 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(counts)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get counts for all task types at once.
|
||||||
|
pub async fn counts_all(db: &DatabaseConnection) -> DbResult<AllCounts> {
|
||||||
|
let items = WorkQueue::find().all(db).await?;
|
||||||
|
|
||||||
|
let mut result = AllCounts {
|
||||||
|
download: TypeCounts {
|
||||||
|
pending: 0,
|
||||||
|
running: 0,
|
||||||
|
completed: 0,
|
||||||
|
failed: 0,
|
||||||
|
},
|
||||||
|
index: TypeCounts {
|
||||||
|
pending: 0,
|
||||||
|
running: 0,
|
||||||
|
completed: 0,
|
||||||
|
failed: 0,
|
||||||
|
},
|
||||||
|
tag: TypeCounts {
|
||||||
|
pending: 0,
|
||||||
|
running: 0,
|
||||||
|
completed: 0,
|
||||||
|
failed: 0,
|
||||||
|
},
|
||||||
|
organize: TypeCounts {
|
||||||
|
pending: 0,
|
||||||
|
running: 0,
|
||||||
|
completed: 0,
|
||||||
|
failed: 0,
|
||||||
|
},
|
||||||
|
enrich: TypeCounts {
|
||||||
|
pending: 0,
|
||||||
|
running: 0,
|
||||||
|
completed: 0,
|
||||||
|
failed: 0,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
for item in items {
|
||||||
|
let counts = match item.task_type {
|
||||||
|
WorkTaskType::Download => &mut result.download,
|
||||||
|
WorkTaskType::Index => &mut result.index,
|
||||||
|
WorkTaskType::Tag => &mut result.tag,
|
||||||
|
WorkTaskType::Organize => &mut result.organize,
|
||||||
|
WorkTaskType::Enrich => &mut result.enrich,
|
||||||
|
};
|
||||||
|
match item.status {
|
||||||
|
WorkQueueStatus::Pending => counts.pending += 1,
|
||||||
|
WorkQueueStatus::Running => counts.running += 1,
|
||||||
|
WorkQueueStatus::Completed => counts.completed += 1,
|
||||||
|
WorkQueueStatus::Failed => counts.failed += 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct AllCounts {
|
||||||
|
pub download: TypeCounts,
|
||||||
|
pub index: TypeCounts,
|
||||||
|
pub tag: TypeCounts,
|
||||||
|
pub organize: TypeCounts,
|
||||||
|
pub enrich: TypeCounts,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if all items for a pipeline are completed (excluding a specific item ID,
|
||||||
|
/// typically the currently-running item that hasn't been marked complete yet).
|
||||||
|
pub async fn pipeline_is_complete(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
pipeline_id: &str,
|
||||||
|
exclude_id: Option<i32>,
|
||||||
|
) -> DbResult<bool> {
|
||||||
|
let mut query = WorkQueue::find()
|
||||||
|
.filter(work_queue::Column::PipelineId.eq(pipeline_id))
|
||||||
|
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Completed))
|
||||||
|
.filter(work_queue::Column::Status.ne(WorkQueueStatus::Failed));
|
||||||
|
if let Some(id) = exclude_id {
|
||||||
|
query = query.filter(work_queue::Column::Id.ne(id));
|
||||||
|
}
|
||||||
|
let incomplete = query.count(db).await?;
|
||||||
|
Ok(incomplete == 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete completed items older than the given number of days.
|
||||||
|
pub async fn cleanup_completed(db: &DatabaseConnection, older_than_days: i64) -> DbResult<u64> {
|
||||||
|
let cutoff = (Utc::now() - chrono::Duration::days(older_than_days)).naive_utc();
|
||||||
|
let result = WorkQueue::delete_many()
|
||||||
|
.filter(work_queue::Column::Status.eq(WorkQueueStatus::Completed))
|
||||||
|
.filter(work_queue::Column::CompletedAt.lt(cutoff))
|
||||||
|
.exec(db)
|
||||||
|
.await?;
|
||||||
|
Ok(result.rows_affected)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete all completed and failed items for a specific pipeline.
|
||||||
|
pub async fn clear_pipeline(db: &DatabaseConnection, pipeline_id: &str) -> DbResult<u64> {
|
||||||
|
let result = WorkQueue::delete_many()
|
||||||
|
.filter(work_queue::Column::PipelineId.eq(pipeline_id))
|
||||||
|
.filter(
|
||||||
|
work_queue::Column::Status
|
||||||
|
.eq(WorkQueueStatus::Completed)
|
||||||
|
.or(work_queue::Column::Status.eq(WorkQueueStatus::Failed)),
|
||||||
|
)
|
||||||
|
.exec(db)
|
||||||
|
.await?;
|
||||||
|
Ok(result.rows_affected)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete all completed/failed items that belong to a pipeline (have a pipeline_id).
|
||||||
|
/// Does not affect standalone (non-pipeline) work items.
|
||||||
|
pub async fn clear_all_pipelines(db: &DatabaseConnection) -> DbResult<u64> {
|
||||||
|
let result = WorkQueue::delete_many()
|
||||||
|
.filter(work_queue::Column::PipelineId.is_not_null())
|
||||||
|
.filter(
|
||||||
|
work_queue::Column::Status
|
||||||
|
.eq(WorkQueueStatus::Completed)
|
||||||
|
.or(work_queue::Column::Status.eq(WorkQueueStatus::Failed)),
|
||||||
|
)
|
||||||
|
.exec(db)
|
||||||
|
.await?;
|
||||||
|
Ok(result.rows_affected)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if there are any pending or running items.
|
||||||
|
pub async fn has_active_work(db: &DatabaseConnection) -> DbResult<bool> {
|
||||||
|
let count = WorkQueue::find()
|
||||||
|
.filter(
|
||||||
|
work_queue::Column::Status
|
||||||
|
.eq(WorkQueueStatus::Pending)
|
||||||
|
.or(work_queue::Column::Status.eq(WorkQueueStatus::Running)),
|
||||||
|
)
|
||||||
|
.count(db)
|
||||||
|
.await?;
|
||||||
|
Ok(count > 0)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user