Data Layer
The data layer handles persistent storage, including conversation history, messages, and Artifacts.
Module Structure
src/
├── db/
│   ├── models.py              # SQLAlchemy ORM models
│   └── database.py            # Database manager
└── repositories/
    ├── base.py                # Repository base class
    ├── conversation_repo.py
    ├── artifact_repo.py
    └── user_repo.py           # User data access
Data Models (models.py)
ER Diagram
erDiagram
User ||--o{ Conversation : owns
Conversation ||--o{ Message : contains
Conversation ||--o| ArtifactSession : has
ArtifactSession ||--o{ Artifact : contains
Artifact ||--o{ ArtifactVersion : versions
User {
string id PK
string username UK
string hashed_password
string display_name
string role
bool is_active
datetime created_at
datetime updated_at
}
Conversation {
string id PK
string active_branch
string title
string user_id FK
datetime created_at
datetime updated_at
json metadata_
}
Message {
string id PK
string conversation_id FK
string parent_id
text content
string thread_id
text graph_response
datetime created_at
json metadata_
}
ArtifactSession {
string id PK "FK"
datetime created_at
datetime updated_at
}
Artifact {
string id PK
string session_id PK "FK"
string content_type
string title
text content
string source
int current_version
int lock_version
datetime created_at
datetime updated_at
json metadata_
}
ArtifactVersion {
int id PK
string artifact_id FK
string session_id FK
int version
text content
string update_type
json changes
datetime created_at
}
User
The user entity (authentication and data isolation):
class User(Base):
    __tablename__ = "users"

    id: Mapped[str] = mapped_column(String(64), primary_key=True)  # "user-{uuid}"
    username: Mapped[str] = mapped_column(String(64), unique=True, index=True)
    hashed_password: Mapped[str] = mapped_column(String(256))
    display_name: Mapped[Optional[str]] = mapped_column(String(128), nullable=True)
    role: Mapped[str] = mapped_column(String(16), default="user")  # "admin" | "user"
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
    updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, onupdate=datetime.now)

    conversations: Mapped[List["Conversation"]] = relationship("Conversation", back_populates="owner")
Roles:
- admin: can create and manage users and view the full user list
- user: a regular user who can only access their own conversations and Artifacts
Conversation
The conversation entity:
class Conversation(Base):
    __tablename__ = "conversations"

    id: Mapped[str] = mapped_column(String(64), primary_key=True)
    active_branch: Mapped[Optional[str]] = mapped_column(String(64), nullable=True)
    title: Mapped[Optional[str]] = mapped_column(String(256), nullable=True)
    user_id: Mapped[Optional[str]] = mapped_column(
        String(64), ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True
    )
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
    updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, onupdate=datetime.now)
    metadata_: Mapped[Optional[Dict[str, Any]]] = mapped_column("metadata", JSON, nullable=True)

    # Relationships
    owner: Mapped[Optional["User"]] = relationship("User", back_populates="conversations")
    messages: Mapped[List["Message"]] = relationship(
        "Message", back_populates="conversation", cascade="all, delete-orphan", lazy="selectin"
    )
    artifact_session: Mapped[Optional["ArtifactSession"]] = relationship(
        "ArtifactSession", back_populates="conversation", uselist=False, cascade="all, delete-orphan"
    )
Data isolation: user_id is a foreign key to users.id, and the API layer runs an ownership check so that users can only access their own conversations. Cross-user access always returns 404, so the response does not reveal whether the conversation exists.
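The ownership check described above can be sketched roughly as follows. This is a minimal illustration, not the project's API code: ConversationRecord, the dict-based store, and the get_owned_conversation helper are hypothetical stand-ins, and only the NotFoundError name comes from this document.

```python
from dataclasses import dataclass
from typing import Dict, Optional


class NotFoundError(Exception):
    """Raised for both missing and foreign conversations (maps to HTTP 404)."""


@dataclass
class ConversationRecord:
    # Hypothetical stand-in for the ORM Conversation row.
    id: str
    user_id: Optional[str]


def get_owned_conversation(
    store: Dict[str, ConversationRecord],
    conversation_id: str,
    current_user_id: str,
) -> ConversationRecord:
    """Return the conversation only if it exists and belongs to the caller."""
    conversation = store.get(conversation_id)
    # The same error covers "missing" and "owned by someone else", so the
    # response never reveals whether the ID exists.
    if conversation is None or conversation.user_id != current_user_id:
        raise NotFoundError(f"Conversation '{conversation_id}' not found")
    return conversation


store = {"conv-1": ConversationRecord(id="conv-1", user_id="user-a")}
owned = get_owned_conversation(store, "conv-1", "user-a")
```

Raising one error type for both cases is what keeps the 404 indistinguishable to the caller.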
Message
The message entity (tree structure):
class Message(Base):
    __tablename__ = "messages"

    id: Mapped[str] = mapped_column(String(64), primary_key=True)
    conversation_id: Mapped[str] = mapped_column(
        String(64), ForeignKey("conversations.id", ondelete="CASCADE")
    )
    parent_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=True)  # None = root message
    content: Mapped[str] = mapped_column(Text)  # user message
    thread_id: Mapped[str] = mapped_column(String(64))  # LangGraph thread ID
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
    graph_response: Mapped[Optional[str]] = mapped_column(Text)  # agent reply
    metadata_: Mapped[Optional[Dict]] = mapped_column("metadata", JSON)  # extension metadata

    conversation: Mapped["Conversation"] = relationship("Conversation", back_populates="messages")
Tree structure:
parent_id links each message to its parent, which forms a message tree:
msg_1 (parent_id=None)        ← root message
├── msg_2 (parent_id=msg_1)
│   └── msg_4 (parent_id=msg_2)
└── msg_3 (parent_id=msg_1)   ← branch
    └── msg_5 (parent_id=msg_3)
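To make the tree concrete, here is a small sketch of walking parent_id links upward and listing direct children. It operates on a plain dict that mirrors the diagram above; the PARENTS mapping and both helper functions are hypothetical and are not the repository's actual methods.

```python
from typing import Dict, List, Optional

# Hypothetical in-memory view of the tree above: message id -> parent_id.
PARENTS: Dict[str, Optional[str]] = {
    "msg_1": None,
    "msg_2": "msg_1",
    "msg_3": "msg_1",
    "msg_4": "msg_2",
    "msg_5": "msg_3",
}


def path_to_root(parents: Dict[str, Optional[str]], message_id: str) -> List[str]:
    """Follow parent_id links upward, then return the path in root-first order."""
    path: List[str] = []
    seen = set()
    current: Optional[str] = message_id
    while current is not None:
        if current in seen:
            raise ValueError(f"cycle detected at {current}")
        if current not in parents:
            raise KeyError(f"unknown message {current}")
        seen.add(current)
        path.append(current)
        current = parents[current]
    path.reverse()
    return path


def children_of(parents: Dict[str, Optional[str]], message_id: str) -> List[str]:
    """Direct children of a message; more than one child means a branch point."""
    return [mid for mid, parent in parents.items() if parent == message_id]


branch_path = path_to_root(PARENTS, "msg_5")
root_children = children_of(PARENTS, "msg_1")
```

The cycle guard is a defensive assumption: the schema shown here does not by itself prevent a parent_id loop.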
Artifact
The artifact entity (composite primary key + optimistic lock):
class Artifact(Base):
    __tablename__ = "artifacts"

    # Composite primary key
    id: Mapped[str] = mapped_column(String(64), primary_key=True)
    session_id: Mapped[str] = mapped_column(
        String(64), ForeignKey("artifact_sessions.id", ondelete="CASCADE"), primary_key=True
    )
    content_type: Mapped[str] = mapped_column(String(64))  # MIME type (text/markdown, text/x-python, etc.)
    title: Mapped[str] = mapped_column(String(256))
    content: Mapped[str] = mapped_column(Text, default="")
    source: Mapped[str] = mapped_column(String(32), default="agent")  # agent / user_upload

    # Version control
    current_version: Mapped[int] = mapped_column(Integer, default=1)
    lock_version: Mapped[int] = mapped_column(Integer, default=1)  # optimistic lock
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
    # default added so the NOT NULL column is populated on insert, as in the other models
    updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, onupdate=datetime.now)
    metadata_: Mapped[Optional[Dict]] = mapped_column("metadata", JSON)

    versions: Mapped[List["ArtifactVersion"]] = relationship(
        "ArtifactVersion", back_populates="artifact", cascade="all, delete-orphan"
    )
ArtifactVersion
Version history:
class ArtifactVersion(Base):
    __tablename__ = "artifact_versions"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Composite foreign key (declared in __table_args__ below)
    artifact_id: Mapped[str] = mapped_column(String(64))
    session_id: Mapped[str] = mapped_column(String(64))
    version: Mapped[int] = mapped_column(Integer)
    content: Mapped[str] = mapped_column(Text)
    update_type: Mapped[str] = mapped_column(String(32))  # create/update/update_fuzzy/rewrite
    changes: Mapped[Optional[List]] = mapped_column(JSON)  # [(old, new), ...]
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)

    artifact: Mapped["Artifact"] = relationship("Artifact", back_populates="versions")

    __table_args__ = (
        UniqueConstraint("artifact_id", "session_id", "version"),
        ForeignKeyConstraint(
            ["artifact_id", "session_id"],
            ["artifacts.id", "artifacts.session_id"],
            ondelete="CASCADE",
        ),
    )
Database Management (database.py)
DatabaseManager
class DatabaseManager:
    def __init__(
        self,
        database_url: Optional[str] = None,
        echo: bool = False,
    ):
        """
        Args:
            database_url: Database connection URL; defaults to SQLite.
            echo: Whether to log SQL statements (for debugging).
        """
        if database_url is None:
            database_url = "sqlite+aiosqlite:///data/artifactflow.db"
        self.database_url = database_url
        self.echo = echo
        self._engine: Optional[AsyncEngine] = None
        self._session_factory: Optional[async_sessionmaker] = None
        self._initialized = False

    async def initialize(self):
        """Initialize the database (create the engine, configure WAL, create tables)."""
        self._engine = create_async_engine(self.database_url, echo=self.echo)
        # SQLite WAL mode (improves concurrent access)
        if "sqlite" in self.database_url:
            async with self._engine.begin() as conn:
                await conn.execute(text("PRAGMA journal_mode=WAL"))
                await conn.execute(text("PRAGMA foreign_keys=ON"))
        self._session_factory = async_sessionmaker(
            bind=self._engine,
            class_=AsyncSession,
            expire_on_commit=False,
            autoflush=False,
        )
        # Create all tables
        async with self._engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        self._initialized = True

    @asynccontextmanager
    async def session(self) -> AsyncGenerator[AsyncSession, None]:
        """Yield a database session (lifecycle management only; no automatic commit/rollback)."""
        if not self._initialized:
            await self.initialize()
        session = self._session_factory()
        try:
            yield session
        finally:
            await session.close()

    async def close(self):
        """Close the database connection."""
        if self._engine:
            await self._engine.dispose()
        self._initialized = False
Usage example:
db_manager = DatabaseManager("sqlite+aiosqlite:///data/app.db")
await db_manager.initialize()
async with db_manager.session() as session:
    repo = ConversationRepository(session)
    conversation = await repo.get_by_id("xxx")
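One detail worth checking in the PRAGMA setup of initialize(): in SQLite, journal_mode=WAL is stored in the database file, while foreign_keys is a setting of the individual connection. The sketch below uses the standard-library sqlite3 module (not the project's async engine) and a throwaway file to show the difference; pragma_value and the demo table are hypothetical names.

```python
import os
import sqlite3
import tempfile


def pragma_value(conn: sqlite3.Connection, name: str):
    """Read back the current value of a PRAGMA on this connection."""
    return conn.execute(f"PRAGMA {name}").fetchone()[0]


with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "demo.db")

    first = sqlite3.connect(db_path)
    first.execute("PRAGMA journal_mode=WAL")
    first.execute("PRAGMA foreign_keys=ON")
    first.execute("CREATE TABLE demo (id INTEGER PRIMARY KEY)")
    first.commit()
    fk_on_first = pragma_value(first, "foreign_keys")

    # A second connection to the same file, with no PRAGMA issued on it.
    second = sqlite3.connect(db_path)
    journal_on_second = pragma_value(second, "journal_mode")  # persisted in the file
    fk_on_second = pragma_value(second, "foreign_keys")       # this connection's own setting

    second.close()
    first.close()
```

On builds where foreign keys default to off, fk_on_second will not reflect the setting made on the first connection. If the engine pools more than one connection, a commonly documented SQLAlchemy approach is a "connect" event listener that issues the PRAGMA on every new connection; whether this project needs that depends on its pool configuration.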
Repository Pattern
BaseRepository
Generic CRUD operations:
class BaseRepository(ABC, Generic[T]):
    def __init__(self, session: AsyncSession, model_class: Type[T]):
        self._session = session
        self._model_class = model_class

    async def get_by_id(self, id: Any) -> Optional[T]:
        return await self._session.get(self._model_class, id)

    async def get_all(self, *, limit: Optional[int] = None, offset: Optional[int] = None) -> List[T]:
        query = select(self._model_class)
        if offset:
            query = query.offset(offset)
        if limit:
            query = query.limit(limit)
        result = await self._session.execute(query)
        return list(result.scalars().all())

    async def count(self) -> int:
        """Return the total number of entities."""
        ...

    async def exists(self, id: Any) -> bool:
        """Check whether an entity exists."""
        ...

    async def add(self, entity: T) -> T:
        """Add a new entity (flush + commit to release the write lock immediately)."""
        self._session.add(entity)
        await self._session.flush()
        await self._session.commit()
        await self._session.refresh(entity)
        return entity

    async def add_all(self, entities: List[T]) -> List[T]:
        """Add entities in bulk."""
        self._session.add_all(entities)
        await self._session.flush()
        await self._session.commit()
        for entity in entities:
            await self._session.refresh(entity)
        return entities

    async def update(self, entity: T) -> T:
        """Update an entity (it must already be attached to the Session)."""
        await self._session.flush()
        await self._session.commit()
        await self._session.refresh(entity)
        return entity

    async def delete(self, entity: T) -> None:
        """Delete an entity."""
        await self._session.delete(entity)
        await self._session.flush()
        await self._session.commit()

    async def delete_by_id(self, id: Any) -> bool:
        """Delete by primary key."""
        ...
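ConversationRepository
Conversation and message operations:
| Method | Description |
|---|---|
| create_conversation | Create a new conversation (also creates the associated ArtifactSession) |
| get_conversation | Get a conversation (optionally preloading messages and Artifacts) |
| get_conversation_or_raise | Get a conversation; raises NotFoundError if it does not exist |
| update_active_branch | Update the conversation's active branch |
| update_title | Update the conversation title |
| list_conversations | List conversations (supports pagination and filtering by user_id) |
| delete_conversation | Delete a conversation |
| add_message | Add a message to a conversation (automatically updates active_branch) |
| get_message | Get a single message |
| get_message_or_raise | Get a message; raises NotFoundError if it does not exist |
| update_graph_response | Update a message's graph response |
| get_conversation_messages | Get all messages in a conversation |
| get_conversation_path | Get the path from the root to a given message (traced upward from that message) |
| get_branch_children | Get a message's child branches |
| format_conversation_history | Format as [{"role": "user"}, {"role": "assistant"}, ...] |
| get_branch_structure | Get the conversation's full branch structure |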
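As an illustration of the format_conversation_history row, the sketch below turns a root-to-leaf message path into alternating user/assistant entries. It is an assumption-laden sketch: the MessageRow type, the format_history name, and the "content" key are hypothetical, since the document only shows the "role" key of each entry.

```python
from typing import Dict, List, Optional, TypedDict


class MessageRow(TypedDict):
    # Hypothetical projection of the Message model: one row holds both turns.
    content: str                   # user message
    graph_response: Optional[str]  # agent reply, None until the graph finishes


def format_history(path: List[MessageRow]) -> List[Dict[str, str]]:
    """Expand each message row into a user entry plus an optional assistant entry."""
    history: List[Dict[str, str]] = []
    for row in path:
        history.append({"role": "user", "content": row["content"]})
        if row["graph_response"] is not None:
            history.append({"role": "assistant", "content": row["graph_response"]})
    return history


history = format_history(
    [
        {"content": "Draft an outline", "graph_response": "Here is an outline."},
        {"content": "Expand section 2", "graph_response": None},
    ]
)
```

Skipping a missing graph_response keeps an in-flight message from producing an empty assistant turn.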
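ArtifactRepository
Artifact operations (with optimistic locking):
| Method | Description |
|---|---|
| get_session | Get an ArtifactSession |
| get_session_or_raise | Get a Session; raises NotFoundError if it does not exist |
| ensure_session_exists | Ensure the Session exists (create it if missing) |
| create_artifact | Create an Artifact (also creates the initial version) |
| get_artifact | Get an Artifact (composite primary key lookup) |
| get_artifact_or_raise | Get an Artifact; raises NotFoundError if it does not exist |
| list_artifacts | List all Artifacts in a Session |
| delete_artifact | Delete an Artifact |
| update_artifact_content | Update content under the optimistic lock (raises VersionConflictError on a version conflict) |
| rewrite_artifact | Fully rewrite the content (optimistic lock) |
| update_artifact_title | Update the title |
| get_version | Get a specific version record |
| get_version_content | Get the content of a specific version |
| list_versions | List the version history (without full content) |
| get_version_diff | Get the diff between two versions |
| get_artifacts_with_full_content | Fetch full content in bulk |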
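The document does not say how get_version_diff computes its result. As one plausible sketch, the contents of two stored versions could be compared with the standard-library difflib; the version_diff helper and its labels below are hypothetical.

```python
import difflib
from typing import List


def version_diff(old: str, new: str, old_label: str = "v1", new_label: str = "v2") -> List[str]:
    """Return a unified diff between two version contents, one string per line."""
    return list(
        difflib.unified_diff(
            old.splitlines(),
            new.splitlines(),
            fromfile=old_label,
            tofile=new_label,
            lineterm="",
        )
    )


diff = version_diff("# Plan\n- step one\n", "# Plan\n- step one\n- step two\n")
```

Identical contents yield an empty list, which gives callers a cheap "no changes" check.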
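UserRepository
User operations:
| Method | Description |
|---|---|
| get_by_username | Look up a user by username (login) |
| get_by_id | Look up a user by ID (inherited from BaseRepository) |
| list_users | List users (supports pagination; can optionally include disabled users) |
| count_users | Total number of users |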
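Optimistic Locking
Why optimistic locking?
When multiple agents run concurrently, two of them may update the same Artifact at the same time:
Agent A: reads Artifact (version=1)
Agent B: reads Artifact (version=1)
Agent A: updates Artifact → version=2
Agent B: updates Artifact → conflict!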
Processing Flow
sequenceDiagram
participant Agent
participant Tool
participant Manager
participant Repo
participant DB
Agent->>Tool: update_artifact(id, old_str, new_str)
Tool->>Manager: update_artifact(...)
Manager->>Manager: Get lock_version from cache
Manager->>Repo: update_artifact_content(expected_lock_version)
Repo->>DB: UPDATE ... WHERE lock_version=X RETURNING ...
alt Update succeeds (lock_version matches)
DB->>Repo: new_version, new_lock_version
Repo->>DB: INSERT INTO artifact_versions ...
Repo->>Manager: Return the updated Artifact
Manager->>Manager: Update lock_version in cache
Manager->>Tool: (success, message, match_info)
Tool->>Agent: ToolResult(success=True)
else Update fails (lock_version does not match)
DB->>Repo: 0 rows affected
Repo->>Repo: Check whether the row is missing or the version conflicts
Repo->>Manager: VersionConflictError
Manager->>Manager: Clear cache
Manager->>Tool: (False, "Version conflict", None)
Tool->>Agent: ToolResult(success=False, error="Version conflict")
end
Note: lock_version is managed automatically by ArtifactManager in an in-memory cache, so the Agent never needs to know the version number.
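The core of the flow above is a compare-and-swap UPDATE guarded by lock_version. The sketch below shows that step with the standard-library sqlite3 module and a trimmed-down table. It is not the repository's implementation: it checks rowcount instead of using RETURNING, omits the artifact_versions insert, and update_content is a hypothetical function name.

```python
import sqlite3


class NotFoundError(Exception):
    pass


class VersionConflictError(Exception):
    pass


def update_content(conn, artifact_id, session_id, new_content, expected_lock_version):
    """Apply the update only if lock_version still matches; return the new lock_version."""
    cursor = conn.execute(
        """
        UPDATE artifacts
           SET content = ?,
               current_version = current_version + 1,
               lock_version = lock_version + 1
         WHERE id = ? AND session_id = ? AND lock_version = ?
        """,
        (new_content, artifact_id, session_id, expected_lock_version),
    )
    if cursor.rowcount == 1:
        conn.commit()
        return expected_lock_version + 1
    conn.rollback()
    # Zero rows matched: find out whether the row is missing or merely stale.
    row = conn.execute(
        "SELECT lock_version FROM artifacts WHERE id = ? AND session_id = ?",
        (artifact_id, session_id),
    ).fetchone()
    if row is None:
        raise NotFoundError(f"artifact {artifact_id!r} not found")
    raise VersionConflictError(
        f"expected lock_version {expected_lock_version}, found {row[0]}"
    )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE artifacts (id TEXT, session_id TEXT, content TEXT, "
    "current_version INTEGER, lock_version INTEGER, PRIMARY KEY (id, session_id))"
)
conn.execute("INSERT INTO artifacts VALUES ('plan', 'sess-1', 'draft', 1, 1)")
conn.commit()

# Agents A and B both read lock_version=1; A writes first and wins.
new_lock = update_content(conn, "plan", "sess-1", "A's edit", expected_lock_version=1)
try:
    update_content(conn, "plan", "sess-1", "B's edit", expected_lock_version=1)
    conflict = False
except VersionConflictError:
    conflict = True
```

Because the version check and the write happen in a single UPDATE statement, no separate read lock is needed between them.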
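Handling Conflicts in the Agent
When it receives a version conflict error, the Agent should:
- Call read_artifact again to get the latest content (the Manager reloads its cache)
- Recompute the change against the latest content
- Retry the update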
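The three steps above amount to a bounded read-recompute-retry loop. The sketch below shows that shape against a hypothetical in-memory store. InMemoryArtifacts and update_with_retry are illustrative names, and unlike the real system, where the Manager hides lock_version from the Agent, the sketch exposes the lock to keep the example short.

```python
from typing import Callable, Dict, Tuple


class VersionConflictError(Exception):
    pass


class InMemoryArtifacts:
    """Hypothetical stand-in for the Manager/Repository pair."""

    def __init__(self) -> None:
        self._rows: Dict[str, Tuple[str, int]] = {}

    def put(self, artifact_id: str, content: str) -> None:
        self._rows[artifact_id] = (content, 1)

    def read(self, artifact_id: str) -> Tuple[str, int]:
        return self._rows[artifact_id]

    def write(self, artifact_id: str, content: str, expected_lock_version: int) -> None:
        _, lock_version = self._rows[artifact_id]
        if lock_version != expected_lock_version:
            raise VersionConflictError(artifact_id)
        self._rows[artifact_id] = (content, lock_version + 1)


def update_with_retry(
    store: InMemoryArtifacts,
    artifact_id: str,
    transform: Callable[[str], str],
    max_attempts: int = 3,
) -> str:
    for _ in range(max_attempts):
        content, lock_version = store.read(artifact_id)  # 1. re-read the latest content
        new_content = transform(content)                  # 2. recompute the change
        try:
            store.write(artifact_id, new_content, lock_version)  # 3. retry the update
            return new_content
        except VersionConflictError:
            continue
    raise VersionConflictError(f"gave up on {artifact_id!r} after {max_attempts} attempts")


store = InMemoryArtifacts()
store.put("plan", "v1")
calls = []


def append_note(content: str) -> str:
    if not calls:
        # Simulate another agent writing between our read and our write.
        store.write("plan", content + " +other", expected_lock_version=1)
    calls.append(content)
    return content + " +mine"


final = update_with_retry(store, "plan", append_note)
```

Capping the number of attempts is a design assumption; it prevents a persistently contended Artifact from looping forever.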
LangGraph Checkpointer
Besides business data, LangGraph state also needs to be persisted:
# src/core/graph.py
import os

async def create_async_sqlite_checkpointer(db_path: str = "data/langgraph.db"):
    """Create the LangGraph state checkpointer."""
    import aiosqlite
    from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

    # Make sure the directory exists
    db_dir = os.path.dirname(db_path)
    if db_dir and not os.path.exists(db_dir):
        os.makedirs(db_dir, exist_ok=True)

    # Create the connection manually so that PRAGMAs can be configured
    conn = await aiosqlite.connect(db_path)
    await conn.execute("PRAGMA journal_mode=WAL")   # WAL mode improves concurrency
    await conn.execute("PRAGMA busy_timeout=5000")  # wait up to 5 seconds when the database is busy
    checkpointer = AsyncSqliteSaver(conn)
    await checkpointer.setup()
    return checkpointer
What the Checkpointer stores:
- The complete state history of each thread_id
- Support for resuming execution after interrupt()
- Data kept in a separate SQLite file (data/langgraph.db)