可观测性与监控
Event Sourcing 做回看 + Admin API 做审计 + 健康检查做探活 — 让运行时是"透明盒子"而非"黑盒子"。
总体设计
ArtifactFlow 的可观测性建立在 Event Sourcing 上:引擎执行过程中产生的每一个 ExecutionEvent,除了实时通过 SSE 推给前端(见 streaming.md),还会在执行边界批量落库到 MessageEvent 表。之后任何时刻都能按 message_id 完整重放一次执行,无需保留额外日志。
flowchart LR
Engine --> emit[emit ExecutionEvent]
emit --> mem[state events: list]
emit --> sse[StreamTransport.push_event]
sse --> client[SSE 客户端]
mem -->|终止边界批量写入| DB[(message_events)]
DB --> admin[Admin API / 监控 UI]
两个关键设计:
- 双路投递:SSE 实时性优先(包括
llm_chunk),DB 则只在终止边界(complete/cancelled/error)一次性批量写入 — 高频事件不拖慢事务。 llm_chunkSSE-only:见 streaming.md → Design Decisions。
MessageEvent 表
src/db/models.py:
| 字段 | 类型 | 说明 |
|---|---|---|
id |
int 自增 |
主键,天然有序 |
event_id |
str(96) unique, nullable |
业务去重键 {message_id}-{seq},支持批量写入重试幂等 |
message_id |
FK → messages.id ON DELETE CASCADE |
所属 message |
event_type |
str(32) |
StreamEventType.value |
agent_name |
str(64) nullable |
产生事件的 agent,Controller 层事件为 null |
data |
JSON nullable |
完整事件 payload,不截断 |
created_at |
DateTime server_default=func.now() |
写入时间 |
索引:ix_message_events_message on message_id — 查询模式永远是"按 message 拉时间线"。
表特性:append-only。不做 UPDATE、不做 DELETE(除消息级 CASCADE)。这让表的写锁持有极短,适配 SQLite 单写锁。
持久化边界
src/core/controller.py 的 _persist_events():
# 伪代码
async def _persist_events(state):
# state["events"] 混有轮起始从 DB 加载的历史事件(is_historical=True)
# 和本轮引擎新追加的事件(is_historical=False);只写后者,避免重复
new_events = [e for e in state["events"] if not e.is_historical]
db_events = [
MessageEvent(
event_id=f"{message_id}-{seq}",
message_id=message_id,
event_type=e.event_type,
agent_name=e.agent_name,
data=e.data,
)
for seq, e in enumerate(new_events)
]
await event_repo.batch_create(db_events)
触发点:引擎 loop 退出后进入 post-processing,_persist_events() 一次性批量写入本轮新追加的事件(is_historical=False)。历史事件在轮开始时由 Controller 从 MessageEvent 表按 conversation path 加载并填入 state["events"](is_historical=True),仅供 EventHistory 在 context 构建时消费,不再重写回 DB。complete / cancelled 两条路径走的是"先写 DB、后 emit 终端事件到 SSE":Controller 在 post-processing 内把终端事件 append 到 events 列表、调用 _persist_events()、再 yield 给 SSE — 前端收到 complete / cancelled 时 DB 已就绪。
error 路径不保证可持久化:
- 引擎内部抛异常时,Controller 捕获后立即把
error事件推进事件队列(yield 给 SSE 客户端),之后才在 post-processing 里调用_persist_events()。此时 SSE 与 DB 之间存在竞争窗口,窗口大小取决于 post-processing 其余步骤(flush artifacts、DB retry backoff 指数为 1s/2s/4s) - 更严重:post-processing 自身出错时(
controller.py的 except 分支),Controller 直接 yield 一条 SSEerror,这条事件既不会追加到state["events"],也不会触发_persist_events()— 永远不会进MessageEvent表
结论:不要假设 SSE 收到 error 后 GET /messages/{id}/events 能查到该错误。监控回看以 DB 为准时,必须把 SSE error 视为"可能只出现在实时流里"的事件;需要强一致审计的场景应在外层(Controller 之外)补一份日志。Admin 监控 UI 因为是人工粒度,不会感知。
批量写入本身的收益:
- 一次 INSERT,比逐事件事务快一个数量级
event_id的 unique 约束保证崩溃重启后重试幂等
排除规则:引擎 emit llm_chunk 时显式传 sse_only=True,Controller 只推 SSE 不加入 state["events"] — DB 天然没有这一类。
事件目录
每种事件 data 字段的契约(所有事件都含 type, timestamp, agent, data 四层外壳,下列仅展开 data):
Controller 层
| 事件 | data 字段 |
|---|---|
METADATA |
conversation_id, message_id |
COMPLETE |
success=true, conversation_id, message_id, response, execution_metrics |
CANCELLED |
success=false, cancelled=true, conversation_id, message_id, response, execution_metrics |
ERROR |
success=false, conversation_id, message_id, error, execution_metrics |
execution_metrics 汇总整次执行指标(总耗时、总 token、每 agent 轮数等),是监控面板的核心字段。
Agent 层
| 事件 | data 字段 |
|---|---|
AGENT_START |
agent, system_prompt(可能 None) |
LLM_CHUNK |
content 或 reasoning_content(增量,SSE-only) |
LLM_COMPLETE |
content, reasoning_content?, token_usage={input,output,total}, model, duration_ms |
AGENT_COMPLETE |
agent, content |
LLM_COMPLETE 是"每次 LLM 调用"的边界,包含本次调用的 model、耗时、token 明细 — 成本分析和性能归因都从这里来。
工具 / 权限层
| 事件 | data 字段 |
|---|---|
TOOL_START |
tool, params |
TOOL_COMPLETE |
tool, success, result_data?, error?, duration_ms, params, metadata? |
PERMISSION_REQUEST |
permission_level, tool, params |
PERMISSION_RESULT |
approved, tool, reason? |
tool_complete.metadata 里携带 artifact_snapshot 时,前端据此实时刷新 Artifact 面板(不等 DB flush),见 artifacts.md → Write-Back.
Permission 事件的阻塞语义属于 RuntimeStore,见 concurrency.md → Interrupt 机制.
Compaction 层
| 事件 | data 字段 |
|---|---|
COMPACTION_START |
last_input_tokens, last_output_tokens(触发时本次 LLM 调用的 token 数) |
COMPACTION_SUMMARY |
success(false 时表示 compact LLM 失败、本 turn 已被 engine 标 ERROR;旧记录无此字段视为 true), content(带 frame prefix 的结构化摘要;success=false 时为空字符串), token_usage={input,output,total}(success=false 时全 0), duration_ms(success=false 时为 0), model, error(仅 success=false 时非空) |
两条事件都带 agent_name(触发 compaction 的 agent),都持久化。EventHistory 在构建历史时以 COMPACTION_SUMMARY 作为 boundary(见 engine.md → Compaction 机制),COMPACTION_START 对历史构建无影响 — 仅用于前端实时指示和 replay 重放。
输入层
| 事件 | data 字段 |
|---|---|
USER_INPUT |
content(用户首条输入) |
QUEUED_MESSAGE |
content(执行中注入的消息) |
SUBAGENT_INSTRUCTION |
instruction(lead → sub 的指令) |
QUEUED_MESSAGE 通过 /chat/{id}/inject 触发,包装后进入 lead 的上下文(见 concurrency.md → 消息注入),但只会作为事件持久化,不会创建独立的 Message 行。
Admin API
src/api/routers/admin.py,所有端点 Depends(require_admin):
GET /api/v1/admin/conversations
Query:
| 参数 | 类型 | 默认 | 约束 |
|---|---|---|---|
limit |
int | 20 | 1..100 |
offset |
int | 0 | ≥0 |
q |
str? | — | 按 title 搜索,max_length=200 |
user_id |
str? | — | 按 owner 过滤,max_length=64 |
响应:AdminConversationListResponse
{
"conversations": [
{
"id": "...", "title": "...", "user_id": "...", "user_display_name": "...",
"message_count": 42, "is_active": true,
"created_at": "...", "updated_at": "..."
}
],
"total": 120,
"has_more": true
}
is_active 的来源是 RuntimeStore.list_active_conversations() 的实时查询(内存 dict 或 Redis 扫描),与 DB 表无关 — 这保证标记反映真实执行态而非历史态。Redis 模式下这是跨实例一致的视图。
GET /api/v1/admin/conversations/{conv_id}/events
响应:AdminConversationEventsResponse
{
"conversation_id": "...",
"title": "...",
"messages": [
{
"message_id": "...",
"user_input": "...",
"response": "...",
"created_at": "...",
"events": [
{"id": 1, "event_type": "agent_start", "agent_name": "lead_agent",
"data": {...}, "created_at": "..."}
],
"execution_metrics": {...}
}
]
}
分组逻辑:后台 conversation_manager.get_admin_conversation_events(conv_id) 一次取出 conversation + messages + events,Router 层按 message_id 分组,execution_metrics 从 message.metadata_["execution_metrics"] 抽取(该字段在 COMPLETE 事件时由 Controller 同步写回 message)。
不存在独立的"单 message events"端点 — 用户侧由 GET /chat/{conv_id}/messages/{msg_id}/events 提供(见 ../guides/api-reference.md),Admin 侧的粒度是整个对话。
监控 UI
前端 components/chat/ObservabilityPanel.tsx 是 Admin 专属入口(受 uiStore.observabilityVisible 门控),消费上述 Admin API。主要视图:
- 对话浏览器:分页 + title 搜索 + 活跃标记(绿点),使用
/admin/conversations - 事件时间线:按
message_id折叠,事件类型色彩编码(LLM 紫 / Tool 蓝 / Permission 橙 / Error 红) - 事件详情面板:按
event_type分派渲染 llm_complete→ token / model / duration 仪表tool_complete→ params / result / errorpermission_request/result→ 审批上下文error→ 完整堆栈(data.error字段)
详细前端结构见 ../frontend.md.
健康检查
src/api/main.py:
GET /health/live
- 存活探针,始终返回 200
{"status": "ok"} - 用于 K8s liveness probe — 只要 app 进程活着就通过
GET /health/ready
- 就绪探针,检查依赖连通性:
- DB:
SELECT 1viaget_db_manager().session() - Redis(若配置):
await redis.ping() - 全部通过 → 200;任意失败 → 503
- 响应体始终包含各项结果:
- 用于 K8s readiness probe — DB 抖动时 Pod 被摘流量,不再接新请求
两个端点都无鉴权 — 故意设计,给 LB / K8s 直接探测。
Design Decisions
为什么 data 用 JSON 而非定长 schema
- 事件类型有 15 种且会随功能扩展(新工具、新 agent 都可能新增事件形状)
- 若每种事件独立列,每次扩展都要 migration — 与"agent-as-data"的扩展哲学冲突
- JSON 在 PG / MySQL / SQLite 都有原生支持,查询能力足够(按
event_type先过滤,再按需解析) - 代价:失去 schema 保证,由前端
types/events.ts的 TypeScript 接口 + 后端发射处约定共同守护 — 加上 event sourcing 的 append-only 特性,老版本数据不会被破坏
为什么批量写入而非逐事件落库
- 典型执行产生 50–200 个事件 — 逐条 INSERT 意味着 50–200 个事务 commit,在 SQLite WAL 下也占用显著写锁时间
- 批量一次 INSERT + 一次 commit,总耗时降一个数量级
- 原子性副作用:要么整个执行的事件全部落库,要么全部不落 — 配合
event_idunique 保证重试幂等 - 风险:崩溃时丢失未批写的事件 — 但前端已收到了 SSE 实时流,只是 DB 回看能力缺失一次执行;生产权衡可接受
为什么 is_active 从 RuntimeStore 而非 DB 读
- DB 里的
conversation.updated_at只能反映"最近有消息",不等于"正在执行" - RuntimeStore 的 active 状态来自
engine_interactive维度(见 concurrency.md → 双状态生命周期),这是真实的执行态 - Redis 模式下 RuntimeStore 本身跨实例一致 — Admin 看到的"谁在跑"是全集群视图
- 代价:Admin 页面每次刷新都要查一次 RuntimeStore(相对 DB 稍贵),但 Admin 访问频率低,可接受
为什么 llm_chunk 不做压缩后持久化
- 尝试过"把 chunks 按 token 累积存成单行"的折中方案 — 复杂度远高于直接用
llm_complete(后者本就包含完整 content) - 唯一的损失是"token 到达时间序列",但性能分析的粒度在"单次 LLM 调用"就足够,
llm_complete.duration_ms覆盖这一层