持久化改造计划 — Redis 多 Worker + 关系型数据库通用适配
基于 optimization-plan.md Phase 5/6 重新整理。 前置变更:LangGraph 已移除(Pi-style engine)、TaskManager 已拆分为 ExecutionRunner + RuntimeStore、StreamTransport Protocol 已抽象。 目标部署形态:双服务器 + 云托管关系型数据库(TDSQL/MySQL 或 PostgreSQL)+ 云托管 Redis。
与旧计划(optimization-plan.md P5/P6)的差异
| 旧计划项 | 当前状态 | 本计划处理 |
|---|---|---|
| 5.1 RETURNING 移除 | ✅ 已完成(.returning() 已不存在) |
仅保留 Alembic 部分 |
| 5.2 Redis Checkpointer | ❌ 完全作废 — LangGraph 已移除,无 checkpointer | 删除 |
| 5.3 StreamManager → Redis Streams | Protocol 已抽象(StreamTransport) |
→ Phase 2 |
| 5.4 Manager 缓存决策 | 不变(request-local,不迁移 Redis) | 沿用 |
| 5.5 TaskManager 多 Worker | TaskManager 已不存在,拆为 ExecutionRunner + RuntimeStore | → Phase 1(核心改造) |
| 6.1-6.3 PostgreSQL | 改为数据库通用适配(TDSQL/MySQL/PostgreSQL 均支持) | → Phase 3 |
最大的变化:LangGraph 移除后,Redis 不再是"解决 checkpointer 串行瓶颈"的角色,而是纯粹为多 Worker 部署服务。这意味着 Redis 的引入可以和多 Worker 适配一步到位,不需要分两轮做。
能力边界说明
本方案提供的是控制面高可用(active-active shared control plane),不是执行中任务无损迁移。具体含义:
- 能做到:任一台应用机挂了,新请求可以被另一台正常处理(lease/interrupt/stream 状态在 Redis 中共享)
- 不能做到:某个 engine 正跑在挂掉的机器上时,这次执行中的状态无法迁移到另一台继续跑——engine 的 Python 运行时状态(LLM 对话上下文、工具调用栈、内存中的 agent state)不做分布式持久化
执行中任务丢失后的恢复路径:lease TTL 过期 → 用户重新发送消息 → 新执行在存活 Worker 上启动。对话历史和 artifact 已持久化在数据库中不受影响。
注:Phase 1 和 Phase 2 可以分开开发,但必须一起上线才能支撑真正的双机 active-active 部署。单独上 Phase 1 时 POST /chat 和 GET /stream 仍可能落在不同 Worker,SSE 事件不可达。
架构现状
全局单例 (app lifespan) 请求级实例 (per HTTP request)
├─ DatabaseManager (SQLite, 连接池+WAL) ├─ AsyncSession
├─ StreamTransport (InMemory Queue) ├─ ConversationManager/Repo
├─ ExecutionRunner (asyncio.Task 调度) ├─ ArtifactManager/Repo
│ └─ RuntimeStore (InMemory dict×5) ├─ ExecutionController
├─ agents config (只读) └─ Repositories
└─ tools (只读)
单进程限制清单:
| 组件 | 限制 | 跨进程影响 |
|---|---|---|
InMemoryRuntimeStore |
5 个 dict,进程内可见 | Worker A 的 lease/interrupt 对 Worker B 不可见 |
InterruptState.event |
asyncio.Event,进程内 await |
/resume 只能唤醒同进程的 engine |
StreamManager |
asyncio.Queue,进程内消费 |
Worker A push 的事件 Worker B 的 SSE 读不到 |
ExecutionRunner._tasks |
dict[str, asyncio.Task],进程内 |
永远只能管理本 Worker 的任务 |
DatabaseManager (SQLite) |
单写者,WAL 并发有限 | 多进程写入锁竞争(切 TDSQL 后解决) |
Phase 1:Redis RuntimeStore — 多 Worker 一步到位
目标:
RuntimeStore从 InMemory 切到 Redis,解决 lease/interrupt/cancel/inject 的跨 Worker 可见性。同时解决 reviewer 指出的两个 P1 问题(sync→async Protocol、InterruptState 跨进程唤醒)。
1.1 Redis 基础设施
# docker-compose.yml
redis:
image: redis:7-alpine # 不再需要 Redis Stack(无 checkpointer)
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: >
redis-server
--appendonly yes
--appendfsync everysec
--save 900 1
--maxmemory 256mb
--maxmemory-policy noeviction
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 3
与旧计划的区别:用 redis:7-alpine 而非 redis/redis-stack。LangGraph checkpointer 需要 RedisJSON + RediSearch 模块,但我们不再需要 checkpointer,纯 Redis 足矣。镜像从 ~300MB 降到 ~30MB。
配置项(config.py):
- REDIS_URL: str = ""(空 = 走 InMemory,非空 = 走 Redis)
- LEASE_TTL: int = 90(秒,lease 初始 TTL,心跳每 TTL/3 续租)
1.2 RuntimeStore Protocol 升级为 async
Reviewer P1 要点:当前 Protocol 全 sync,Redis 客户端(redis.asyncio)需要 await。
改动范围:
src/api/services/runtime_store.py — Protocol 方法全部加 async:
@runtime_checkable
class RuntimeStore(Protocol):
# ── Lease(owner = message_id,compare-and-del 需要) ──
async def try_acquire_lease(self, conversation_id: str, message_id: str) -> Optional[str]: ...
async def release_lease(self, conversation_id: str, message_id: str) -> None: ...
async def get_leased_message_id(self, conversation_id: str) -> Optional[str]: ...
# ── Interactive(同理需要 owner 做 compare-and-del)──
async def mark_engine_interactive(self, conversation_id: str, message_id: str) -> None: ...
async def clear_engine_interactive(self, conversation_id: str, message_id: str) -> None: ...
async def get_interactive_message_id(self, conversation_id: str) -> Optional[str]: ...
# ── Interrupt ──
async def create_interrupt(self, message_id: str, data: Dict[str, Any]) -> None: ...
async def resolve_interrupt(self, message_id: str, resume_data: Dict[str, Any]) -> Literal["resolved", "not_found", "already_resolved"]: ...
async def get_interrupt_data(self, message_id: str) -> Optional[Dict[str, Any]]: ...
async def wait_for_resume(self, message_id: str, timeout: float) -> Optional[Dict[str, Any]]: ...
# ── Cancellation / Message queue ──
async def request_cancel(self, message_id: str) -> None: ...
async def is_cancelled(self, message_id: str) -> bool: ...
async def inject_message(self, message_id: str, content: str) -> None: ...
async def drain_messages(self, message_id: str) -> List[str]: ...
# ── Lifecycle(renew 需要 conv_id + msg_id 定位 key + 校验 owner)──
async def renew_lease(self, conversation_id: str, message_id: str, ttl: int) -> None: ...
async def cleanup_execution(self, conversation_id: str, message_id: str) -> None: ...
async def shutdown_cleanup(self) -> None: ...
Owner 语义说明:release_lease、clear_engine_interactive、renew_lease、cleanup_execution 都需要 (conversation_id, message_id) 对,原因:
- Redis key 是 lease:{conv_id}(需要 conv_id 定位 key)
- compare-and-del 需要 message_id 校验 owner(防止误删其他 Worker 的 lease)
- InMemoryRuntimeStore 忽略 message_id 参数即可(单进程无竞争)
InMemoryRuntimeStore — 同步实现加 async 关键字(dict 操作本身不阻塞,加 async 只是满足协议)。
调用点适配(机械改动,所有 store.xxx() → await store.xxx()):
- src/api/routers/chat.py — send_message, inject, cancel, resume
- src/api/routers/stream.py — resolve_interrupt
- src/core/engine.py — EngineHooks 回调签名改 async
- src/core/controller.py — on_engine_exit 签名改为 Callable[[str, str], Awaitable[None]],接收 (conversation_id, message_id) 双参数
- src/api/services/execution_runner.py — cleanup_execution, shutdown_cleanup
1.3 Interrupt 跨进程唤醒(解决 Reviewer P2)
问题核心:engine 通过 await interrupt.event.wait() 阻塞,/resume 通过 interrupt.event.set() 唤醒。这个 asyncio.Event 是进程内对象,跨 Worker 无法传递。
解决方案:将 interrupt 拆成两层——Redis 存储状态 + 本地 Event 桥接唤醒。
Engine Redis /resume (任意 Worker)
│ │ │
├─ create_interrupt ──► SET interrupt:{msg_id} │
│ {data: ..., status: pending} │
│ │ │
├─ wait_for_resume ──► SUBSCRIBE interrupt:{msg_id} │
│ (block on local │ │
│ asyncio.Event) │ │
│ │ resolve_interrupt ◄───┤
│ │ SET status=resolved │
│ │ PUBLISH interrupt:{msg_id}
│ │ │
│ ◄── on_message ──────────────────────┘
│ local_event.set() │
│ │
├─ read resume_data ──► GET interrupt:{msg_id}
Protocol 变化:
旧接口:
def create_interrupt(self, message_id, data) -> InterruptState # 返回 InterruptState
# engine 直接 await interrupt.event.wait()
新接口:
async def create_interrupt(self, message_id, data) -> None # 只写入状态
async def wait_for_resume(self, message_id, timeout) -> Optional[Dict] # 阻塞等待,返回 resume_data 或 None(超时)
async def resolve_interrupt(self, message_id, resume_data) -> ... # 写入 resume_data + 通知
async def get_interrupt_data(self, message_id) -> Optional[Dict] # 读取 interrupt_data(路由层用)
关键设计:wait_for_resume 内部机制因实现而异:
- InMemoryRuntimeStore:内部维护 asyncio.Event,wait_for_resume 就是 await event.wait()
- RedisRuntimeStore:check-subscribe-check-wait 四步模式(见下方)
Pub/Sub 丢通知防护:
纯 Pub/Sub 有竞争窗口——如果 PUBLISH 发生在 SUBSCRIBE 建立之前,消息丢失,engine 白等到超时。RedisRuntimeStore.wait_for_resume 必须用双重检查覆盖这个窗口:
async def wait_for_resume(self, message_id, timeout):
# ① CHECK:PUBLISH 可能已经发生,先查 Hash 状态
if await self._is_resolved(message_id):
return await self._get_resume_data(message_id)
# ② SUBSCRIBE:建立订阅
pubsub = self._redis.pubsub()
await pubsub.subscribe(f"interrupt:{message_id}")
# ③ CHECK:覆盖 ① 和 ② 之间的窗口
# 如果 PUBLISH 恰好在 check 之后、subscribe 之前发生,
# 消息丢了没关系,状态已写入 Hash,这里查到
if await self._is_resolved(message_id):
await pubsub.unsubscribe()
return await self._get_resume_data(message_id)
# ④ WAIT:订阅已生效且状态仍 pending,安全等待
try:
async with asyncio.timeout(timeout):
async for msg in pubsub.listen():
if msg["type"] == "message":
break
except TimeoutError:
return None # 超时视为拒绝
finally:
await pubsub.unsubscribe()
return await self._get_resume_data(message_id)
时序穷举:
| /resume 发生时刻 | ① check | ② subscribe | ③ check | ④ wait | 结果 |
|---|---|---|---|---|---|
| 在 ① 之前 | ✅ 查到 resolved | — | — | — | 直接返回 |
| ① 和 ② 之间 | ❌ pending | 建立 | ✅ 查到 resolved | — | 直接返回 |
| ② 和 ③ 之间 | ❌ pending | 已建立 | ✅ 查到 resolved | — | 直接返回 |
| ③ 之后 | ❌ pending | 已建立 | ❌ pending | ✅ 收到 PUBLISH | 正常唤醒 |
每个时序至少有一个机制兜住,无遗漏窗口。
Engine 改动(src/core/engine.py):
# 旧:
interrupt = hooks.create_interrupt(message_id, interrupt_data)
await asyncio.wait_for(interrupt.event.wait(), timeout=config.PERMISSION_TIMEOUT)
resume_data = interrupt.resume_data
# 新:
await hooks.create_interrupt(message_id, interrupt_data)
resume_data = await hooks.wait_for_resume(message_id, timeout=config.PERMISSION_TIMEOUT)
# resume_data is None → timeout (treated as deny)
EngineHooks 签名更新:
@dataclass
class EngineHooks:
check_cancelled: Callable[[str], Awaitable[bool]]
create_interrupt: Callable[[str, Dict[str, Any]], Awaitable[None]]
wait_for_resume: Callable[[str, float], Awaitable[Optional[Dict[str, Any]]]]
drain_messages: Callable[[str], Awaitable[List[str]]]
InterruptState dataclass 可以从 engine.py 中移除(或降级为 InMemoryRuntimeStore 的内部实现细节)。
1.4 RedisRuntimeStore 实现
src/api/services/redis_runtime_store.py:
class RedisRuntimeStore:
def __init__(self, redis: redis.asyncio.Redis):
self._redis = redis
self._local_events: dict[str, asyncio.Event] = {} # 本地 interrupt 桥接
self._subscriber_tasks: dict[str, asyncio.Task] = {}
Redis Key 设计:
| Key | 类型 | TTL | 用途 |
|---|---|---|---|
lease:{conv_id} |
STRING (msg_id) | LEASE_TTL (90s),心跳续租 |
conversation lease |
interactive:{conv_id} |
STRING (msg_id) | LEASE_TTL (90s),心跳续租 |
engine interactive 标记 |
interrupt:{msg_id} |
HASH | PERMISSION_TIMEOUT + 60 |
interrupt 状态 |
cancel:{msg_id} |
STRING "1" | STREAM_TIMEOUT |
取消标记 |
queue:{msg_id} |
LIST | STREAM_TIMEOUT |
消息注入队列 |
Lease / Interactive TTL 策略:不用静态 STREAM_TIMEOUT 做 TTL(任务可能比预期长),改为短 TTL + 心跳续租:
- 初始 TTL = LEASE_TTL(建议 90s)
- 执行中每 LEASE_TTL / 3(30s)续一次(EXPIRE 重置 TTL)
- 正常执行:不管跑多久,心跳持续续租,lease 永不过期
- Worker 崩溃:心跳停止,最多等 LEASE_TTL(90s)自动释放
- 续租间隔 = TTL/3,给两次重试机会(某次 EXPIRE 因网络抖动失败,下次还来得及)
关键方法实现:
try_acquire_lease→SET lease:{conv_id} {msg_id} NX EX {LEASE_TTL},原子操作天然防重,初始 TTL 由心跳续租维持release_lease(conv_id, msg_id)→ Lua compare-and-del:GET lease:{conv_id}== msg_id 才DELis_cancelled→EXISTS cancel:{msg_id}inject_message→RPUSH queue:{msg_id} {content}drain_messages→ Lua 脚本:LRANGE + DEL原子取出全部消息create_interrupt→HSET interrupt:{msg_id} data {json} status pendingresolve_interrupt→ Lua 脚本:检查 status=pending → 设 resume_data + status=resolved →PUBLISH interrupt:{msg_id}wait_for_resume→ check-subscribe-check-wait 四步模式(见 1.3),防 Pub/Sub 丢通知cleanup_execution(conv_id, msg_id)→ pipeline:compare-and-del lease/interactive + DEL cancel/queue/interrupt
Lua 脚本(原子操作):
-- compare-and-del(lease 释放、interactive 清除)
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
-- drain-all(消息队列原子取出)
local msgs = redis.call("LRANGE", KEYS[1], 0, -1)
redis.call("DEL", KEYS[1])
return msgs
-- compare-and-expire(lease / interactive 心跳续租)
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("EXPIRE", KEYS[1], ARGV[2])
else
return 0
end
-- resolve-interrupt(原子状态转换 + 发布通知)
local status = redis.call("HGET", KEYS[1], "status")
if status == false then return "not_found" end
if status ~= "pending" then return "already_resolved" end
redis.call("HSET", KEYS[1], "status", "resolved", "resume_data", ARGV[1])
redis.call("PUBLISH", KEYS[1], "resolved")
return "resolved"
1.5 ExecutionRunner 适配
ExecutionRunner 保持为本地调度器(每个 Worker 一个实例),不做分布式化:
_tasks: dict[str, asyncio.Task]— 永远只管本 Worker 的任务_semaphore— per-Worker 并发上限- 分布式防重由 Redis lease 保证(
try_acquire_lease的SET NX天然跨 Worker 互斥)
不需要全局 Semaphore(旧计划 5.5.2 继续 defer,理由同旧计划:per-Worker 限流已足够)。
submit() 签名变更:新增 conversation_id 参数,用于心跳续租和 cleanup 时定位 Redis key + 校验 owner:
Lease 心跳续租:_wrapped() 启动后台心跳,定期续租 lease 和 interactive key:
async def _wrapped():
heartbeat = asyncio.create_task(
self._renew_loop(conversation_id, task_id, interval=config.LEASE_TTL // 3)
)
async with self._semaphore:
try:
await coro
finally:
heartbeat.cancel()
await self.store.cleanup_execution(conversation_id, task_id)
async def _renew_loop(self, conversation_id, task_id, interval):
"""每 interval 秒续一次 lease + interactive"""
while True:
await asyncio.sleep(interval)
await self.store.renew_lease(conversation_id, task_id, ttl=interval * 3)
renew_lease 实现:
- InMemoryRuntimeStore:空操作(内存无 TTL 概念)
- RedisRuntimeStore:Lua compare-and-expire — 校验 GET lease:{conv_id} == msg_id 后才 EXPIRE,防止续错别人的 lease。interactive 同理。
1.6 故障处理
| 时机 | Redis 不可用的行为 |
|---|---|
| 启动时 | init_globals() 中 redis.ping() 失败 → 应用启动失败(fail fast) |
| 请求时 | try_acquire_lease 抛 ConnectionError → router 层返回 503 |
| 执行中 | is_cancelled / drain_messages 失败 → engine 视为"无取消/无消息",继续执行,日志 warning |
| interrupt | wait_for_resume 中 subscribe 断连 → 超时处理(视为 deny),与现有行为一致 |
1.7 涉及文件
| 文件 | 操作 |
|---|---|
docker-compose.yml |
改 — 新增 Redis 服务 |
src/config.py |
改 — 新增 REDIS_URL |
src/api/services/runtime_store.py |
改 — Protocol async 化 + InMemory 适配 |
src/api/services/redis_runtime_store.py |
新建 — Redis 实现 |
src/core/engine.py |
改 — InterruptState 解耦、EngineHooks async 化、wait_for_resume |
src/core/controller.py |
改 — on_engine_exit: Callable[[str, str], Awaitable[None]],stream_execute() 传 (conv_id, msg_id) 调用,确保 compare-and-del |
src/api/routers/chat.py |
改 — await store 调用 |
src/api/routers/stream.py |
改 — await store 调用 |
src/api/services/execution_runner.py |
改 — await store 调用 |
src/api/dependencies.py |
改 — Redis 连接初始化、注入 RedisRuntimeStore |
tests/test_runtime_store.py |
改 — async 适配 |
tests/test_execution_runner.py |
改 — async 适配 |
tests/test_engine_execution.py |
改 — hooks async 适配 |
tests/integration/test_redis_runtime_store.py |
新建 — Redis 集成测试 |
1.8 退出标准
- [ ] 单 Worker 部署行为与改造前完全一致(InMemoryRuntimeStore 仍可用于开发/测试)
- [ ] 同一 conversation 并发 POST /chat 到不同 Worker → 第二个被拒绝(409)
- [ ] Worker A 启动的执行,Worker B 可以 inject/cancel/resume
- [ ] Permission interrupt 跨 Worker:Worker A 运行 engine → Worker B 收到 /resume → engine 唤醒
- [ ] Worker 崩溃后 lease TTL 过期 → 新请求可正常提交
- [ ] Redis 不可用 → 启动 fail fast / 请求 503 / 执行中 graceful degrade
- [ ] 现有回归测试全部通过
Phase 2:Redis StreamTransport — 跨 Worker 事件推送
目标:
StreamTransport从 InMemory Queue 切到 Redis Streams,解决 POST /chat 和 GET /stream 可能落在不同 Worker 的问题。
2.1 为什么选 Redis Streams
与旧计划 5.3 相同,此处不重复。核心理由:Pub/Sub 断线丢消息,Streams 持久化 + 断点重放。
2.2 RedisStreamTransport 实现
src/api/services/redis_stream_transport.py:
class RedisStreamTransport:
"""基于 Redis Streams 的 StreamTransport 实现"""
def __init__(self, redis: redis.asyncio.Redis, ttl_seconds: int = 30):
self._redis = redis
self.ttl_seconds = ttl_seconds
Redis Key 设计:
| Key | 类型 | TTL | 用途 |
|---|---|---|---|
stream:{msg_id} |
STREAM | STREAM_TTL |
事件流 |
stream_meta:{msg_id} |
HASH | STREAM_TTL |
stream 元数据 |
方法映射:
| StreamTransport 方法 | Redis 操作 |
|---|---|
create_stream |
HSET stream_meta:{id} owner {uid} status pending + EXPIRE |
push_event |
XADD stream:{id} MAXLEN ~1000 * type {t} data {json} |
consume_events |
XREAD BLOCK {heartbeat_interval} STREAMS stream:{id} {last_id} |
close_stream |
HSET stream_meta:{id} status closed |
get_stream_status |
HGET stream_meta:{id} status |
consume_events 内部逻辑:
1. 校验 owner(HGET stream_meta:{id} owner)
2. 取消 TTL(前端已连接)
3. 循环 XREAD BLOCK,超时时 yield __ping__
4. 终结事件后 EXPIRE(延迟清理)
2.3 SSE 断线重连
与旧计划 5.3.3 相同:
consume_events新增last_event_id参数- SSE 响应附带
id:字段(Redis Stream entry ID) src/api/routers/stream.py读取Last-Event-IDheader- 前端
lib/sse.ts手动维护last_event_id(非 EventSource,无自动重连)
2.4 涉及文件
| 文件 | 操作 |
|---|---|
src/api/services/redis_stream_transport.py |
新建 |
src/api/services/stream_transport.py |
改 — consume_events 增加 last_event_id 参数 |
src/api/services/stream_manager.py |
改 — 适配新参数(InMemory 实现忽略 last_event_id) |
src/api/routers/stream.py |
改 — 读取 Last-Event-ID + SSE id 字段 |
src/api/routers/chat.py |
改 — _run_and_push 适配(push_event 的 entry ID 透传) |
src/api/dependencies.py |
改 — 注入 RedisStreamTransport |
src/api/utils/sse.py |
改 — SSE event 增加 id 字段 |
frontend/src/lib/sse.ts |
改 — 维护 last_event_id,断线重连携带 |
tests/integration/test_redis_stream_transport.py |
新建 |
2.5 退出标准
- [ ] Worker A push 事件 → Worker B 的 SSE 连接可消费
- [ ] 断线重连:消费中断 → 用 last_event_id 重连 → 不丢消息
- [ ] TTL 过期后 Stream key 自动删除
- [ ] 所有权隔离:非 owner 消费被拒绝(
StreamNotFoundError) - [ ] Redis 故障:SSE 连接建立时 503 / 流中断时 SSE error 事件
- [ ] 现有回归测试全部通过
Phase 3:去 SQLite + 关系型数据库通用适配
去掉 SQLite 生产代码,支持 TDSQL (MySQL) 和 PostgreSQL 双兼容。用户通过
DATABASE_URL和可选依赖选择目标数据库。
3.0 当前代码的 SQLite / 方言依赖审计
代码并非"ORM 就完事了"——以下是实际查出的兼容问题:
| 问题 | 位置 | 影响 | 处理 |
|---|---|---|---|
| 手写 SQL 迁移 | 001_initial_schema.py — AUTOINCREMENT、INSERT OR IGNORE、CREATE INDEX IF NOT EXISTS |
MySQL 语法不同 | 删除,换 Alembic |
| PRAGMA 硬编码 | database.py — WAL、journal_mode、busy_timeout 等 5 条 PRAGMA |
MySQL 无 PRAGMA | 移除 SQLite 分支 |
_is_sqlite() 分支 |
database.py L86-88, L107-120 — engine 配置、StaticPool、WAL |
MySQL 不需要 | 移除 |
| 默认 URL | config.py — sqlite+aiosqlite:///data/artifactflow.db |
需改 MySQL | 改默认值 |
datetime.now 无时区 |
models.py 全部时间字段 default=datetime.now |
SQLite 存文本无所谓,MySQL DATETIME 与服务器时区交互 |
改为 func.now() 由数据库控制 |
.returning() |
已移除 ✅ | — | 无需处理 |
| Repository 原生 SQL | 无 ✅ — 全部通过 ORM 构造查询 | — | 无需处理 |
JSON 类型 |
models.py — messages.metadata, artifacts.metadata_ |
MySQL 5.7+ 原生支持 JSON ✅ |
无需处理(但注意 TDSQL 版本需 ≥5.7) |
Text 列排序 |
conversation_repo ORDER BY updated_at(非 Text 列) |
未发现对 Text 列排序 ✅ | 无需处理 |
| String PK | users/conversations/messages/artifacts 主键均为 String(64) |
MySQL InnoDB 字符串 PK 比整型慢(B+Tree 比较成本),但功能可用 | 不改(改动量过大,当前规模无性能问题) |
| 测试内存库 | conftest — sqlite+aiosqlite:///:memory: |
单元测试继续用 | 保留(见测试策略) |
3.1 Alembic 迁移框架
当前状态:手写 SQL 迁移(001_initial_schema.py,含 SQLite 方言)。
改动:
- alembic init,env.py 配置 async engine(aiomysql 驱动)
- 从 ORM models autogenerate 初始迁移 — Alembic 的 DDL API 天然生成 MySQL 兼容语法
- 迁移执行策略:部署前单次执行(alembic upgrade head),不在应用启动时自动运行(多 Worker 并发迁移竞态)
- DatabaseManager.initialize() 改为 schema version 校验:启动时检查版本,不匹配 fail fast
- 删除 001_initial_schema.py(含 AUTOINCREMENT、INSERT OR IGNORE 等 SQLite 方言)
3.2 DatabaseManager 简化 + 引擎切换
# database.py — 移除全部 SQLite 分支,零方言判断
class DatabaseManager:
def __init__(self, database_url: str, echo: bool = False):
self.database_url = database_url
...
async def initialize(self):
self._engine = create_async_engine(
self.database_url,
pool_size=config.DB_POOL_SIZE, # 默认 10
max_overflow=config.DB_MAX_OVERFLOW, # 默认 20
pool_timeout=config.DB_POOL_TIMEOUT, # 默认 30
pool_recycle=config.DB_POOL_RECYCLE, # 默认 1800
pool_pre_ping=True,
echo=self.echo,
)
# 无 PRAGMA、无 WAL、无 _is_sqlite()
移除清单:
- _is_sqlite() 方法
- _configure_sqlite_wal() 方法
- check_same_thread connect_args
- StaticPool 导入和内存库分支(测试库保留,见 3.6)
- 默认 URL 从 sqlite+aiosqlite:/// 改为空(必须显式配置)
config.py 新增:
DATABASE_URL: str = "" # 必须配置,无默认值
DB_POOL_SIZE: int = 10
DB_MAX_OVERFLOW: int = 20
DB_POOL_TIMEOUT: int = 30
DB_POOL_RECYCLE: int = 1800
驱动由用户按需安装,DatabaseManager 只认 URL 前缀,不关心底层驱动:
# TDSQL/MySQL 部署
pip install -e ".[mysql]" # → aiomysql
DATABASE_URL=mysql+aiomysql://user:pass@host:3306/artifactflow
# PostgreSQL 部署
pip install -e ".[postgres]" # → asyncpg
DATABASE_URL=postgresql+asyncpg://user:pass@host:5432/artifactflow
3.3 ORM 模型适配
datetime.now → func.now():
# 旧:
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, onupdate=datetime.now)
# 新:
created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now())
理由:SQLite 存 datetime 为文本,datetime.now 用 Python 端时间没问题。MySQL 的 DATETIME 列受服务器时区影响,Python 端和 DB 端时间可能不一致。用 func.now() 让数据库控制时间,保证一致性。
部署提醒:确认云库时区配置(TDSQL 默认
Asia/Shanghai),所有 Worker 连同一个库即保证时间一致。
其他类型无需改动:JSON → MySQL JSON / PostgreSQL JSONB、Text → TEXT、String(N) → VARCHAR(N)、Integer + autoincrement=True → MySQL AUTO_INCREMENT / PostgreSQL SERIAL。SQLAlchemy 根据 URL 前缀自动选择正确映射,代码层零方言判断。
3.4 依赖策略
# pyproject.toml
[project]
dependencies = [
"sqlalchemy>=2.0",
"aiosqlite", # 默认装 — 测试用 SQLite 内存库,pip install 即可跑全部单元测试
# ...其他依赖
]
[project.optional-dependencies]
mysql = ["aiomysql"] # pip install -e ".[mysql]"
postgres = ["asyncpg"] # pip install -e ".[postgres]"
设计原则:
- aiosqlite 是默认依赖 — git clone + pip install -e . + pytest 零配置跑通全部单元测试
- 生产数据库驱动按需安装 — 用户部署时选择目标数据库,只装对应驱动
- DatabaseManager 只认 URL 前缀,SQLAlchemy 自动 import 对应驱动,代码零方言判断
3.5 Docker 开发环境
提供两套 compose profile,开发者按需选择:
# docker-compose.dev.yml — MySQL profile(TDSQL 部署场景)
mysql:
image: mysql:8.0
environment:
MYSQL_DATABASE: artifactflow
MYSQL_USER: artifactflow
MYSQL_PASSWORD: ${MYSQL_PASSWORD:-changeme}
MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-rootpass}
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql
command: >
--character-set-server=utf8mb4
--collation-server=utf8mb4_unicode_ci
--default-time-zone='+00:00'
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
interval: 10s
timeout: 5s
retries: 5
# docker-compose.dev.yml — PostgreSQL profile(PostgreSQL 部署场景)
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: artifactflow
POSTGRES_USER: artifactflow
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-changeme}
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U artifactflow"]
interval: 10s
timeout: 5s
retries: 5
MySQL 关键配置:
- utf8mb4 — 支持 emoji 和完整 Unicode(agent 输出可能包含)
- default-time-zone='+00:00' — 服务端 UTC,配合 func.now() 使用
- 生产环境用云托管 TDSQL/RDS,此配置仅本地开发用
3.5 复合索引
# conversations 表 — list_conversations() WHERE user_id=? ORDER BY updated_at DESC
Index("ix_conversations_user_updated", "user_id", "updated_at")
# messages 表 — 对话内消息加载 WHERE conversation_id=? ORDER BY created_at
Index("ix_messages_conv_created", "conversation_id", "created_at")
3.7 测试策略
| 层 | 数据库 | 理由 |
|---|---|---|
| 单元测试(Repository / Manager) | SQLite 内存库(aiosqlite,默认依赖) |
快(~3s 全跑完),pip install -e . 零配置即可运行,CI 无需额外服务 |
| 集成测试(API 端到端) | MySQL 或 PostgreSQL(docker-compose.test.yml) |
验证真实方言行为(时区、字符集、连接池) |
| 手动测试 | MySQL 或 PostgreSQL(docker-compose.dev.yml) |
与生产一致 |
tests/conftest.py 保留 sqlite+aiosqlite:///:memory:(需保留 StaticPool 导入和内存库分支,仅在测试工具函数中,不在生产 DatabaseManager 中)。做法:create_test_database_manager() 独立函数,不走生产代码路径。
3.8 涉及文件
| 文件 | 操作 |
|---|---|
src/db/database.py |
改 — 移除 SQLite 分支,连接池参数外部化 |
src/config.py |
改 — DATABASE_URL 改空默认 + 池参数 |
src/db/models.py |
改 — datetime.now → func.now() |
src/db/migrations/versions/001_initial_schema.py |
删除 |
src/db/migrations/ |
改 — Alembic 初始化 + autogenerate |
pyproject.toml |
改 — aiosqlite 默认依赖,aiomysql / asyncpg 可选依赖 |
docker-compose.dev.yml |
新建 — MySQL + PostgreSQL 双 profile |
tests/conftest.py |
改 — create_test_database_manager() 独立于生产路径 |
tests/integration/ |
新建 — 数据库集成测试 |
3.9 退出标准
- [ ] MySQL 上全部回归测试通过
- [ ] PostgreSQL 上全部回归测试通过
- [ ] 并发写入测试通过(InnoDB / PostgreSQL MVCC)
- [ ] 生产
DatabaseManager中零 SQLite 代码(aiosqlite仅作为测试依赖被引用) - [ ]
datetime时区一致性验证(DB 端 UTC → Python 端正确解析) - [ ]
utf8mb4/ UTF-8 字符集验证(emoji 内容正常存取) - [ ]
/health端点检查数据库 + Redis 连通性 - [ ]
pip install -e .+pytest tests/(不含 integration)零配置通过
贯穿策略
Manager 缓存决策(不变)
与旧计划 5.4 一致:ConversationManager._cache 和 ArtifactManager._cache 保持 request-local 内存缓存,不迁移 Redis。
测试策略
单元测试:继续用 InMemoryRuntimeStore + StreamManager(内存实现)+ SQLite 内存库(aiosqlite 默认依赖)。pip install -e . + pytest tests/ 零配置、零容器、几秒跑完。
集成测试(tests/integration/):
| 文件 | 覆盖 |
|---|---|
test_redis_runtime_store.py |
lease 跨 Worker 互斥、interrupt pub/sub 唤醒、TTL 过期清理 |
test_redis_stream_transport.py |
跨进程 push/consume、断线重连、TTL |
test_redis_fault.py |
连接中断 503、恢复后自动重连 |
CI 基础设施:docker-compose.test.yml 包含 Redis + MySQL (或 PostgreSQL) service container。
开发者体验
Phase 3 完成后砍掉生产 DatabaseManager 中的 SQLite 代码,aiosqlite 保留为默认依赖供测试使用。
本地开发三种模式:
| 模式 | 适用场景 | 需要容器 |
|---|---|---|
pip install -e . + pytest |
日常开发,跑单元测试 | 不需要 |
docker-compose.dev.yml + 本地启动后端 |
功能开发,连真实数据库 | MySQL/PG + Redis |
pytest tests/integration/ |
集成测试 | MySQL/PG + Redis |
环境变量 REDIS_URL 不设 → fallback 到 InMemoryRuntimeStore(便于快速原型,不推荐生产)。
实施节奏
三轮提交,每轮独立可验证:
第一轮:P3 — 数据库层(改动面最隔离,先落地) ✅ DONE
3.1 Alembic 替换手写迁移
3.2 DatabaseManager 去 SQLite 分支
3.3 datetime.now → func.now()
3.4 依赖策略 (aiosqlite 默认 + extras)
→ 全量单元测试通过 → commit
(数据库层就绪,上层完全不动)
第二轮:P1.2 — Protocol async 化(纯机械改动,接口就绪) ✅ DONE
RuntimeStore Protocol 全部 async
InMemoryRuntimeStore 加 async 关键字
所有调用点加 await
EngineHooks 签名改 Awaitable
on_engine_exit 改 async + (conv_id, msg_id) 双参数
submit() 新增 conversation_id 参数
StreamManager → InMemoryStreamTransport,合入 stream_transport.py
create_interrupt + wait_for_resume 合并为 wait_for_interrupt
InterruptState 降级为 InMemory 内部实现细节
→ 全量单元测试通过 → commit
(仍是 InMemory,行为不变,但接口已就绪)
第三轮:P1.3-1.4 + P2 — Redis 实现(一起写一起上线) ✅ DONE
1.1 Redis 基础设施 (docker-compose + config)
1.3 RedisRuntimeStore (含 check-subscribe-check-wait、心跳续租、Lua 脚本)
1.5 ExecutionRunner 心跳任务
1.6 故障处理
2.2 RedisStreamTransport (含断线重连)
2.3 SSE Last-Event-ID 支持
dependencies.py REDIS_URL 分支
→ 单元测试 + 集成测试通过 → commit
(Phase 1 + Phase 2 必须一起上线,见能力边界说明)
补丁:stream 生命周期修复(consumer 断连不关 stream、producer 侧 close、
consumer_id CAS 防竞态、孤儿 key 修复)、lease 竞态 Lua 原子化、
前端 SSE 自动重连(Last-Event-ID + 指数退避 + ownership guard)、
active-stream 查询 API
为什么这个顺序:
- P3 先行:改的是数据层,与运行时状态无关。改完跑一遍全量测试就能确认没 break。如果和 P1/P2 叠加,数据库驱动 + Protocol async 同时出问题很难定位。
- P1.2 单独一轮:把"接口变更"和"实现切换"隔离。接口变更的 bug 能通过现有单元测试(InMemory 实现)立刻发现,不需要 Redis 容器。
- P1.3+P2 合并:Redis RuntimeStore 和 Redis StreamTransport 共用 Redis 连接,且必须一起上线才能支撑双机部署(单上 RuntimeStore 时 SSE 仍然跨 Worker 不可达)。
验证矩阵
| 场景 | 单 Worker (InMemory) | 多 Worker (Redis) |
|---|---|---|
| POST /chat 并发保护 | dict lease ✅ | SET NX lease ✅ |
| inject/cancel 路由 | dict 查询 ✅ | GET 查询 ✅ |
| permission interrupt | asyncio.Event ✅ | pub/sub + local Event ✅ |
| SSE 事件投递 | asyncio.Queue ✅ | Redis Streams ✅ |
| 断线重连 | ❌ 无 | last_event_id ✅ |
| Worker 崩溃恢复 | N/A | TTL 自动释放 ✅ |
| DB 并发写入 | SQLite WAL (受限) | MySQL InnoDB / PostgreSQL MVCC ✅ |