Replace ad-hoc scheduling with a concurrent work queue system #43

Open
opened 2026-03-21 14:18:40 -04:00 by connor · 0 comments
Owner

Current state

The current scheduling uses three separate tokio::spawn + sleep loops (in shanty-web/src/pipeline_scheduler.rs, shanty-web/src/monitor.rs, and shanty-web/src/cookie_refresh.rs). Each is a copy-paste pattern that reads config, sleeps, and runs work.
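For illustration, the duplicated loop has roughly this shape. This is a minimal sketch, not the actual code: the real loops use tokio::spawn and tokio::time::sleep, while std::thread stands in here so the snippet is self-contained, and spawn_scheduler is a hypothetical name.

```rust
use std::thread;
use std::time::Duration;

// Hypothetical sketch of the loop that pipeline_scheduler.rs,
// monitor.rs, and cookie_refresh.rs each re-implement: sleep for a
// configured interval, run the work, repeat forever.
fn spawn_scheduler<F>(interval: Duration, mut work: F) -> thread::JoinHandle<()>
where
    F: FnMut() + Send + 'static,
{
    thread::spawn(move || loop {
        thread::sleep(interval); // "read config, sleep"
        work();                  // "run work", then loop again
    })
}
```

Because each copy owns its own loop, interval handling, and state, any fix (e.g. persistence or skip handling) has to be made three times.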

The pipeline (shanty-web/src/pipeline.rs / shanty-web/src/routes/system.rs) runs 6 strictly sequential steps: sync → download → index → tag → organize → enrich. Each step must fully complete before the next begins. This means if 50 tracks are downloading, the tagger waits idle for all 50 to finish before it can tag even the first one.
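That batch-sequential ordering can be sketched as follows (hypothetical function and step names; the real steps live in pipeline.rs and do actual work):

```rust
// Hypothetical sketch of the current strictly sequential pipeline:
// every step runs over the whole batch before the next step starts,
// so tagging of track 1 waits until every track has downloaded.
fn run_pipeline_sequential(tracks: &[u64]) -> Vec<String> {
    let mut log = Vec::new();
    for step in ["sync", "download", "index", "tag", "organize", "enrich"] {
        for t in tracks {
            log.push(format!("{step}:{t}"));
        }
    }
    log
}
```

In this ordering, tag:1 can only ever appear after the last download entry, which is exactly the idle-tagger problem described above.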

Current problems

  • No concurrency — steps are fully sequential even when they could overlap. While track A is downloading, track B (already downloaded) could be getting tagged.
  • No persistence — scheduled state (next-run times) is in-memory only, lost on restart. The SchedulerInfo struct in shanty-web/src/state.rs tracks next_pipeline and next_monitor as Option<NaiveDateTime> but these are volatile.
  • No historical visibility — no record of when a task last ran or what the result was (only the current "next run" is tracked via SchedulerInfo).
  • Skip logic uses one-shot flags (skip_pipeline, skip_monitor in SchedulerInfo) that could race with the scheduler loop.
  • Each scheduler is a separate copy-paste pattern — pipeline_scheduler.rs, monitor.rs, and cookie_refresh.rs all duplicate the same loop structure.
  • TaskManager (shanty-web/src/tasks.rs) is an in-memory HashMap of TaskInfo structs that accumulates indefinitely — no cleanup, no persistence.

Proposed architecture: Work queue with typed workers

Replace the linear pipeline with a task bucket (work queue) serviced by typed workers:

  1. Task types: Download, Index, Tag, Organize — each is a fine-grained unit of work (one task per file/track, not one task per batch).
  2. Workers: Each task type has a worker (or small worker pool with configurable concurrency). Workers pull tasks from the queue, process them, and create downstream tasks:
    • Sync creates individual Download tasks (one per wanted item)
    • Download worker finishes a file → creates an Index task for that file
    • Index worker finishes a track → creates a Tag task for that track
    • Tag worker finishes → creates an Organize task
    • Organize worker finishes → track is done
  3. Concurrency: While track A is being tagged, track B can still be downloading. The pipeline flows per-item, not per-batch. This is a major UX improvement — users see tracks appearing in their library progressively instead of waiting for the entire batch.
  4. Persistence: Task queue stored in a DB table (new work_queue table with id, task_type, status, payload_json, created_at, started_at, completed_at, error). In-progress tasks can be resumed after restart.
  5. Unified scheduler: A single Scheduler struct replaces the three separate scheduler files. It manages all recurring jobs with:
    • Persistent last-run timestamps in DB (new scheduler_state table or similar)
    • Configurable intervals (already in SchedulingConfig)
    • Cron-like expressions (optional, for future flexibility)
    • Dashboard visibility: last run time, next run time, last result
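The per-item chaining in step 2 can be sketched as follows. Task and next_task are illustrative names, not existing code; Sync is omitted here because it is a recurring job that seeds Download tasks rather than a queued task type.

```rust
// Hypothetical task type per the proposal: one task per track,
// not one task per batch.
#[derive(Debug, Clone, PartialEq)]
enum Task {
    Download { track_id: u64 },
    Index { track_id: u64 },
    Tag { track_id: u64 },
    Organize { track_id: u64 },
}

// When a worker finishes a task it enqueues the downstream task for the
// same item, so each track flows through the pipeline independently.
fn next_task(done: &Task) -> Option<Task> {
    match done {
        Task::Download { track_id } => Some(Task::Index { track_id: *track_id }),
        Task::Index { track_id } => Some(Task::Tag { track_id: *track_id }),
        Task::Tag { track_id } => Some(Task::Organize { track_id: *track_id }),
        Task::Organize { .. } => None, // track is done: no downstream task
    }
}
```

Encoding the chain in one function keeps the flow in a single place, and adding a new worker type means adding one enum variant plus one match arm.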

Key files to modify

  • shanty-web/src/pipeline.rs — replace with work queue dispatch logic
  • shanty-web/src/pipeline_scheduler.rs — merge into unified scheduler
  • shanty-web/src/monitor.rs — merge scheduler portion into unified scheduler
  • shanty-web/src/cookie_refresh.rs — merge into unified scheduler
  • shanty-web/src/tasks.rs — replace in-memory TaskManager with DB-backed work queue
  • shanty-web/src/state.rs — simplify SchedulerInfo, remove skip flags
  • shanty-db/src/migration/ — new migration for work_queue and scheduler_state tables
  • shanty-web/src/routes/system.rs — update trigger_pipeline() to enqueue work items instead of running sequentially
  • shanty-web/frontend/src/pages/dashboard.rs — update Background Tasks display to show work queue state

Acceptance criteria

  • Pipeline steps run concurrently where possible (download + tag overlap)
  • Each work item (file/track) flows through the pipeline independently
  • Work queue persists across restarts — in-progress items resumed
  • Scheduling is centralized in one module with persistent last-run timestamps
  • Dashboard shows scheduler state: last run + next run + status for each recurring job
  • Dashboard shows work queue state: pending/active/completed counts per type
  • Adding a new scheduled job or worker type is straightforward
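The restart-resume criterion could be met with a small state machine on the status column. A minimal in-memory sketch, assuming the status values listed for the proposed work_queue table; Status and resume_after_restart are hypothetical names, and the real implementation would be an UPDATE against the table at startup:

```rust
// Hypothetical values for the proposed work_queue.status column.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,   // enqueued, waiting for a worker
    Active,    // claimed by a worker
    Completed, // finished successfully
    Failed,    // finished with an error recorded
}

// On startup, any row still Active was interrupted by the previous
// shutdown; resetting it to Pending lets a worker claim it again.
fn resume_after_restart(rows: &mut [Status]) {
    for s in rows.iter_mut() {
        if *s == Status::Active {
            *s = Status::Pending;
        }
    }
}
```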
connor added the MediumPriority label 2026-03-21 14:19:11 -04:00

Reference: Shanty/Main#43