Files
invoice-master-poc-v2/ARCHITECTURE_REVIEW.md
Yaojia Wang 4126196dea Add report
2026-02-01 01:49:50 +01:00

667 lines
21 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Invoice Master POC v2 - 总体架构审查报告
**审查日期**: 2026-02-01
**审查人**: Claude Code
**项目路径**: `/Users/yiukai/Documents/git/invoice-master-poc-v2`
---
## 架构概述
### 整体架构图
```
┌─────────────────────────────────────────────────────────────────┐
│ Frontend (React) │
│ Vite + TypeScript + TailwindCSS │
└─────────────────────────────┬───────────────────────────────────┘
│ HTTP/REST
┌─────────────────────────────▼───────────────────────────────────┐
│ Inference Service (FastAPI) │
│ ┌──────────────┬──────────────┬──────────────┬──────────────┐ │
│ │ Public API │ Admin API │ Training API│ Batch API │ │
│ └──────────────┴──────────────┴──────────────┴──────────────┘ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Service Layer │ │
│ │ InferenceService │ AsyncProcessing │ BatchUpload │ Dataset │ │
│ └────────────────────────────────────────────────────────────┘ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Data Layer │ │
│ │ AdminDB │ AsyncRequestDB │ SQLModel │ PostgreSQL │ │
│ └────────────────────────────────────────────────────────────┘ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Core Components │ │
│ │ RateLimiter │ Schedulers │ TaskQueues │ Auth │ │
│ └────────────────────────────────────────────────────────────┘ │
└─────────────────────────────┬───────────────────────────────────┘
│ PostgreSQL
┌─────────────────────────────▼───────────────────────────────────┐
│ Training Service (GPU) │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ CLI: train │ autolabel │ analyze │ validate │ │
│ └────────────────────────────────────────────────────────────┘ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ YOLO: db_dataset │ annotation_generator │ │
│ └────────────────────────────────────────────────────────────┘ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Processing: CPU Pool │ GPU Pool │ Task Dispatcher │ │
│ └────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
┌─────────┴─────────┐
▼ ▼
┌──────────────┐ ┌──────────────┐
│ Shared │ │ Storage │
│ PDF │ OCR │ │ Local/Azure/ │
│ Normalize │ │ S3 │
└──────────────┘ └──────────────┘
```
### 技术栈
| 层级 | 技术 | 评估 |
|------|------|------|
| **前端** | React + Vite + TypeScript + TailwindCSS | ✅ 现代栈 |
| **API 框架** | FastAPI | ✅ 高性能,类型安全 |
| **数据库** | PostgreSQL + SQLModel | ✅ 类型安全 ORM |
| **目标检测** | YOLOv11 (Ultralytics) | ✅ 业界标准 |
| **OCR** | PaddleOCR v5 | ✅ 支持瑞典语 |
| **部署** | Docker + Azure/AWS | ✅ 云原生 |
---
## 架构优势
### 1. Monorepo 结构 ✅
```
packages/
├── shared/ # 共享库 - 无外部依赖
├── training/ # 训练服务 - 依赖 shared
└── inference/ # 推理服务 - 依赖 shared
```
**优点**:
- 清晰的包边界,无循环依赖
- 独立部署training 按需启动
- 代码复用率高
### 2. 分层架构 ✅
```
API Routes (web/api/v1/)
Service Layer (web/services/)
Data Layer (data/)
Database (PostgreSQL)
```
**优点**:
- 职责分离明确
- 便于单元测试
- 可替换底层实现
### 3. 依赖注入 ✅
```python
# FastAPI Depends 使用得当
@router.post("/infer")
async def infer(
file: UploadFile,
db: AdminDB = Depends(get_admin_db), # 注入
token: str = Depends(validate_admin_token),
):
```
### 4. 存储抽象层 ✅
```python
# 统一接口,支持多后端
class StorageBackend(ABC):
def upload(self, source: Path, destination: str) -> None: ...
def download(self, source: str, destination: Path) -> None: ...
def get_presigned_url(self, path: str) -> str: ...
# 实现: LocalStorageBackend, AzureStorageBackend, S3StorageBackend
```
### 5. 动态模型管理 ✅
```python
# 数据库驱动的模型切换
def get_active_model_path() -> Path | None:
db = AdminDB()
active_model = db.get_active_model_version()
return active_model.model_path if active_model else None
inference_service = InferenceService(
model_path_resolver=get_active_model_path,
)
```
### 6. 任务队列分离 ✅
```python
# 不同类型任务使用不同队列
- AsyncTaskQueue: 异步推理任务
- BatchQueue: 批量上传任务
- TrainingScheduler: 训练任务调度
- AutoLabelScheduler: 自动标注调度
```
---
## 架构问题与风险
### 1. 数据库层职责过重 ⚠️ **中风险**
**问题**: `AdminDB` 类过大,违反单一职责原则
```python
# packages/inference/inference/data/admin_db.py
class AdminDB:
# Token 管理 (5 个方法)
def is_valid_admin_token(self, token: str) -> bool: ...
def create_admin_token(self, token: str, name: str): ...
# 文档管理 (8 个方法)
def create_document(self, ...): ...
def get_document(self, doc_id: str): ...
# 标注管理 (6 个方法)
def create_annotation(self, ...): ...
def get_annotations(self, doc_id: str): ...
# 训练任务 (7 个方法)
def create_training_task(self, ...): ...
def update_training_task(self, ...): ...
# 数据集 (6 个方法)
def create_dataset(self, ...): ...
def get_dataset(self, dataset_id: str): ...
# 模型版本 (5 个方法)
def create_model_version(self, ...): ...
def activate_model_version(self, ...): ...
# 批处理 (4 个方法)
# 锁管理 (3 个方法)
# ... 总计 50+ 方法
```
**影响**:
- 类过大,难以维护
- 测试困难
- 不同领域变更互相影响
**建议**: 按领域拆分为 Repository 模式
```python
# 建议重构
class TokenRepository:
def validate(self, token: str) -> bool: ...
def create(self, token: Token) -> None: ...
class DocumentRepository:
def find_by_id(self, doc_id: str) -> Document | None: ...
def save(self, document: Document) -> None: ...
class TrainingRepository:
def create_task(self, config: TrainingConfig) -> TrainingTask: ...
def update_task_status(self, task_id: str, status: TaskStatus): ...
class ModelRepository:
def get_active(self) -> ModelVersion | None: ...
def activate(self, version_id: str) -> None: ...
```
---
### 2. Service 层混合业务逻辑与技术细节 ⚠️ **中风险**
**问题**: `InferenceService` 既处理业务逻辑又处理技术实现
```python
# packages/inference/inference/web/services/inference.py
class InferenceService:
def process(self, image_bytes: bytes) -> ServiceResult:
# 1. 技术细节: 图像解码
image = Image.open(io.BytesIO(image_bytes))
# 2. 业务逻辑: 字段提取
fields = self._extract_fields(image)
# 3. 技术细节: 模型推理
detections = self._model.predict(image)
# 4. 业务逻辑: 结果验证
if not self._validate_fields(fields):
raise ValidationError()
```
**影响**:
- 难以测试业务逻辑
- 技术变更影响业务代码
- 无法切换技术实现
**建议**: 引入领域层和适配器模式
```python
# 领域层 - 纯业务逻辑
@dataclass
class InvoiceDocument:
document_id: str
pages: list[Page]
class InvoiceExtractor:
"""纯业务逻辑,不依赖技术实现"""
def extract(self, document: InvoiceDocument) -> InvoiceFields:
# 只处理业务规则
pass
# 适配器层 - 技术实现
class YoloFieldDetector:
"""YOLO 技术适配器"""
def __init__(self, model_path: Path):
self._model = YOLO(model_path)
def detect(self, image: np.ndarray) -> list[FieldRegion]:
return self._model.predict(image)
class PaddleOcrEngine:
"""PaddleOCR 技术适配器"""
def __init__(self):
self._ocr = PaddleOCR()
def recognize(self, image: np.ndarray, region: BoundingBox) -> str:
return self._ocr.ocr(image, region)
# 应用服务 - 协调领域和适配器
class InvoiceProcessingService:
def __init__(
self,
extractor: InvoiceExtractor,
detector: FieldDetector,
ocr: OcrEngine,
):
self._extractor = extractor
self._detector = detector
self._ocr = ocr
```
---
### 3. 调度器设计分散 ⚠️ **中风险**
**问题**: 多个独立调度器缺乏统一协调
```python
# 当前设计 - 4 个独立调度器
# 1. TrainingScheduler (core/scheduler.py)
# 2. AutoLabelScheduler (core/autolabel_scheduler.py)
# 3. AsyncTaskQueue (workers/async_queue.py)
# 4. BatchQueue (workers/batch_queue.py)
# app.py 中分别启动
start_scheduler() # 训练调度器
start_autolabel_scheduler() # 自动标注调度器
init_batch_queue() # 批处理队列
```
**影响**:
- 资源竞争风险
- 难以监控和追踪
- 任务优先级难以管理
- 重启时任务丢失
**建议**: 使用 Celery + Redis 统一任务队列
```python
# 建议重构
from celery import Celery
app = Celery('invoice_master')
@app.task(bind=True, max_retries=3)
def process_inference(self, document_id: str):
"""异步推理任务"""
try:
service = get_inference_service()
result = service.process(document_id)
return result
except Exception as exc:
raise self.retry(exc=exc, countdown=60)
@app.task
def train_model(dataset_id: str, config: dict):
"""训练任务"""
training_service = get_training_service()
return training_service.train(dataset_id, config)
@app.task
def auto_label_documents(document_ids: list[str]):
"""批量自动标注"""
for doc_id in document_ids:
auto_label_document.delay(doc_id)
# 优先级队列
app.conf.task_routes = {
'tasks.process_inference': {'queue': 'high_priority'},
'tasks.train_model': {'queue': 'gpu_queue'},
'tasks.auto_label_documents': {'queue': 'low_priority'},
}
```
---
### 4. 配置分散 ⚠️ **低风险**
**问题**: 配置分散在多个文件
```python
# packages/shared/shared/config.py
DATABASE = {...}
PATHS = {...}
AUTOLABEL = {...}
# packages/inference/inference/web/config.py
@dataclass
class ModelConfig: ...
@dataclass
class ServerConfig: ...
@dataclass
class FileConfig: ...
# 环境变量
# .env 文件
```
**影响**:
- 配置难以追踪
- 可能出现不一致
- 缺少配置验证
**建议**: 使用 Pydantic Settings 集中管理
```python
# config/settings.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class DatabaseSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix='DB_')
host: str = 'localhost'
port: int = 5432
name: str = 'docmaster'
user: str = 'docmaster'
password: str # 无默认值,必须设置
class StorageSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix='STORAGE_')
backend: str = 'local'
base_path: str = '~/invoice-data'
azure_connection_string: str | None = None
s3_bucket: str | None = None
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file='.env',
env_file_encoding='utf-8',
)
database: DatabaseSettings = DatabaseSettings()
storage: StorageSettings = StorageSettings()
# 验证
@field_validator('database')
def validate_database(cls, v):
if not v.password:
raise ValueError('Database password is required')
return v
# 全局配置实例
settings = Settings()
```
---
### 5. 内存队列单点故障 ⚠️ **中风险**
**问题**: AsyncTaskQueue 和 BatchQueue 基于内存
```python
# workers/async_queue.py
class AsyncTaskQueue:
def __init__(self):
self._queue = Queue() # 内存队列
self._workers = []
def enqueue(self, task: AsyncTask) -> None:
self._queue.put(task) # 仅存储在内存
```
**影响**:
- 服务重启丢失所有待处理任务
- 无法水平扩展
- 任务持久化困难
**建议**: 使用 Redis/RabbitMQ 持久化队列
---
### 6. 缺少 API 版本迁移策略 ❓ **低风险**
**问题**: 有 `/api/v1/` 版本,但缺少升级策略
```
当前: /api/v1/admin/documents
未来: /api/v2/admin/documents ?
```
**建议**:
- 制定 API 版本升级流程
- 使用 Header 版本控制
- 维护版本兼容性文档
---
## 关键架构风险矩阵
| 风险项 | 概率 | 影响 | 风险等级 | 优先级 |
|--------|------|------|----------|--------|
| 内存队列丢失任务 | 中 | 高 | **高** | 🔴 P0 |
| AdminDB 职责过重 | 高 | 中 | **中** | 🟡 P1 |
| Service 层混合 | 高 | 中 | **中** | 🟡 P1 |
| 调度器资源竞争 | 中 | 中 | **中** | 🟡 P1 |
| 配置分散 | 高 | 低 | **低** | 🟢 P2 |
| API 版本策略 | 低 | 低 | **低** | 🟢 P2 |
---
## 改进建议路线图
### Phase 1: 立即执行 (本周)
#### 1.1 拆分 AdminDB
```python
# 创建 repositories 包
inference/data/repositories/
├── __init__.py
├── base.py # Repository 基类
├── token.py # TokenRepository
├── document.py # DocumentRepository
├── annotation.py # AnnotationRepository
├── training.py # TrainingRepository
├── dataset.py # DatasetRepository
└── model.py # ModelRepository
```
#### 1.2 统一配置
```python
# 创建统一配置模块
inference/config/
├── __init__.py
├── settings.py # Pydantic Settings
└── validators.py # 配置验证
```
### Phase 2: 短期执行 (本月)
#### 2.1 引入消息队列
```yaml
# docker-compose.yml 添加
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
celery_worker:
build: .
command: celery -A inference.tasks worker -l info
depends_on:
- redis
- postgres
```
#### 2.2 添加缓存层
```python
# 使用 Redis 缓存热点数据
from redis import Redis
redis_client = Redis(host='localhost', port=6379)
class CachedDocumentRepository(DocumentRepository):
def find_by_id(self, doc_id: str) -> Document | None:
# 先查缓存
cached = redis_client.get(f"doc:{doc_id}")
if cached:
return Document.parse_raw(cached)
# 再查数据库
doc = super().find_by_id(doc_id)
if doc:
redis_client.setex(f"doc:{doc_id}", 3600, doc.json())
return doc
```
### Phase 3: 长期执行 (本季度)
#### 3.1 数据库读写分离
```python
# 配置主从数据库
class DatabaseManager:
def __init__(self):
self._master = create_engine(MASTER_DB_URL)
self._replica = create_engine(REPLICA_DB_URL)
def get_session(self, readonly: bool = False) -> Session:
engine = self._replica if readonly else self._master
return Session(engine)
```
#### 3.2 事件驱动架构
```python
# 引入事件总线
from event_bus import EventBus
bus = EventBus()
# 发布事件
@router.post("/documents")
async def create_document(...):
doc = document_repo.save(document)
bus.publish('document.created', {'document_id': doc.id})
return doc
# 订阅事件
@bus.subscribe('document.created')
def on_document_created(event):
# 触发自动标注
auto_label_task.delay(event['document_id'])
```
---
## 架构演进建议
### 当前架构 (适合 1-10 用户)
```
Single Instance
├── FastAPI App
├── Memory Queues
└── PostgreSQL
```
### 目标架构 (适合 100+ 用户)
```
Load Balancer
├── FastAPI Instance 1
├── FastAPI Instance 2
└── FastAPI Instance N
┌───────┴───────┐
▼ ▼
Redis Cluster PostgreSQL
(Celery + Cache) (Master + Replica)
```
---
## 总结
### 总体评分
| 维度 | 评分 | 说明 |
|------|------|------|
| **模块化** | 8/10 | 包结构清晰,但部分类过大 |
| **可扩展性** | 7/10 | 水平扩展良好,垂直扩展受限 |
| **可维护性** | 8/10 | 分层合理,但职责边界需细化 |
| **可靠性** | 7/10 | 内存队列是单点故障 |
| **性能** | 8/10 | 异步处理良好 |
| **安全性** | 8/10 | 基础安全到位 |
| **总体** | **7.7/10** | 良好的架构基础,需优化细节 |
### 关键结论
1. **架构设计合理**: Monorepo + 分层架构适合当前规模
2. **主要风险**: 内存队列和数据库职责过重
3. **演进路径**: 引入消息队列和缓存层
4. **投入产出**: 当前架构可支撑到 100+ 用户,无需大规模重构
### 下一步行动
| 优先级 | 任务 | 预计工时 | 影响 |
|--------|------|----------|------|
| 🔴 P0 | 引入 Celery + Redis | 3 天 | 解决任务丢失问题 |
| 🟡 P1 | 拆分 AdminDB | 2 天 | 提升可维护性 |
| 🟡 P1 | 统一配置管理 | 1 天 | 减少配置错误 |
| 🟢 P2 | 添加缓存层 | 2 天 | 提升性能 |
| 🟢 P2 | 数据库读写分离 | 3 天 | 提升扩展性 |
---
## 附录
### 关键文件清单
| 文件 | 职责 | 问题 |
|------|------|------|
| `inference/data/admin_db.py` | 数据库操作 | 类过大,需拆分 |
| `inference/web/services/inference.py` | 推理服务 | 混合业务和技术 |
| `inference/web/workers/async_queue.py` | 异步队列 | 内存存储,易丢失 |
| `inference/web/core/scheduler.py` | 任务调度 | 缺少统一协调 |
| `shared/shared/config.py` | 共享配置 | 分散管理 |
### 参考资源
- [Repository Pattern](https://martinfowler.com/eaaCatalog/repository.html)
- [Celery Documentation](https://docs.celeryproject.org/)
- [Pydantic Settings](https://docs.pydantic.dev/latest/concepts/pydantic_settings/)
- [FastAPI Best Practices](https://fastapi.tiangolo.com/tutorial/bigger-applications/)