本文记录 Tokimo(Web Desktop OS)任务调度系统的架构设计——从硬编码 match 分发到 JobHandler 注册表 + BusProxyHandler 代理的可插拔架构,涵盖 job 生命周期管理、sidecar 委托执行、协作取消机制及实时事件推送。


背景:为什么需要任务调度系统

Tokimo 作为桌面操作系统,需要处理大量后台耗时操作:媒体文件扫描、AI 推理(OCR / CLIP / 人脸识别)、元数据刮削、在线媒体下载、S3 上传等。这些操作的共同特征:

  • 耗时长:单次扫描可能处理数万张照片
  • 需要进度反馈:用户需要实时看到处理状态
  • 可重试:网络抖动或临时错误应自动重试
  • 可取消:用户应能中断正在运行的任务
  • 跨进程:部分任务由独立 sidecar app 执行

最初的实现是硬编码的——每种 job type 对应一个 match 分支,所有逻辑耦合在主进程中。随着 app 数量增长(Photo、Music、Video、Book、Dashcam),这种模式不可持续。


架构概览

┌─────────────────────────────────────────────────────────────────────┐
│                         Job Worker Loop                              │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │  poll pending → group dedupe → claim → dispatch → complete   │   │
│  └──────────────────────────────────────────────────────────────┘   │
└────────────────────────────────┬────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────────┐
│                      JobHandlerRegistry                              │
│          DashMap<(Option<app_id>, job_type) → Arc<dyn JobHandler>>   │
│                                                                      │
│  ┌─────────────────┬─────────────────┬─────────────────────────────┐│
│  │ (None,          │ (None,          │ (Some("video"),             ││
│  │  "photo_ocr")   │  "file_scrape") │  "tv_scrape")               ││
│  │  → FnHandler    │  → FnHandler    │  → BusProxyHandler           ││
│  │  (in-process)   │  (in-process)   │  (bus → sidecar)             ││
│  └─────────────────┴─────────────────┴─────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────┘
                                 │
                ┌────────────────┼────────────────┐
                ▼                ▼                ▼
       ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐
       │ Photo handler│  │ File handler │  │ Video sidecar    │
       │ (in-process) │  │ (in-process) │  │ via bus broker   │
       └──────────────┘  └──────────────┘  └──────────────────┘

核心组件:

组件 职责
Job Worker Loop 轮询 pending jobs,按优先级调度,管理并发
JobHandlerRegistry (app_id, job_type) → handler 的动态注册表
JobHandler trait 统一的 job 执行接口
BusProxyHandler 将 job 委托给 sidecar 通过 bus 执行
AppEvent channel job 状态变更 → WebSocket 实时推送

JobHandler Trait:统一执行接口

所有 job handler 的抽象接口,定义在 packages/rust-server/src/jobs/registry.rs

#[async_trait]
pub trait JobHandler: Send + Sync + 'static {
    async fn run(
        &self,
        db: &DatabaseConnection,
        state: &Arc<AppState>,
        job_id: Uuid,
        params: &JsonValue,
        user_id: Option<Uuid>,
        cancel: &JobCancel,
    ) -> Result<Option<JsonValue>, Box<dyn std::error::Error + Send + Sync>>;
}

返回值约定:

返回值 含义
Ok(Some(json)) 成功,json 写入 jobs.data(运行时数据)
Ok(None) 成功,无额外数据
Ok(Some({"_phase": "waiting"})) Parent job 进入 waiting 状态,等 children 完成
Err(e) 失败,写入 jobs.error,触发重试逻辑

两种 Handler 实现

FnHandler — 用于 in-process handler,通过宏生成:

impl_handler_fn!(photo_ocr::handle)
// 展开为:
// struct FnHandler;
// impl JobHandler for FnHandler { ... photo_ocr::handle(...) ... }
// Arc::new(FnHandler)

BusProxyHandler — 用于 sidecar 委托执行:

pub struct BusProxyHandler {
    bus_service: String,  // bus 服务名(如 "video")
    method: String,       // 调用的方法名(如 "dispatch_tv_scrape")
}

#[async_trait]
impl JobHandler for BusProxyHandler {
    async fn run(&self, ...) -> Result<...> {
        let bus = state.bus.get().ok_or("bus not initialized")?;
        let body = json!({ "job": { "id": job_id, "params": params } });
        bus.broker.call(&self.bus_service, &self.method, body.to_bytes(), caller).await
    }
}

注册时:

registry.register(
    Some("video"),
    "tv_scrape",
    Arc::new(BusProxyHandler::new("video", "dispatch_tv_scrape")),
);

主进程的 dispatch 逻辑变成一行:

if let Some(handler) = state.job_handlers.get(app_id, job_type) {
    return handler.run(db, state, job_id, params, user_id, &cancel).await;
}

Job Worker Loop:调度器

Job Worker 是一个 tokio task,运行在主进程中,负责从数据库轮询 pending jobs 并调度执行:

┌──────────────────────────────────────────────────────────────┐
│                     Job Worker Loop                            │
│                                                               │
│  ┌─ tokio::select! ────────────────────────────────────────┐  │
│  │  interval.tick()  (3s 周期)                              │  │
│  │  notify.notified() (job 完成后立即唤醒)                   │  │
│  └──────────────────────────────────────────────────────────┘  │
│       │                                                        │
│       ▼                                                        │
│  ┌─ 全局暂停检查 ───────────────────────────────────────────┐  │
│  │  SuspensionRepo::is_global_suspended → skip tick         │  │
│  └──────────────────────────────────────────────────────────┘  │
│       │                                                        │
│       ▼                                                        │
│  ┌─ find_pending_jobs(50) ──────────────────────────────────┐  │
│  │  ORDER BY priority DESC, created_at ASC                  │  │
│  └──────────────────────────────────────────────────────────┘  │
│       │                                                        │
│       ▼                                                        │
│  ┌─ 组去重 ─────────────────────────────────────────────────┐  │
│  │  coalesce(alias_job_id, id) 作为 group key               │  │
│  │  每组只 dispatch 第一个(winner),其余 bind 为 alias      │  │
│  └──────────────────────────────────────────────────────────┘  │
│       │                                                        │
│       ▼                                                        │
│  ┌─ 并发控制 ───────────────────────────────────────────────┐  │
│  │  Semaphore(max_concurrent)                                │  │
│  │  reserved high-priority slots                             │  │
│  │  每类型 suspension 检查                                   │  │
│  └──────────────────────────────────────────────────────────┘  │
│       │                                                        │
│       ▼                                                        │
│  tokio::spawn { process_job() }  ← 每个 job 独立 task          │
└──────────────────────────────────────────────────────────────┘

关键设计点:

机制 说明
双唤醒 3s 轮询 + Notify 即时唤醒(job 完成后立即调度下一个)
组去重 alias_job_id 指向同一个 target job,同一组只 dispatch 一个
优先级 ORDER BY priority DESC,高优任务优先调度
保留名额 高优任务始终有 reserved slots 可用
类型级暂停 SuspensionRepo::is_suspended(db, &type) 按类型暂停
协作取消 JobCancel token + check_cancel() 定期检查

优先级与去重

优先级枚举

pub enum JobPriority {
    Background = 0,      // 后台批量(扫描)
    Normal = 100,        // 普通任务(默认)
    UserAction = 1000,   // 用户交互触发
    Urgent = 5000,       // 系统紧急(保留)
}

调度器通过 JOB_RESERVED_HIGH_PRIORITY_SLOTS(默认 1)确保高优任务始终有可用名额:

running < max - reserved  →  普通任务可调度
running < max             →  高优任务可调度

Alias 去重

同一业务目标(如"刷新某张照片的 OCR")可能被多次触发。通过 dedupe_key 实现去重:

1. enqueue 时查询:是否存在 active target (app_id, task_type, dedupe_key)?
2. 存在 → 新 job 作为 alias 写入,alias_job_id = target.id
3. 不存在 → 新 job 作为 leader 写入

Alias 不占 worker slot,状态由 target 反向同步。调度器的组去重逻辑确保同一组只 dispatch leader。


协作取消机制

Job 可以被用户取消、被更高优先级任务抢占、或被全局暂停。取消通过 JobCancel token 实现协作式取消:

// handler 内部
for item in items {
    check_cancel(cancel)?;  // 如果被取消,返回 Err(sentinel_marker)
    process(item).await?;
}

三种取消原因:

原因 sentinel marker 终态
用户取消 CANCEL_MARKER_ABORTED cancelled
优先级抢占 CANCEL_MARKER_PREEMPTED cancelled
全局暂停 CANCEL_MARKER_SUSPENDED suspended

竞态保护: process_job 在 handler 返回后检查 cancel_reason,防止已取消的 job 被覆盖为 completed


Bus 服务层

jobs bus service

主进程通过 bus local service 暴露 11 个 RPC 方法给 sidecar app:

┌──────────────────────────────────────────────────────────────┐
│                   jobs bus service                             │
├──────────────────────┬───────────┬────────────────────────────┤
│ method               │ auth      │ 说明                       │
├──────────────────────┼───────────┼────────────────────────────┤
│ create               │ true      │ 创建单个 job               │
│ query                │ true      │ 按 id 或 filter 查询       │
│ cancel               │ true      │ 取消 job(可级联 children)│
│ update_status        │ true      │ 更新状态/结果              │
│ update_progress      │ true      │ 更新进度                   │
│ batch_children       │ true      │ 批量创建子 job             │
│ cancel_by_filter     │ true      │ 按 filter 批量取消         │
│ progress_summary     │ true      │ 聚合进度摘要               │
│ cleanup              │ true      │ 清理已完成 job             │
│ preempt              │ true      │ 抢占旧的 scan job          │
│ register_handler     │ false     │ 注册 job type → bus 映射   │
└──────────────────────┴───────────┴────────────────────────────┘

appId 归属契约: app_id 是 job 的顶层身份字段,从 CallerCtx.caller_app_id 自动获取,调用方不可伪造。paramsdata JSON 字段禁止内嵌 appId

task_queue bus service

Sidecar 通过 task_queue 推送外部 job 进度,主进程通过 WebSocket external_job_update 事件实时转发给前端:

Sidecar app
  │
  ├─ task_queue.upsert_job({ job_id, app_id, title, status, progress, ... })
  │     → 创建/更新 ExternalJobSnapshot(内存 DashMap)
  │     → emit AppEvent::ExternalJobUpdate
  │
  ├─ task_queue.update_progress({ job_id, progress, eta_seconds })
  │     → 更新 snapshot
  │     → emit AppEvent::ExternalJobUpdate
  │
  └─ task_queue.complete_job({ job_id, status })
        → 标记终态
        → emit AppEvent::ExternalJobUpdate

实时事件推送:AppEvent 通道

Job 状态变更通过 broadcast::Sender<AppEvent> 推送到 WebSocket 层:

pub enum AppEvent {
    // Job 状态变更(OS 概念)
    #[serde(rename = "job_update")]
    JobUpdate { job: Box<JobOutput> },

    // 外部 job 进度(sidecar 推送)
    #[serde(rename = "external_job_update")]
    ExternalJobUpdate { user_id: Uuid, job: Box<ExternalJobSnapshot> },

    // 通用 app 实体事件(OS 不解析 payload)
    #[serde(rename = "app_entity")]
    AppEntityEvent { user_id: Uuid, app_id: String, kind: String,
                     scope: Option<String>, payload: JsonValue },

    // Shell 应用列表变更
    #[serde(rename = "shell:apps_changed")]
    ShellAppsChanged { event: Box<AppRegistryEvent> },
}

设计原则: OS 层只做透传。JobUpdateExternalJobUpdate 是真正的 OS 概念;AppEntityEvent 是通用通道,OS 不感知 payload 内部结构,由 app 自己定义 kind 命名空间。


字段语义:params + data

Jobs 表有两个 JSON 字段,语义边界清晰:

字段 含义 可变性 示例
params 任务入参 不可变(创建后不变) { "libraryId": "abc", "sourceId": "tmdb" }
data 运行时数据 可变(进度、中间结果) { "total": 1000, "processed": 500 }

历史问题: 早期命名是 payload + meta,两个词语义边界模糊,多次出现"payload 里到底该放什么"的歧义。重命名为 params + data 后,语义一目了然:params = 输入参数,data = 运行态数据。


前端 SDK:事件订阅

三层 Hook 架构

useJobEvents (L1: 底层传输)
  │  订阅原始 WS 事件
  │  过滤 jobTypes / appId
  │
  ├── useSyncProgress (L3: 业务层,Host apps)
  │   ├── PhotoApp / MusicApp / BookApp 使用
  │   ├── 理解 library / sync 概念
  │   └── HTTP 轮询 + WS 事件混合
  │
  └── useJobProgress (L2: 通用进度,SDK)
      ├── VideoApp / third-party apps 使用
      ├── 无业务概念
      └── 纯 WS 事件驱动

设计原则: SDK 层不包含任何业务命名(librarysynccontent refresh)。业务概念只存在于各 app 自己的 wrapper 中。

useJobEvents

SDK 层的底层 hook,订阅 WebSocket job_update 事件:

useJobEvents({
  jobTypes: ["tv_scrape", "movie_scrape"],  // 过滤白名单
  appId: "video",                           // 可选:按 app 过滤
  onEvent: (event) => { /* ... */ },
  enabled: true,
});

useJobProgress

基于 useJobEvents 的通用进度追踪 hook:

const progress = useJobProgress({
  jobTypes: ["movie_scrape", "tv_scrape"],
  resolveEntityId: (params) => params.sourceId,
  onSettled: () => {
    queryClient.invalidateQueries({ queryKey: ["video", "libraries"] });
  },
});

// progress[sourceId] = { isActive, total, completed, running, pending, failed, pct }

实施历程

本次重构经历了多个阶段,跨越多个 Claude / Copilot session:

Phase 1: appId 列 + Bus Service
  ├─ jobs 表加 app_id 列
  ├─ 9 个 bus methods 注册
  └─ CallerCtx 自动获取 app_id

Phase 2: JobHandler 注册表
  ├─ JobHandler trait + JobHandlerRegistry
  ├─ BusProxyHandler(sidecar 委托)
  ├─ impl_handler_fn! 宏
  └─ 替换硬编码 match 分发

Phase 3: Sidecar 迁移
  ├─ Video app job_repo 从 1612 行精简到 110 行
  ├─ 所有 job 操作走 bus client
  └─ JobFilter 通用过滤体系

Phase 4: 字段语义化
  ├─ payload → params(不可变输入)
  ├─ meta → data(运行时数据)
  └─ 4 层命名对齐(Prisma / Entity / Repo / Frontend)

Phase 5: 事件通道清理
  ├─ AppEntityEvent 通用通道(OS 不解析 payload)
  ├─ 业务事件从 OS 层迁出
  └─ SDK useJobEvents 加 jobTypes 过滤

设计权衡

为何选择注册表而非 trait object + 依赖注入?

Rust 的 async trait 已稳定,但 handler 需要访问 AppState(包含 bus、DB、cancel registry 等全局状态)。将 state 作为 run() 参数传入,而非在构造时注入,避免了生命周期和 Arc 循环引用问题。

为何 BusProxyHandler 而非让 sidecar 直接操作 DB?

Sidecar 是独立进程,不应直接访问主进程的 PostgreSQL。通过 bus RPC 委托执行,sidecar 只需要知道 bus service 的地址。BusProxyHandler 封装了 bus 调用细节,主进程的 dispatch 逻辑无需区分 in-process 和 remote handler。

为何用 DashMap 而非 RwLock

JobHandlerRegistry 在 server 启动时写入,运行时只读。DashMap 的读操作无锁(分段锁),适合高并发读场景。写操作(handler 注册)极少发生,锁竞争可忽略。

为何 task_queue 用内存 DashMap 而非持久化?

ExternalJobSnapshot 是"尽力而为"的进度推送——sidecar 推上来就转发给前端,主进程重启后丢失可接受。持久化留给 v2。


总结

┌──────────────────────────────────────────────────────────────┐
│                     重构前 → 重构后                           │
├──────────────────────┬───────────────────────────────────────┤
│ 分发机制             │ 硬编码 match → JobHandlerRegistry     │
│ Sidecar 支持         │ 无 → BusProxyHandler 代理             │
│ Job CRUD             │ 直接 DB → bus service(11 个方法)    │
│ App 隔离             │ 无 → (app_id, job_type) 命名空间      │
│ 取消机制             │ 无 → 协作式 JobCancel token           │
│ 事件推送             │ 轮询 → WS 实时推送(3 种事件类型)    │
│ 字段语义             │ payload/meta 混乱 → params/data 清晰  │
│ 前端 SDK             │ 业务耦合 → 三层 hook 架构             │
│ Video app job_repo   │ 1612 行 → 110 行(bus client 封装)   │
└──────────────────────┴───────────────────────────────────────┘

关键架构决策:

  • 可插拔注册表JobHandlerRegistry 基于 DashMap,支持运行时动态注册,解耦了调度器与具体 handler
  • BusProxyHandler 代理:sidecar 通过 bus RPC 执行 job,无需直接访问 DB,进程边界清晰
  • 协作取消JobCancel token + sentinel marker,handler 在长操作中定期检查,优雅处理取消/抢占/暂停
  • params + data 语义分离:不可变输入与可变运行态数据严格区分,消除字段混用
  • 三层 hook 架构:SDK 层不含业务命名,各 app 自带业务 wrapper,OS 与 app 边界清晰