跳转至

持久化改造计划 — Redis 多 Worker + 关系型数据库通用适配

基于 optimization-plan.md Phase 5/6 重新整理。 前置变更:LangGraph 已移除(Pi-style engine)、TaskManager 已拆分为 ExecutionRunner + RuntimeStore、StreamTransport Protocol 已抽象。 目标部署形态:双服务器 + 云托管关系型数据库(TDSQL/MySQL 或 PostgreSQL)+ 云托管 Redis。


与旧计划(optimization-plan.md P5/P6)的差异

旧计划项 当前状态 本计划处理
5.1 RETURNING 移除 ✅ 已完成(.returning() 已不存在) 仅保留 Alembic 部分
5.2 Redis Checkpointer 完全作废 — LangGraph 已移除,无 checkpointer 删除
5.3 StreamManager → Redis Streams Protocol 已抽象(StreamTransport → Phase 2
5.4 Manager 缓存决策 不变(request-local,不迁移 Redis) 沿用
5.5 TaskManager 多 Worker TaskManager 已不存在,拆为 ExecutionRunner + RuntimeStore → Phase 1(核心改造)
6.1-6.3 PostgreSQL 改为数据库通用适配(TDSQL/MySQL/PostgreSQL 均支持) → Phase 3

最大的变化:LangGraph 移除后,Redis 不再是"解决 checkpointer 串行瓶颈"的角色,而是纯粹为多 Worker 部署服务。这意味着 Redis 的引入可以和多 Worker 适配一步到位,不需要分两轮做。

能力边界说明

本方案提供的是控制面高可用(active-active shared control plane),不是执行中任务无损迁移。具体含义:

  • 能做到:任一台应用机挂了,新请求可以被另一台正常处理(lease/interrupt/stream 状态在 Redis 中共享)
  • 不能做到:某个 engine 正跑在挂掉的机器上时,这次执行中的状态无法迁移到另一台继续跑——engine 的 Python 运行时状态(LLM 对话上下文、工具调用栈、内存中的 agent state)不做分布式持久化

执行中任务丢失后的恢复路径:lease TTL 过期 → 用户重新发送消息 → 新执行在存活 Worker 上启动。对话历史和 artifact 已持久化在数据库中不受影响。

注:Phase 1 和 Phase 2 可以分开开发,但必须一起上线才能支撑真正的双机 active-active 部署。单独上 Phase 1 时 POST /chat 和 GET /stream 仍可能落在不同 Worker,SSE 事件不可达。


架构现状

全局单例 (app lifespan)                  请求级实例 (per HTTP request)
├─ DatabaseManager (SQLite, 连接池+WAL)   ├─ AsyncSession
├─ StreamTransport (InMemory Queue)       ├─ ConversationManager/Repo
├─ ExecutionRunner (asyncio.Task 调度)     ├─ ArtifactManager/Repo
│   └─ RuntimeStore (InMemory dict×5)     ├─ ExecutionController
├─ agents config (只读)                   └─ Repositories
└─ tools (只读)

单进程限制清单

组件 限制 跨进程影响
InMemoryRuntimeStore 5 个 dict,进程内可见 Worker A 的 lease/interrupt 对 Worker B 不可见
InterruptState.event asyncio.Event,进程内 await /resume 只能唤醒同进程的 engine
StreamManager asyncio.Queue,进程内消费 Worker A push 的事件 Worker B 的 SSE 读不到
ExecutionRunner._tasks dict[str, asyncio.Task],进程内 永远只能管理本 Worker 的任务
DatabaseManager (SQLite) 单写者,WAL 并发有限 多进程写入锁竞争(切 TDSQL 后解决)

Phase 1:Redis RuntimeStore — 多 Worker 一步到位

目标:RuntimeStore 从 InMemory 切到 Redis,解决 lease/interrupt/cancel/inject 的跨 Worker 可见性。同时解决 reviewer 指出的两个 P1 问题(sync→async Protocol、InterruptState 跨进程唤醒)。

1.1 Redis 基础设施

# docker-compose.yml
redis:
  image: redis:7-alpine          # 不再需要 Redis Stack(无 checkpointer)
  ports:
    - "6379:6379"
  volumes:
    - redis_data:/data
  command: >
    redis-server
    --appendonly yes
    --appendfsync everysec
    --save 900 1
    --maxmemory 256mb
    --maxmemory-policy noeviction
  healthcheck:
    test: ["CMD", "redis-cli", "ping"]
    interval: 10s
    timeout: 5s
    retries: 3

与旧计划的区别:用 redis:7-alpine 而非 redis/redis-stack。LangGraph checkpointer 需要 RedisJSON + RediSearch 模块,但我们不再需要 checkpointer,纯 Redis 足矣。镜像从 ~300MB 降到 ~30MB。

配置项(config.py): - REDIS_URL: str = ""(空 = 走 InMemory,非空 = 走 Redis) - LEASE_TTL: int = 90(秒,lease 初始 TTL,心跳每 TTL/3 续租)

1.2 RuntimeStore Protocol 升级为 async

Reviewer P1 要点:当前 Protocol 全 sync,Redis 客户端(redis.asyncio)需要 await

改动范围:

src/api/services/runtime_store.py — Protocol 方法全部加 async

@runtime_checkable
class RuntimeStore(Protocol):
    # ── Lease(owner = message_id,compare-and-del 需要) ──
    async def try_acquire_lease(self, conversation_id: str, message_id: str) -> Optional[str]: ...
    async def release_lease(self, conversation_id: str, message_id: str) -> None: ...
    async def get_leased_message_id(self, conversation_id: str) -> Optional[str]: ...

    # ── Interactive(同理需要 owner 做 compare-and-del)──
    async def mark_engine_interactive(self, conversation_id: str, message_id: str) -> None: ...
    async def clear_engine_interactive(self, conversation_id: str, message_id: str) -> None: ...
    async def get_interactive_message_id(self, conversation_id: str) -> Optional[str]: ...

    # ── Interrupt ──
    async def create_interrupt(self, message_id: str, data: Dict[str, Any]) -> None: ...
    async def resolve_interrupt(self, message_id: str, resume_data: Dict[str, Any]) -> Literal["resolved", "not_found", "already_resolved"]: ...
    async def get_interrupt_data(self, message_id: str) -> Optional[Dict[str, Any]]: ...
    async def wait_for_resume(self, message_id: str, timeout: float) -> Optional[Dict[str, Any]]: ...

    # ── Cancellation / Message queue ──
    async def request_cancel(self, message_id: str) -> None: ...
    async def is_cancelled(self, message_id: str) -> bool: ...
    async def inject_message(self, message_id: str, content: str) -> None: ...
    async def drain_messages(self, message_id: str) -> List[str]: ...

    # ── Lifecycle(renew 需要 conv_id + msg_id 定位 key + 校验 owner)──
    async def renew_lease(self, conversation_id: str, message_id: str, ttl: int) -> None: ...
    async def cleanup_execution(self, conversation_id: str, message_id: str) -> None: ...
    async def shutdown_cleanup(self) -> None: ...

Owner 语义说明release_leaseclear_engine_interactiverenew_leasecleanup_execution 都需要 (conversation_id, message_id) 对,原因: - Redis key 是 lease:{conv_id}(需要 conv_id 定位 key) - compare-and-del 需要 message_id 校验 owner(防止误删其他 Worker 的 lease) - InMemoryRuntimeStore 忽略 message_id 参数即可(单进程无竞争)

InMemoryRuntimeStore — 同步实现加 async 关键字(dict 操作本身不阻塞,加 async 只是满足协议)。

调用点适配(机械改动,所有 store.xxx()await store.xxx()): - src/api/routers/chat.py — send_message, inject, cancel, resume - src/api/routers/stream.py — resolve_interrupt - src/core/engine.py — EngineHooks 回调签名改 async - src/core/controller.pyon_engine_exit 签名改为 Callable[[str, str], Awaitable[None]],接收 (conversation_id, message_id) 双参数 - src/api/services/execution_runner.py — cleanup_execution, shutdown_cleanup

1.3 Interrupt 跨进程唤醒(解决 Reviewer P2)

问题核心:engine 通过 await interrupt.event.wait() 阻塞,/resume 通过 interrupt.event.set() 唤醒。这个 asyncio.Event 是进程内对象,跨 Worker 无法传递。

解决方案:将 interrupt 拆成两层——Redis 存储状态 + 本地 Event 桥接唤醒。

Engine                    Redis                      /resume (任意 Worker)
  │                         │                              │
  ├─ create_interrupt ──►   SET interrupt:{msg_id}         │
  │                         {data: ..., status: pending}   │
  │                         │                              │
  ├─ wait_for_resume ──►    SUBSCRIBE interrupt:{msg_id}   │
  │   (block on local       │                              │
  │    asyncio.Event)       │                              │
  │                         │        resolve_interrupt ◄───┤
  │                         │        SET status=resolved    │
  │                         │        PUBLISH interrupt:{msg_id}
  │                         │              │
  │   ◄── on_message ──────────────────────┘
  │   local_event.set()     │
  │                         │
  ├─ read resume_data ──►   GET interrupt:{msg_id}

Protocol 变化

旧接口:

def create_interrupt(self, message_id, data) -> InterruptState  # 返回 InterruptState
# engine 直接 await interrupt.event.wait()

新接口:

async def create_interrupt(self, message_id, data) -> None          # 只写入状态
async def wait_for_resume(self, message_id, timeout) -> Optional[Dict]  # 阻塞等待,返回 resume_data 或 None(超时)
async def resolve_interrupt(self, message_id, resume_data) -> ...   # 写入 resume_data + 通知
async def get_interrupt_data(self, message_id) -> Optional[Dict]    # 读取 interrupt_data(路由层用)

关键设计wait_for_resume 内部机制因实现而异: - InMemoryRuntimeStore:内部维护 asyncio.Eventwait_for_resume 就是 await event.wait() - RedisRuntimeStorecheck-subscribe-check-wait 四步模式(见下方)

Pub/Sub 丢通知防护

纯 Pub/Sub 有竞争窗口——如果 PUBLISH 发生在 SUBSCRIBE 建立之前,消息丢失,engine 白等到超时。RedisRuntimeStore.wait_for_resume 必须用双重检查覆盖这个窗口:

async def wait_for_resume(self, message_id, timeout):
    # ① CHECK:PUBLISH 可能已经发生,先查 Hash 状态
    if await self._is_resolved(message_id):
        return await self._get_resume_data(message_id)

    # ② SUBSCRIBE:建立订阅
    pubsub = self._redis.pubsub()
    await pubsub.subscribe(f"interrupt:{message_id}")

    # ③ CHECK:覆盖 ① 和 ② 之间的窗口
    #    如果 PUBLISH 恰好在 check 之后、subscribe 之前发生,
    #    消息丢了没关系,状态已写入 Hash,这里查到
    if await self._is_resolved(message_id):
        await pubsub.unsubscribe()
        return await self._get_resume_data(message_id)

    # ④ WAIT:订阅已生效且状态仍 pending,安全等待
    try:
        async with asyncio.timeout(timeout):
            async for msg in pubsub.listen():
                if msg["type"] == "message":
                    break
    except TimeoutError:
        return None  # 超时视为拒绝
    finally:
        await pubsub.unsubscribe()

    return await self._get_resume_data(message_id)

时序穷举:

/resume 发生时刻 ① check ② subscribe ③ check ④ wait 结果
在 ① 之前 ✅ 查到 resolved 直接返回
① 和 ② 之间 ❌ pending 建立 ✅ 查到 resolved 直接返回
② 和 ③ 之间 ❌ pending 已建立 ✅ 查到 resolved 直接返回
③ 之后 ❌ pending 已建立 ❌ pending ✅ 收到 PUBLISH 正常唤醒

每个时序至少有一个机制兜住,无遗漏窗口。

Engine 改动src/core/engine.py):

# 旧:
interrupt = hooks.create_interrupt(message_id, interrupt_data)
await asyncio.wait_for(interrupt.event.wait(), timeout=config.PERMISSION_TIMEOUT)
resume_data = interrupt.resume_data

# 新:
await hooks.create_interrupt(message_id, interrupt_data)
resume_data = await hooks.wait_for_resume(message_id, timeout=config.PERMISSION_TIMEOUT)
# resume_data is None → timeout (treated as deny)

EngineHooks 签名更新

@dataclass
class EngineHooks:
    check_cancelled: Callable[[str], Awaitable[bool]]
    create_interrupt: Callable[[str, Dict[str, Any]], Awaitable[None]]
    wait_for_resume: Callable[[str, float], Awaitable[Optional[Dict[str, Any]]]]
    drain_messages: Callable[[str], Awaitable[List[str]]]

InterruptState dataclass 可以从 engine.py 中移除(或降级为 InMemoryRuntimeStore 的内部实现细节)。

1.4 RedisRuntimeStore 实现

src/api/services/redis_runtime_store.py

class RedisRuntimeStore:
    def __init__(self, redis: redis.asyncio.Redis):
        self._redis = redis
        self._local_events: dict[str, asyncio.Event] = {}  # 本地 interrupt 桥接
        self._subscriber_tasks: dict[str, asyncio.Task] = {}

Redis Key 设计

Key 类型 TTL 用途
lease:{conv_id} STRING (msg_id) LEASE_TTL (90s),心跳续租 conversation lease
interactive:{conv_id} STRING (msg_id) LEASE_TTL (90s),心跳续租 engine interactive 标记
interrupt:{msg_id} HASH PERMISSION_TIMEOUT + 60 interrupt 状态
cancel:{msg_id} STRING "1" STREAM_TIMEOUT 取消标记
queue:{msg_id} LIST STREAM_TIMEOUT 消息注入队列

Lease / Interactive TTL 策略:不用静态 STREAM_TIMEOUT 做 TTL(任务可能比预期长),改为短 TTL + 心跳续租: - 初始 TTL = LEASE_TTL(建议 90s) - 执行中每 LEASE_TTL / 3(30s)续一次(EXPIRE 重置 TTL) - 正常执行:不管跑多久,心跳持续续租,lease 永不过期 - Worker 崩溃:心跳停止,最多等 LEASE_TTL(90s)自动释放 - 续租间隔 = TTL/3,给两次重试机会(某次 EXPIRE 因网络抖动失败,下次还来得及)

关键方法实现

  • try_acquire_leaseSET lease:{conv_id} {msg_id} NX EX {LEASE_TTL},原子操作天然防重,初始 TTL 由心跳续租维持
  • release_lease(conv_id, msg_id) → Lua compare-and-del:GET lease:{conv_id} == msg_id 才 DEL
  • is_cancelledEXISTS cancel:{msg_id}
  • inject_messageRPUSH queue:{msg_id} {content}
  • drain_messages → Lua 脚本:LRANGE + DEL 原子取出全部消息
  • create_interruptHSET interrupt:{msg_id} data {json} status pending
  • resolve_interrupt → Lua 脚本:检查 status=pending → 设 resume_data + status=resolved → PUBLISH interrupt:{msg_id}
  • wait_for_resume → check-subscribe-check-wait 四步模式(见 1.3),防 Pub/Sub 丢通知
  • cleanup_execution(conv_id, msg_id) → pipeline:compare-and-del lease/interactive + DEL cancel/queue/interrupt

Lua 脚本(原子操作):

-- compare-and-del(lease 释放、interactive 清除)
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
else
    return 0
end

-- drain-all(消息队列原子取出)
local msgs = redis.call("LRANGE", KEYS[1], 0, -1)
redis.call("DEL", KEYS[1])
return msgs

-- compare-and-expire(lease / interactive 心跳续租)
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("EXPIRE", KEYS[1], ARGV[2])
else
    return 0
end

-- resolve-interrupt(原子状态转换 + 发布通知)
local status = redis.call("HGET", KEYS[1], "status")
if status == false then return "not_found" end
if status ~= "pending" then return "already_resolved" end
redis.call("HSET", KEYS[1], "status", "resolved", "resume_data", ARGV[1])
redis.call("PUBLISH", KEYS[1], "resolved")
return "resolved"

1.5 ExecutionRunner 适配

ExecutionRunner 保持为本地调度器(每个 Worker 一个实例),不做分布式化:

  • _tasks: dict[str, asyncio.Task] — 永远只管本 Worker 的任务
  • _semaphore — per-Worker 并发上限
  • 分布式防重由 Redis lease 保证(try_acquire_leaseSET NX 天然跨 Worker 互斥)

不需要全局 Semaphore(旧计划 5.5.2 继续 defer,理由同旧计划:per-Worker 限流已足够)。

submit() 签名变更:新增 conversation_id 参数,用于心跳续租和 cleanup 时定位 Redis key + 校验 owner:

async def submit(self, task_id: str, conversation_id: str, coro: Coroutine) -> asyncio.Task:

Lease 心跳续租_wrapped() 启动后台心跳,定期续租 lease 和 interactive key:

async def _wrapped():
    heartbeat = asyncio.create_task(
        self._renew_loop(conversation_id, task_id, interval=config.LEASE_TTL // 3)
    )
    async with self._semaphore:
        try:
            await coro
        finally:
            heartbeat.cancel()
            await self.store.cleanup_execution(conversation_id, task_id)

async def _renew_loop(self, conversation_id, task_id, interval):
    """每 interval 秒续一次 lease + interactive"""
    while True:
        await asyncio.sleep(interval)
        await self.store.renew_lease(conversation_id, task_id, ttl=interval * 3)

renew_lease 实现: - InMemoryRuntimeStore:空操作(内存无 TTL 概念) - RedisRuntimeStore:Lua compare-and-expire — 校验 GET lease:{conv_id} == msg_id 后才 EXPIRE,防止续错别人的 lease。interactive 同理。

1.6 故障处理

时机 Redis 不可用的行为
启动时 init_globals()redis.ping() 失败 → 应用启动失败(fail fast)
请求时 try_acquire_leaseConnectionError → router 层返回 503
执行中 is_cancelled / drain_messages 失败 → engine 视为"无取消/无消息",继续执行,日志 warning
interrupt wait_for_resume 中 subscribe 断连 → 超时处理(视为 deny),与现有行为一致

1.7 涉及文件

文件 操作
docker-compose.yml — 新增 Redis 服务
src/config.py — 新增 REDIS_URL
src/api/services/runtime_store.py — Protocol async 化 + InMemory 适配
src/api/services/redis_runtime_store.py 新建 — Redis 实现
src/core/engine.py — InterruptState 解耦、EngineHooks async 化、wait_for_resume
src/core/controller.py on_engine_exit: Callable[[str, str], Awaitable[None]]stream_execute()(conv_id, msg_id) 调用,确保 compare-and-del
src/api/routers/chat.py — await store 调用
src/api/routers/stream.py — await store 调用
src/api/services/execution_runner.py — await store 调用
src/api/dependencies.py — Redis 连接初始化、注入 RedisRuntimeStore
tests/test_runtime_store.py — async 适配
tests/test_execution_runner.py — async 适配
tests/test_engine_execution.py — hooks async 适配
tests/integration/test_redis_runtime_store.py 新建 — Redis 集成测试

1.8 退出标准

  • [ ] 单 Worker 部署行为与改造前完全一致(InMemoryRuntimeStore 仍可用于开发/测试)
  • [ ] 同一 conversation 并发 POST /chat 到不同 Worker → 第二个被拒绝(409)
  • [ ] Worker A 启动的执行,Worker B 可以 inject/cancel/resume
  • [ ] Permission interrupt 跨 Worker:Worker A 运行 engine → Worker B 收到 /resume → engine 唤醒
  • [ ] Worker 崩溃后 lease TTL 过期 → 新请求可正常提交
  • [ ] Redis 不可用 → 启动 fail fast / 请求 503 / 执行中 graceful degrade
  • [ ] 现有回归测试全部通过

Phase 2:Redis StreamTransport — 跨 Worker 事件推送

目标:StreamTransport 从 InMemory Queue 切到 Redis Streams,解决 POST /chat 和 GET /stream 可能落在不同 Worker 的问题。

2.1 为什么选 Redis Streams

与旧计划 5.3 相同,此处不重复。核心理由:Pub/Sub 断线丢消息,Streams 持久化 + 断点重放。

2.2 RedisStreamTransport 实现

src/api/services/redis_stream_transport.py

class RedisStreamTransport:
    """基于 Redis Streams 的 StreamTransport 实现"""

    def __init__(self, redis: redis.asyncio.Redis, ttl_seconds: int = 30):
        self._redis = redis
        self.ttl_seconds = ttl_seconds

Redis Key 设计

Key 类型 TTL 用途
stream:{msg_id} STREAM STREAM_TTL 事件流
stream_meta:{msg_id} HASH STREAM_TTL stream 元数据

方法映射

StreamTransport 方法 Redis 操作
create_stream HSET stream_meta:{id} owner {uid} status pending + EXPIRE
push_event XADD stream:{id} MAXLEN ~1000 * type {t} data {json}
consume_events XREAD BLOCK {heartbeat_interval} STREAMS stream:{id} {last_id}
close_stream HSET stream_meta:{id} status closed
get_stream_status HGET stream_meta:{id} status

consume_events 内部逻辑: 1. 校验 owner(HGET stream_meta:{id} owner) 2. 取消 TTL(前端已连接) 3. 循环 XREAD BLOCK,超时时 yield __ping__ 4. 终结事件后 EXPIRE(延迟清理)

2.3 SSE 断线重连

与旧计划 5.3.3 相同:

  • consume_events 新增 last_event_id 参数
  • SSE 响应附带 id: 字段(Redis Stream entry ID)
  • src/api/routers/stream.py 读取 Last-Event-ID header
  • 前端 lib/sse.ts 手动维护 last_event_id(非 EventSource,无自动重连)

2.4 涉及文件

文件 操作
src/api/services/redis_stream_transport.py 新建
src/api/services/stream_transport.py — consume_events 增加 last_event_id 参数
src/api/services/stream_manager.py — 适配新参数(InMemory 实现忽略 last_event_id)
src/api/routers/stream.py — 读取 Last-Event-ID + SSE id 字段
src/api/routers/chat.py — _run_and_push 适配(push_event 的 entry ID 透传)
src/api/dependencies.py — 注入 RedisStreamTransport
src/api/utils/sse.py — SSE event 增加 id 字段
frontend/src/lib/sse.ts — 维护 last_event_id,断线重连携带
tests/integration/test_redis_stream_transport.py 新建

2.5 退出标准

  • [ ] Worker A push 事件 → Worker B 的 SSE 连接可消费
  • [ ] 断线重连:消费中断 → 用 last_event_id 重连 → 不丢消息
  • [ ] TTL 过期后 Stream key 自动删除
  • [ ] 所有权隔离:非 owner 消费被拒绝(StreamNotFoundError
  • [ ] Redis 故障:SSE 连接建立时 503 / 流中断时 SSE error 事件
  • [ ] 现有回归测试全部通过

Phase 3:去 SQLite + 关系型数据库通用适配

去掉 SQLite 生产代码,支持 TDSQL (MySQL) 和 PostgreSQL 双兼容。用户通过 DATABASE_URL 和可选依赖选择目标数据库。

3.0 当前代码的 SQLite / 方言依赖审计

代码并非"ORM 就完事了"——以下是实际查出的兼容问题:

问题 位置 影响 处理
手写 SQL 迁移 001_initial_schema.pyAUTOINCREMENTINSERT OR IGNORECREATE INDEX IF NOT EXISTS MySQL 语法不同 删除,换 Alembic
PRAGMA 硬编码 database.py — WAL、journal_mode、busy_timeout 等 5 条 PRAGMA MySQL 无 PRAGMA 移除 SQLite 分支
_is_sqlite() 分支 database.py L86-88, L107-120 — engine 配置、StaticPool、WAL MySQL 不需要 移除
默认 URL config.pysqlite+aiosqlite:///data/artifactflow.db 需改 MySQL 改默认值
datetime.now 无时区 models.py 全部时间字段 default=datetime.now SQLite 存文本无所谓,MySQL DATETIME 与服务器时区交互 改为 func.now() 由数据库控制
.returning() 已移除 ✅ 无需处理
Repository 原生 SQL 无 ✅ — 全部通过 ORM 构造查询 无需处理
JSON 类型 models.py — messages.metadata, artifacts.metadata_ MySQL 5.7+ 原生支持 JSON 无需处理(但注意 TDSQL 版本需 ≥5.7)
Text 列排序 conversation_repo ORDER BY updated_at(非 Text 列) 未发现对 Text 列排序 ✅ 无需处理
String PK users/conversations/messages/artifacts 主键均为 String(64) MySQL InnoDB 字符串 PK 比整型慢(B+Tree 比较成本),但功能可用 不改(改动量过大,当前规模无性能问题)
测试内存库 conftest — sqlite+aiosqlite:///:memory: 单元测试继续用 保留(见测试策略)

3.1 Alembic 迁移框架

当前状态:手写 SQL 迁移(001_initial_schema.py,含 SQLite 方言)。

改动: - alembic initenv.py 配置 async engine(aiomysql 驱动) - 从 ORM models autogenerate 初始迁移 — Alembic 的 DDL API 天然生成 MySQL 兼容语法 - 迁移执行策略:部署前单次执行(alembic upgrade head),不在应用启动时自动运行(多 Worker 并发迁移竞态) - DatabaseManager.initialize() 改为 schema version 校验:启动时检查版本,不匹配 fail fast - 删除 001_initial_schema.py(含 AUTOINCREMENTINSERT OR IGNORE 等 SQLite 方言)

3.2 DatabaseManager 简化 + 引擎切换

# database.py — 移除全部 SQLite 分支,零方言判断
class DatabaseManager:
    def __init__(self, database_url: str, echo: bool = False):
        self.database_url = database_url
        ...

    async def initialize(self):
        self._engine = create_async_engine(
            self.database_url,
            pool_size=config.DB_POOL_SIZE,         # 默认 10
            max_overflow=config.DB_MAX_OVERFLOW,    # 默认 20
            pool_timeout=config.DB_POOL_TIMEOUT,    # 默认 30
            pool_recycle=config.DB_POOL_RECYCLE,    # 默认 1800
            pool_pre_ping=True,
            echo=self.echo,
        )
        # 无 PRAGMA、无 WAL、无 _is_sqlite()

移除清单: - _is_sqlite() 方法 - _configure_sqlite_wal() 方法 - check_same_thread connect_args - StaticPool 导入和内存库分支(测试库保留,见 3.6) - 默认 URL 从 sqlite+aiosqlite:/// 改为空(必须显式配置)

config.py 新增:

DATABASE_URL: str = ""  # 必须配置,无默认值
DB_POOL_SIZE: int = 10
DB_MAX_OVERFLOW: int = 20
DB_POOL_TIMEOUT: int = 30
DB_POOL_RECYCLE: int = 1800

驱动由用户按需安装,DatabaseManager 只认 URL 前缀,不关心底层驱动:

# TDSQL/MySQL 部署
pip install -e ".[mysql]"     # → aiomysql
DATABASE_URL=mysql+aiomysql://user:pass@host:3306/artifactflow

# PostgreSQL 部署
pip install -e ".[postgres]"  # → asyncpg
DATABASE_URL=postgresql+asyncpg://user:pass@host:5432/artifactflow

3.3 ORM 模型适配

datetime.nowfunc.now()

# 旧:
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, onupdate=datetime.now)

# 新:
created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now())

理由:SQLite 存 datetime 为文本,datetime.now 用 Python 端时间没问题。MySQL 的 DATETIME 列受服务器时区影响,Python 端和 DB 端时间可能不一致。用 func.now() 让数据库控制时间,保证一致性。

部署提醒:确认云库时区配置(TDSQL 默认 Asia/Shanghai),所有 Worker 连同一个库即保证时间一致。

其他类型无需改动JSON → MySQL JSON / PostgreSQL JSONBTextTEXTString(N)VARCHAR(N)Integer + autoincrement=True → MySQL AUTO_INCREMENT / PostgreSQL SERIAL。SQLAlchemy 根据 URL 前缀自动选择正确映射,代码层零方言判断。

3.4 依赖策略

# pyproject.toml
[project]
dependencies = [
    "sqlalchemy>=2.0",
    "aiosqlite",        # 默认装 — 测试用 SQLite 内存库,pip install 即可跑全部单元测试
    # ...其他依赖
]

[project.optional-dependencies]
mysql = ["aiomysql"]      # pip install -e ".[mysql]"
postgres = ["asyncpg"]    # pip install -e ".[postgres]"

设计原则: - aiosqlite 是默认依赖 — git clone + pip install -e . + pytest 零配置跑通全部单元测试 - 生产数据库驱动按需安装 — 用户部署时选择目标数据库,只装对应驱动 - DatabaseManager 只认 URL 前缀,SQLAlchemy 自动 import 对应驱动,代码零方言判断

3.5 Docker 开发环境

提供两套 compose profile,开发者按需选择:

# docker-compose.dev.yml — MySQL profile(TDSQL 部署场景)
mysql:
  image: mysql:8.0
  environment:
    MYSQL_DATABASE: artifactflow
    MYSQL_USER: artifactflow
    MYSQL_PASSWORD: ${MYSQL_PASSWORD:-changeme}
    MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-rootpass}
  ports:
    - "3306:3306"
  volumes:
    - mysql_data:/var/lib/mysql
  command: >
    --character-set-server=utf8mb4
    --collation-server=utf8mb4_unicode_ci
    --default-time-zone='+00:00'
  healthcheck:
    test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
    interval: 10s
    timeout: 5s
    retries: 5
# docker-compose.dev.yml — PostgreSQL profile(PostgreSQL 部署场景)
postgres:
  image: postgres:16-alpine
  environment:
    POSTGRES_DB: artifactflow
    POSTGRES_USER: artifactflow
    POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-changeme}
  ports:
    - "5432:5432"
  volumes:
    - postgres_data:/var/lib/postgresql/data
  healthcheck:
    test: ["CMD-SHELL", "pg_isready -U artifactflow"]
    interval: 10s
    timeout: 5s
    retries: 5

MySQL 关键配置: - utf8mb4 — 支持 emoji 和完整 Unicode(agent 输出可能包含) - default-time-zone='+00:00' — 服务端 UTC,配合 func.now() 使用 - 生产环境用云托管 TDSQL/RDS,此配置仅本地开发用

3.5 复合索引

# conversations 表 — list_conversations() WHERE user_id=? ORDER BY updated_at DESC
Index("ix_conversations_user_updated", "user_id", "updated_at")

# messages 表 — 对话内消息加载 WHERE conversation_id=? ORDER BY created_at
Index("ix_messages_conv_created", "conversation_id", "created_at")

3.7 测试策略

数据库 理由
单元测试(Repository / Manager) SQLite 内存库(aiosqlite,默认依赖) 快(~3s 全跑完),pip install -e . 零配置即可运行,CI 无需额外服务
集成测试(API 端到端) MySQL 或 PostgreSQL(docker-compose.test.yml 验证真实方言行为(时区、字符集、连接池)
手动测试 MySQL 或 PostgreSQL(docker-compose.dev.yml 与生产一致

tests/conftest.py 保留 sqlite+aiosqlite:///:memory:(需保留 StaticPool 导入和内存库分支,仅在测试工具函数中,不在生产 DatabaseManager 中)。做法:create_test_database_manager() 独立函数,不走生产代码路径。

3.8 涉及文件

文件 操作
src/db/database.py — 移除 SQLite 分支,连接池参数外部化
src/config.py — DATABASE_URL 改空默认 + 池参数
src/db/models.py datetime.nowfunc.now()
src/db/migrations/versions/001_initial_schema.py 删除
src/db/migrations/ — Alembic 初始化 + autogenerate
pyproject.toml aiosqlite 默认依赖,aiomysql / asyncpg 可选依赖
docker-compose.dev.yml 新建 — MySQL + PostgreSQL 双 profile
tests/conftest.py create_test_database_manager() 独立于生产路径
tests/integration/ 新建 — 数据库集成测试

3.9 退出标准

  • [ ] MySQL 上全部回归测试通过
  • [ ] PostgreSQL 上全部回归测试通过
  • [ ] 并发写入测试通过(InnoDB / PostgreSQL MVCC)
  • [ ] 生产 DatabaseManager 中零 SQLite 代码(aiosqlite 仅作为测试依赖被引用)
  • [ ] datetime 时区一致性验证(DB 端 UTC → Python 端正确解析)
  • [ ] utf8mb4 / UTF-8 字符集验证(emoji 内容正常存取)
  • [ ] /health 端点检查数据库 + Redis 连通性
  • [ ] pip install -e . + pytest tests/(不含 integration)零配置通过

贯穿策略

Manager 缓存决策(不变)

与旧计划 5.4 一致:ConversationManager._cacheArtifactManager._cache 保持 request-local 内存缓存,不迁移 Redis。

测试策略

单元测试:继续用 InMemoryRuntimeStore + StreamManager(内存实现)+ SQLite 内存库(aiosqlite 默认依赖)。pip install -e . + pytest tests/ 零配置、零容器、几秒跑完。

集成测试tests/integration/):

文件 覆盖
test_redis_runtime_store.py lease 跨 Worker 互斥、interrupt pub/sub 唤醒、TTL 过期清理
test_redis_stream_transport.py 跨进程 push/consume、断线重连、TTL
test_redis_fault.py 连接中断 503、恢复后自动重连

CI 基础设施docker-compose.test.yml 包含 Redis + MySQL (或 PostgreSQL) service container。

开发者体验

Phase 3 完成后砍掉生产 DatabaseManager 中的 SQLite 代码,aiosqlite 保留为默认依赖供测试使用。

本地开发三种模式:

模式 适用场景 需要容器
pip install -e . + pytest 日常开发,跑单元测试 不需要
docker-compose.dev.yml + 本地启动后端 功能开发,连真实数据库 MySQL/PG + Redis
pytest tests/integration/ 集成测试 MySQL/PG + Redis

环境变量 REDIS_URL 不设 → fallback 到 InMemoryRuntimeStore(便于快速原型,不推荐生产)。


实施节奏

三轮提交,每轮独立可验证:

第一轮:P3 — 数据库层(改动面最隔离,先落地)  ✅ DONE
  3.1 Alembic 替换手写迁移
  3.2 DatabaseManager 去 SQLite 分支
  3.3 datetime.now → func.now()
  3.4 依赖策略 (aiosqlite 默认 + extras)
  → 全量单元测试通过 → commit
  (数据库层就绪,上层完全不动)

第二轮:P1.2 — Protocol async 化(纯机械改动,接口就绪)  ✅ DONE
  RuntimeStore Protocol 全部 async
  InMemoryRuntimeStore 加 async 关键字
  所有调用点加 await
  EngineHooks 签名改 Awaitable
  on_engine_exit 改 async + (conv_id, msg_id) 双参数
  submit() 新增 conversation_id 参数
  StreamManager → InMemoryStreamTransport,合入 stream_transport.py
  create_interrupt + wait_for_resume 合并为 wait_for_interrupt
  InterruptState 降级为 InMemory 内部实现细节
  → 全量单元测试通过 → commit
  (仍是 InMemory,行为不变,但接口已就绪)

第三轮:P1.3-1.4 + P2 — Redis 实现(一起写一起上线)  ✅ DONE
  1.1 Redis 基础设施 (docker-compose + config)
  1.3 RedisRuntimeStore (含 check-subscribe-check-wait、心跳续租、Lua 脚本)
  1.5 ExecutionRunner 心跳任务
  1.6 故障处理
  2.2 RedisStreamTransport (含断线重连)
  2.3 SSE Last-Event-ID 支持
  dependencies.py REDIS_URL 分支
  → 单元测试 + 集成测试通过 → commit
  (Phase 1 + Phase 2 必须一起上线,见能力边界说明)
  补丁:stream 生命周期修复(consumer 断连不关 stream、producer 侧 close、
  consumer_id CAS 防竞态、孤儿 key 修复)、lease 竞态 Lua 原子化、
  前端 SSE 自动重连(Last-Event-ID + 指数退避 + ownership guard)、
  active-stream 查询 API

为什么这个顺序

  • P3 先行:改的是数据层,与运行时状态无关。改完跑一遍全量测试就能确认没 break。如果和 P1/P2 叠加,数据库驱动 + Protocol async 同时出问题很难定位。
  • P1.2 单独一轮:把"接口变更"和"实现切换"隔离。接口变更的 bug 能通过现有单元测试(InMemory 实现)立刻发现,不需要 Redis 容器。
  • P1.3+P2 合并:Redis RuntimeStore 和 Redis StreamTransport 共用 Redis 连接,且必须一起上线才能支撑双机部署(单上 RuntimeStore 时 SSE 仍然跨 Worker 不可达)。

验证矩阵

场景 单 Worker (InMemory) 多 Worker (Redis)
POST /chat 并发保护 dict lease ✅ SET NX lease ✅
inject/cancel 路由 dict 查询 ✅ GET 查询 ✅
permission interrupt asyncio.Event ✅ pub/sub + local Event ✅
SSE 事件投递 asyncio.Queue ✅ Redis Streams ✅
断线重连 ❌ 无 last_event_id ✅
Worker 崩溃恢复 N/A TTL 自动释放 ✅
DB 并发写入 SQLite WAL (受限) MySQL InnoDB / PostgreSQL MVCC ✅