Added the import/cleanup functionality

Connor Johnstone
2026-03-24 15:58:14 -04:00
parent 1431cd2fbc
commit 823ef15022
11 changed files with 392 additions and 27 deletions

View File

@@ -42,10 +42,21 @@ pub async fn trigger_pipeline(state: &web::Data<AppState>) -> Result<String, Api
state.workers.notify(WorkTaskType::Download);
}
+// Step 3: Scan library for existing files (import pipeline)
+let index_payload = serde_json::json!({"scan_all": true});
+queries::work_queue::enqueue(
+    conn,
+    WorkTaskType::Index,
+    &index_payload.to_string(),
+    Some(&pipeline_id),
+)
+.await?;
+state.workers.notify(WorkTaskType::Index);
tracing::info!(
    download_items = pending.len(),
    pipeline_id = %pipeline_id,
-    "pipeline work items created"
+    "pipeline work items created (including library scan)"
);
Ok(pipeline_id)
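
The enqueue-then-notify pairing above assumes each worker type persists its tasks first and then wakes an idle worker. A minimal sketch of the loop this implies, assuming the workers park on a tokio::sync::Notify between batches; claim_next_task is a stand-in, not part of this commit:

use std::sync::Arc;
use tokio::sync::Notify;

// Hypothetical task source standing in for the real work_queue claim query.
async fn claim_next_task() -> Option<String> {
    None
}

async fn worker_loop(notify: Arc<Notify>) {
    loop {
        // Drain everything already queued for this worker type...
        while let Some(payload) = claim_next_task().await {
            tracing::info!(%payload, "processing task");
        }
        // ...then park until a producer calls notify. Notify stores a permit,
        // so a wake-up sent while we were still draining is not lost.
        notify.notified().await;
    }
}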

View File

@@ -44,6 +44,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.route(web::get().to(list_albums))
.route(web::post().to(add_album)),
)
+.service(web::resource("/albums/{mbid}/watch").route(web::delete().to(unwatch_album)))
.service(web::resource("/albums/{mbid}").route(web::get().to(get_album)));
}
@@ -194,3 +195,34 @@ async fn add_album(
"errors": summary.errors,
})))
}
+async fn unwatch_album(
+    state: web::Data<AppState>,
+    session: Session,
+    path: web::Path<String>,
+) -> Result<HttpResponse, ApiError> {
+    auth::require_auth(&session)?;
+    let mbid = path.into_inner();
+    let conn = state.db.conn();
+    // Get the album's tracks from MB to find their recording MBIDs
+    let tracks = match state.mb_client.get_release_tracks(&mbid).await {
+        Ok(t) => t,
+        Err(_) => {
+            // Try as release-group
+            let release_mbid = resolve_release_from_group(&state, &mbid).await?;
+            state
+                .mb_client
+                .get_release_tracks(&release_mbid)
+                .await
+                .map_err(|e| ApiError::Internal(format!("MusicBrainz error: {e}")))?
+        }
+    };
+    let mut removed = 0u64;
+    for track in &tracks {
+        removed += queries::wanted::remove_by_mbid(conn, &track.recording_mbid).await?;
+    }
+    Ok(HttpResponse::Ok().json(serde_json::json!({"removed": removed})))
+}
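
unwatch_album leans on queries::wanted::remove_by_mbid, whose body is elsewhere in the commit. Since shanty_db exposes SeaORM-style entities (shanty_db::entities::wanted_item appears later in this diff), the helper is presumably a filtered bulk delete; a sketch, with the column name assumed:

use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, QueryFilter};
use shanty_db::entities::wanted_item;

// Delete every wanted item tied to a recording MBID; report rows removed.
pub async fn remove_by_mbid(conn: &DatabaseConnection, mbid: &str) -> Result<u64, DbErr> {
    let res = wanted_item::Entity::delete_many()
        .filter(wanted_item::Column::MusicbrainzId.eq(mbid))
        .exec(conn)
        .await?;
    Ok(res.rows_affected)
}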

View File

@@ -78,6 +78,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.route(web::post().to(set_monitored))
.route(web::delete().to(unset_monitored)),
)
+.service(web::resource("/artists/{id}/watch").route(web::delete().to(unwatch_artist)))
.service(
web::resource("/artists/{id}")
.route(web::get().to(get_artist))
@@ -839,12 +840,31 @@ async fn delete_artist(
session: Session,
path: web::Path<i32>,
) -> Result<HttpResponse, ApiError> {
-auth::require_admin(&session)?;
+auth::require_auth(&session)?;
let id = path.into_inner();
-queries::artists::delete(state.db.conn(), id).await?;
+let conn = state.db.conn();
+// Cascade: remove wanted items, tracks (DB only), albums, cache, then artist
+queries::wanted::remove_by_artist(conn, id).await?;
+queries::tracks::delete_by_artist(conn, id).await?;
+queries::albums::delete_by_artist(conn, id).await?;
+let _ = queries::cache::purge_prefix(conn, &format!("artist_totals:{id}")).await;
+queries::artists::delete(conn, id).await?;
Ok(HttpResponse::NoContent().finish())
}
+async fn unwatch_artist(
+    state: web::Data<AppState>,
+    session: Session,
+    path: web::Path<i32>,
+) -> Result<HttpResponse, ApiError> {
+    auth::require_auth(&session)?;
+    let id = path.into_inner();
+    let removed = queries::wanted::remove_by_artist(state.db.conn(), id).await?;
+    Ok(HttpResponse::Ok().json(serde_json::json!({"removed": removed})))
+}
async fn set_monitored(
state: web::Data<AppState>,
session: Session,

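The cascade in delete_artist issues each delete as its own statement, so a failure midway can leave a partially deleted artist. A hedged sketch of the same sequence inside a transaction, assuming the query helpers accept any SeaORM-style connection (their signatures are not shown in this diff):

use sea_orm::TransactionTrait;

// Hypothetical atomic variant of the delete_artist cascade above.
let txn = state.db.conn().begin().await?;
queries::wanted::remove_by_artist(&txn, id).await?;
queries::tracks::delete_by_artist(&txn, id).await?;
queries::albums::delete_by_artist(&txn, id).await?;
queries::artists::delete(&txn, id).await?;
txn.commit().await?;
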
View File

@@ -29,9 +29,13 @@ pub struct WatchTrackRequest {
}
pub fn configure(cfg: &mut web::ServiceConfig) {
-cfg.service(web::resource("/tracks/watch").route(web::post().to(watch_track)))
-    .service(web::resource("/tracks").route(web::get().to(list_tracks)))
-    .service(web::resource("/tracks/{id}").route(web::get().to(get_track)));
+cfg.service(
+    web::resource("/tracks/watch")
+        .route(web::post().to(watch_track))
+        .route(web::delete().to(unwatch_track)),
+)
+.service(web::resource("/tracks").route(web::get().to(list_tracks)))
+.service(web::resource("/tracks/{id}").route(web::get().to(get_track)));
}
async fn list_tracks(
@@ -87,3 +91,17 @@ async fn watch_track(
"artist_name": entry.artist_name,
})))
}
+async fn unwatch_track(
+    state: web::Data<AppState>,
+    session: Session,
+    body: web::Json<WatchTrackRequest>,
+) -> Result<HttpResponse, ApiError> {
+    auth::require_auth(&session)?;
+    let mbid = body
+        .mbid
+        .as_deref()
+        .ok_or_else(|| ApiError::BadRequest("provide recording mbid".into()))?;
+    let removed = queries::wanted::remove_by_mbid(state.db.conn(), mbid).await?;
+    Ok(HttpResponse::Ok().json(serde_json::json!({"removed": removed})))
+}
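
The track variant differs from the album and artist endpoints: the recording MBID travels in the JSON body of the DELETE rather than in the path, reusing WatchTrackRequest. A hypothetical client call with actix's awc crate, assuming the routes are mounted under /api on localhost and a valid session cookie is already attached:

// Sketch only; URL, port, and the /api prefix are assumptions.
let client = awc::Client::default();
let mut resp = client
    .delete("http://localhost:8080/api/tracks/watch")
    .send_json(&serde_json::json!({ "mbid": "<recording-mbid>" }))
    .await?;
// On success the handler responds with {"removed": <count>}.
let body: serde_json::Value = resp.json().await?;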

View File

@@ -68,13 +68,14 @@ impl WorkerManager {
queries::work_queue::cleanup_completed(cleanup_state.db.conn(), 7).await;
}
});
});
-// Spawn each worker type
-spawn_worker(state.clone(), WorkTaskType::Download, 1);
-spawn_worker(state.clone(), WorkTaskType::Index, 4);
-spawn_worker(state.clone(), WorkTaskType::Tag, 2);
-spawn_worker(state.clone(), WorkTaskType::Organize, 4);
+// Read config for concurrency settings and spawn workers
+let cfg = state_clone.config.read().await.clone();
+spawn_worker(state_clone.clone(), WorkTaskType::Download, 1);
+spawn_worker(state_clone.clone(), WorkTaskType::Index, cfg.indexing.concurrency);
+spawn_worker(state_clone.clone(), WorkTaskType::Tag, cfg.tagging.concurrency);
+spawn_worker(state_clone.clone(), WorkTaskType::Organize, 4);
});
}
}
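
The cfg.indexing.concurrency and cfg.tagging.concurrency fields are read here but defined elsewhere in the commit. A plausible shape for the indexing half, with struct and helper names assumed (the tagging config would mirror it):

// Hypothetical config struct; field and default names are assumptions.
#[derive(Clone, serde::Deserialize)]
pub struct IndexingConfig {
    /// Number of concurrent Index workers (previously hard-coded to 4).
    #[serde(default = "default_index_workers")]
    pub concurrency: usize,
}

fn default_index_workers() -> usize {
    4
}
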
@@ -332,14 +333,43 @@ async fn process_index(
.await
.map_err(|e| e.to_string())?;
-// Create Tag work items for all untagged tracks
-let untagged = queries::tracks::get_needing_metadata(conn)
+// Create Tag work items for tracks that still need processing:
+// 1. Tracks without MBIDs (need MB search + tagging)
+// 2. Tracks with MBIDs but no wanted_item yet (need wanted_item creation + organize)
+let needs_processing = queries::tracks::get_needing_metadata(conn)
.await
.map_err(|e| e.to_string())?;
-for track in &untagged {
+for track in &needs_processing {
let tag_payload = serde_json::json!({"track_id": track.id});
downstream.push((WorkTaskType::Tag, tag_payload.to_string()));
}
+// Also process tracks that have MBIDs (from file tags) but no wanted_item
+let all_wanted = queries::wanted::list(conn, None, None)
+    .await
+    .map_err(|e| e.to_string())?;
+let wanted_mbids: std::collections::HashSet<&str> = all_wanted
+    .iter()
+    .filter_map(|w| w.musicbrainz_id.as_deref())
+    .collect();
+let mut offset = 0u64;
+loop {
+    let tracks = queries::tracks::list(conn, 500, offset)
+        .await
+        .map_err(|e| e.to_string())?;
+    if tracks.is_empty() {
+        break;
+    }
+    for track in &tracks {
+        if let Some(ref mbid) = track.musicbrainz_id {
+            if !wanted_mbids.contains(mbid.as_str()) {
+                let tag_payload = serde_json::json!({"track_id": track.id});
+                downstream.push((WorkTaskType::Tag, tag_payload.to_string()));
+            }
+        }
+    }
+    offset += 500;
+}
} else if let Some(file_path) = payload.get("file_path").and_then(|v| v.as_str()) {
// Single file index
let path = std::path::PathBuf::from(file_path);
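
The scan_all branch keeps memory bounded by paging the track table in batches of 500 and holding only the wanted-item MBIDs in a HashSet. The queries::tracks::list helper is not in this diff, but the (conn, limit, offset) call shape suggests a thin paginated select; a sketch assuming SeaORM-style entities (the track entity path is an assumption):

use sea_orm::{DatabaseConnection, DbErr, EntityTrait, QueryOrder, QuerySelect};
use shanty_db::entities::track;

// A stable ordering matters here: without it, offset paging can skip or
// repeat rows while other workers mutate the table underneath the scan.
pub async fn list(
    conn: &DatabaseConnection,
    limit: u64,
    offset: u64,
) -> Result<Vec<track::Model>, DbErr> {
    track::Entity::find()
        .order_by_asc(track::Column::Id)
        .limit(limit)
        .offset(offset)
        .all(conn)
        .await
}
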
@@ -384,6 +414,43 @@ async fn process_tag(
.await
.map_err(|e| e.to_string())?;
+// Re-read the track to get the MBID set by tagging
+let track = queries::tracks::get_by_id(conn, track_id)
+    .await
+    .map_err(|e| e.to_string())?;
+// Ensure a wanted_item exists for this track (marks imported files as Owned)
+if let Some(ref mbid) = track.musicbrainz_id {
+    if queries::wanted::find_by_mbid(conn, mbid)
+        .await
+        .map_err(|e| e.to_string())?
+        .is_none()
+    {
+        let item = queries::wanted::add(
+            conn,
+            queries::wanted::AddWantedItem {
+                item_type: shanty_db::entities::wanted_item::ItemType::Track,
+                name: track.title.as_deref().unwrap_or("Unknown"),
+                musicbrainz_id: Some(mbid),
+                artist_id: track.artist_id,
+                album_id: track.album_id,
+                track_id: Some(track.id),
+                user_id: None,
+            },
+        )
+        .await
+        .map_err(|e| e.to_string())?;
+        // Mark as Owned immediately since the file already exists
+        let _ = queries::wanted::update_status(
+            conn,
+            item.id,
+            shanty_db::entities::wanted_item::WantedStatus::Owned,
+        )
+        .await;
+    }
+}
// Create Organize work item
let org_payload = serde_json::json!({"track_id": track_id});
Ok(vec![(WorkTaskType::Organize, org_payload.to_string())])
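
Because Tag workers run at cfg.tagging.concurrency, two tracks carrying the same recording MBID can both pass the find_by_mbid check before either inserts. A hedged alternative, assuming a unique index on the wanted items' musicbrainz_id column, is a conflict-ignoring insert:

use sea_orm::sea_query::OnConflict;
use sea_orm::{DbErr, EntityTrait};
use shanty_db::entities::wanted_item;

// new_item is an ActiveModel built from the same fields as AddWantedItem above.
let result = wanted_item::Entity::insert(new_item)
    .on_conflict(
        OnConflict::column(wanted_item::Column::MusicbrainzId)
            .do_nothing()
            .to_owned(),
    )
    .exec(conn)
    .await;
match result {
    // SeaORM reports a skipped (conflicting) insert as RecordNotInserted.
    Ok(_) | Err(DbErr::RecordNotInserted) => {}
    Err(e) => return Err(e.to_string()),
}
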
@@ -417,11 +484,29 @@ async fn process_organize(
// Promote this track's wanted item from Downloaded to Owned
let _ = queries::wanted::promote_downloaded_to_owned(conn).await;
-// Check if pipeline is complete and trigger enrichment
+// Check if pipeline is complete — run cleanup then enrichment
if let Some(ref pipeline_id) = item.pipeline_id
&& let Ok(true) = queries::work_queue::pipeline_is_complete(conn, pipeline_id).await
{
-    tracing::info!(pipeline_id = %pipeline_id, "pipeline complete, triggering enrichment");
+    tracing::info!(pipeline_id = %pipeline_id, "pipeline complete, running cleanup");
+    // Cleanup: remove orphaned tracks, empty albums, unused artists
+    match queries::tracks::delete_orphaned(conn).await {
+        Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up orphaned tracks"),
+        Err(e) => tracing::warn!(error = %e, "failed to clean orphaned tracks"),
+        _ => {}
+    }
+    match queries::albums::delete_empty(conn).await {
+        Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up empty albums"),
+        Err(e) => tracing::warn!(error = %e, "failed to clean empty albums"),
+        _ => {}
+    }
+    match queries::artists::delete_unused(conn).await {
+        Ok(n) if n > 0 => tracing::info!(count = n, "cleaned up unused artists"),
+        Err(e) => tracing::warn!(error = %e, "failed to clean unused artists"),
+        _ => {}
+    }
let state = state.clone();
tokio::spawn(async move {
if let Err(e) = crate::routes::artists::enrich_all_watched_artists(&state).await {
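
The cleanup trio (delete_orphaned, delete_empty, delete_unused) is new in this commit but defined outside the shown hunks. The pattern is an anti-join delete; a sketch for the album case, with entity and column names assumed:

use sea_orm::sea_query::Query;
use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, QueryFilter};
use shanty_db::entities::{album, track};

// Remove albums that no longer have any tracks referencing them.
pub async fn delete_empty(conn: &DatabaseConnection) -> Result<u64, DbErr> {
    let res = album::Entity::delete_many()
        .filter(
            album::Column::Id.not_in_subquery(
                Query::select()
                    .column(track::Column::AlbumId)
                    .from(track::Entity)
                    .and_where(track::Column::AlbumId.is_not_null())
                    .to_owned(),
            ),
        )
        .exec(conn)
        .await?;
    Ok(res.rows_affected)
}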