invoice-master-poc-v2/docs/multi_pool_design.md
Yaojia Wang b26fd61852 WOP
2026-01-13 00:10:27 +01:00


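# Multi-Pool Processing Architecture Design Document
## 1. Research Summary
### 1.1 Analysis of Current Problems
Our previous dual-pool implementation had stability problems. The main causes:
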
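| Problem | Cause | Solution |
|------|------|----------|
| Processing hangs | Mixing threads with ProcessPoolExecutor causes deadlocks | Use asyncio or a pure Queue pattern |
| Queue.get() blocks forever | No timeout mechanism | Add timeouts and sentinel values |
| GPU memory conflicts | Several processes access the GPU at the same time | Limit GPU workers to 1 |
| CUDA fork problem | Linux's default fork start method is incompatible with CUDA | Use the spawn start method |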
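
The "timeout and sentinel" fix can be sketched as a consumer loop that never blocks indefinitely. This is a minimal sketch, assuming a string sentinel (`STOP`, a name chosen here) and an `is_alive` callback supplied by the caller; it works with both `queue.Queue` and `multiprocessing.Queue`, since both raise `queue.Empty` when `get()` times out.

```python
import queue
from typing import Any, Callable, List

STOP = "__STOP__"  # hypothetical sentinel; a string still compares equal after pickling


def consume(q: Any, is_alive: Callable[[], bool], poll_timeout: float = 1.0) -> List[Any]:
    """Collect items until the sentinel arrives or the producer is gone."""
    items: List[Any] = []
    while True:
        try:
            item = q.get(timeout=poll_timeout)  # bounded wait instead of a bare get()
        except queue.Empty:
            if not is_alive():
                # Producer exited without sending the sentinel: stop instead of hanging
                break
            continue
        if item == STOP:
            break
        items.append(item)
    return items
```

The caller decides what "alive" means, for example `worker.is_alive` for a `multiprocessing.Process`.
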
### 1.2 Recommended Architecture
Based on this research, the approach that best fits our scenario is the **producer-consumer queue pattern**:
```
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│  Main Process   │     │   CPU Workers   │     │   GPU Worker    │
│                 │     │  (4 processes)  │     │   (1 process)   │
│  ┌───────────┐  │     │                 │     │                 │
│  │   Task    │──┼────▶│ Text PDF        │     │ Scanned PDF     │
│  │ Dispatcher│  │     │ processing      │     │ processing      │
│  └───────────┘  │     │ (no OCR needed) │     │ (PaddleOCR)     │
│        ▲        │     │        │        │     │        │        │
│        │        │     │        ▼        │     │        ▼        │
│  ┌───────────┐  │     │  Result Queue   │     │  Result Queue   │
│  │  Result   │◀─┼─────│◀────────────────│─────│◀────────────────│
│  │ Collector │  │     │                 │     │                 │
│  └───────────┘  │     └─────────────────┘     └─────────────────┘
│        │        │
│        ▼        │
│  ┌───────────┐  │
│  │ Database  │  │
│  │   Batch   │  │
│  │  Writer   │  │
│  └───────────┘  │
└─────────────────┘
```
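
The Database Batch Writer box buffers results in the main process and writes them in groups. A minimal sketch, assuming a `write_batch` callable that persists a list of records (a hypothetical stand-in for the project's database write function):

```python
from typing import Any, Callable, List


class BatchWriter:
    """Buffer records and hand them to write_batch in groups of batch_size."""

    def __init__(self, write_batch: Callable[[List[Any]], None], batch_size: int = 100):
        self._write_batch = write_batch
        self._batch_size = batch_size
        self._buffer: List[Any] = []

    def add(self, record: Any) -> None:
        self._buffer.append(record)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            # Pass a copy so the callee may keep the list after we clear ours
            self._write_batch(list(self._buffer))
            self._buffer.clear()

    def __enter__(self) -> "BatchWriter":
        return self

    def __exit__(self, *exc: Any) -> None:
        self.flush()  # write the final partial batch, even if processing raised
```

Flushing in `__exit__` guarantees the last partial batch is written without a separate "save remaining" step at the end of the run.
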
---
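## 2. Core Design Principles
### 2.1 CUDA Compatibility
```python
# Key point: use the spawn start method
import multiprocessing as mp
import os

ctx = mp.get_context("spawn")

_ocr = None  # OCR model owned by this worker process


# Set the device when the GPU worker initializes
def init_gpu_worker(gpu_id: int = 0):
    global _ocr
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    # Import after setting the variable so CUDA only sees the chosen GPU
    from paddleocr import PaddleOCR
    _ocr = PaddleOCR(use_gpu=True)  # further options omitted
```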
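### 2.2 Worker Initialization Pattern
Use the `initializer` parameter to load the model once per worker process instead of reloading it for every task:
```python
import multiprocessing as mp
import os
from concurrent.futures import ProcessPoolExecutor

# Module-level global that holds the model in each worker process
_ocr = None


def init_worker(use_gpu: bool, gpu_id: int = 0):
    global _ocr
    if use_gpu:
        os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    else:
        # Hide all GPUs so this worker runs on the CPU
        os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
    from paddleocr import PaddleOCR
    _ocr = PaddleOCR(use_gpu=use_gpu)  # further options omitted


# The __main__ guard is required with spawn: children re-import this module
if __name__ == "__main__":
    # Pass the initializer when creating the pool
    pool = ProcessPoolExecutor(
        max_workers=1,
        initializer=init_worker,
        initargs=(True, 0),  # use_gpu=True, gpu_id=0
        mp_context=mp.get_context("spawn"),
    )
```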
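### 2.3 Queue Pattern vs. as_completed
| Approach | Pros | Cons | Use case |
|------|------|------|----------|
| `as_completed()` | Simple; no queue to manage; accepts futures from several executors | All futures must be submitted before iteration starts | Batch processing with one or more pools |
| `multiprocessing.Queue` | High performance, flexible | Must be managed manually; deadlock risk | Multi-pool pipelines |
| `Manager().Queue()` | Picklable proxy, can be shared across pools | Lower performance (every operation goes through the manager process) | When a queue must be passed to `Pool.map` tasks |
**Recommendation**: for the dual-pool scenario, use `as_completed()` to collect the futures of both pools in a single loop; since it accepts futures created by different executors, no extra queue is needed.
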
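
A minimal sketch of one collection loop over two executors. Two `ThreadPoolExecutor` instances stand in for the CPU and GPU process pools, and `cpu_task`, `gpu_task` and `collect` are placeholder names; the same loop works unchanged with `ProcessPoolExecutor` futures.

```python
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Dict, List, Tuple


def cpu_task(doc_id: str) -> str:
    return f"text:{doc_id}"


def gpu_task(doc_id: str) -> str:
    return f"ocr:{doc_id}"


def collect(cpu_ids: List[str], gpu_ids: List[str]) -> Dict[str, Tuple[str, str]]:
    results: Dict[str, Tuple[str, str]] = {}
    with ThreadPoolExecutor(max_workers=4) as cpu_pool, ThreadPoolExecutor(max_workers=1) as gpu_pool:
        # Map each future back to (task id, pool name) so results can be attributed
        futures: Dict[Future, Tuple[str, str]] = {}
        for doc_id in cpu_ids:
            futures[cpu_pool.submit(cpu_task, doc_id)] = (doc_id, "CPU")
        for doc_id in gpu_ids:
            futures[gpu_pool.submit(gpu_task, doc_id)] = (doc_id, "GPU")
        # A single loop over futures from both executors, in completion order
        for future in as_completed(futures):
            doc_id, pool_name = futures[future]
            results[doc_id] = (pool_name, future.result())
    return results
```
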
---
## 3. Detailed Development Plan
### Phase 1: Refactor the Base Architecture (2-3 days)
#### 1.1 Create the WorkerPool Abstract Base Class
```python
# src/processing/worker_pool.py
from __future__ import annotations

import multiprocessing as mp
from abc import ABC, abstractmethod
from concurrent.futures import Future, ProcessPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class TaskResult:
    """Container for a task result."""
    task_id: str
    success: bool
    data: Any
    error: Optional[str] = None
    processing_time: float = 0.0


class WorkerPool(ABC):
    """Abstract base class for a worker pool."""

    def __init__(self, max_workers: int, use_gpu: bool = False, gpu_id: int = 0):
        self.max_workers = max_workers
        self.use_gpu = use_gpu
        self.gpu_id = gpu_id
        self._executor: Optional[ProcessPoolExecutor] = None

    @abstractmethod
    def get_initializer(self) -> Callable:
        """Return the worker initializer (a module-level, picklable function)."""

    @abstractmethod
    def get_init_args(self) -> tuple:
        """Return the arguments passed to the initializer."""

    def start(self):
        """Start the worker pool."""
        ctx = mp.get_context("spawn")
        self._executor = ProcessPoolExecutor(
            max_workers=self.max_workers,
            mp_context=ctx,
            initializer=self.get_initializer(),
            initargs=self.get_init_args(),
        )

    def submit(self, fn: Callable, *args, **kwargs) -> Future:
        """Submit a task."""
        if self._executor is None:
            raise RuntimeError("Pool not started")
        return self._executor.submit(fn, *args, **kwargs)

    def shutdown(self, wait: bool = True):
        """Shut down the pool."""
        if self._executor is not None:
            self._executor.shutdown(wait=wait)
            self._executor = None

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args):
        self.shutdown()
```
#### 1.2 Implement the CPU and GPU Worker Pools
```python
# src/processing/cpu_pool.py
from typing import Callable

from src.processing.worker_pool import WorkerPool

# init_cpu_worker and init_gpu_worker are the initializers from section 2; they
# must be module-level functions so that spawn can pickle them by name.


class CPUWorkerPool(WorkerPool):
    """CPU-only worker pool for text PDF processing."""

    def __init__(self, max_workers: int = 4):
        super().__init__(max_workers=max_workers, use_gpu=False)

    def get_initializer(self) -> Callable:
        return init_cpu_worker

    def get_init_args(self) -> tuple:
        return ()


# src/processing/gpu_pool.py (same imports as cpu_pool.py)
class GPUWorkerPool(WorkerPool):
    """GPU worker pool for OCR processing."""

    def __init__(self, max_workers: int = 1, gpu_id: int = 0):
        super().__init__(max_workers=max_workers, use_gpu=True, gpu_id=gpu_id)

    def get_initializer(self) -> Callable:
        return init_gpu_worker

    def get_init_args(self) -> tuple:
        return (self.gpu_id,)
```
---
### Phase 2: Implement the Dual-Pool Coordinator (2-3 days)
#### 2.1 Task Dispatcher
```python
# src/processing/task_dispatcher.py
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, List, Tuple


class TaskType(Enum):
    CPU = auto()  # Text PDF
    GPU = auto()  # Scanned PDF


@dataclass
class Task:
    id: str
    task_type: TaskType
    data: Any


class TaskDispatcher:
    """Routes tasks to the matching pool based on the PDF type."""

    def classify_task(self, doc_info: dict) -> TaskType:
        """Decide whether the document needs OCR."""
        # Decide based on PDF characteristics
        if self._is_scanned_pdf(doc_info):
            return TaskType.GPU
        return TaskType.CPU

    def _is_scanned_pdf(self, doc_info: dict) -> bool:
        """Detect whether the document is a scan."""
        # 1. Check whether the PDF has extractable text
        # 2. Check the proportion of images
        # 3. Check the text density
        raise NotImplementedError("scanned-PDF detection not implemented yet")

    def partition_tasks(self, tasks: List[Task]) -> Tuple[List[Task], List[Task]]:
        """Split tasks into a CPU group and a GPU group."""
        cpu_tasks = [t for t in tasks if t.task_type == TaskType.CPU]
        gpu_tasks = [t for t in tasks if t.task_type == TaskType.GPU]
        return cpu_tasks, gpu_tasks
```
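
The three checks listed in `_is_scanned_pdf` could be combined roughly as follows. This is a sketch only: the `doc_info` keys (`page_count`, `text_chars`, `image_area_ratio`) and both thresholds are hypothetical and would need tuning on real invoices.

```python
from typing import Any, Dict


def is_scanned_pdf(
    doc_info: Dict[str, Any],
    min_chars_per_page: int = 50,   # hypothetical threshold
    min_image_ratio: float = 0.8,   # hypothetical threshold
) -> bool:
    pages = max(int(doc_info.get("page_count", 1)), 1)
    text_chars = int(doc_info.get("text_chars", 0))
    image_ratio = float(doc_info.get("image_area_ratio", 0.0))

    # 1. No extractable text at all: OCR is required
    if text_chars == 0:
        return True
    # 2 + 3. Image-dominated pages with sparse text are treated as scans
    text_density = text_chars / pages
    return image_ratio >= min_image_ratio and text_density < min_chars_per_page
```

Checking for an empty text layer first keeps the common case (image-only scans) cheap; the density and image-ratio thresholds only decide mixed documents.
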
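#### 2.2 Dual-Pool Coordinator
```python
# src/processing/dual_pool_coordinator.py
import logging
from concurrent.futures import Future, as_completed
from concurrent.futures import TimeoutError as FuturesTimeoutError
from typing import Callable, Dict, List, Optional, Tuple

from src.processing.cpu_pool import CPUWorkerPool
from src.processing.gpu_pool import GPUWorkerPool
from src.processing.task_dispatcher import Task, TaskDispatcher
from src.processing.worker_pool import TaskResult

logger = logging.getLogger(__name__)


class DualPoolCoordinator:
    """Coordinates the CPU and GPU worker pools."""

    def __init__(
        self,
        cpu_workers: int = 4,
        gpu_workers: int = 1,
        gpu_id: int = 0
    ):
        self.cpu_pool = CPUWorkerPool(max_workers=cpu_workers)
        self.gpu_pool = GPUWorkerPool(max_workers=gpu_workers, gpu_id=gpu_id)
        self.dispatcher = TaskDispatcher()

    def __enter__(self):
        self.cpu_pool.start()
        self.gpu_pool.start()
        return self

    def __exit__(self, *args):
        self.cpu_pool.shutdown()
        self.gpu_pool.shutdown()

    def process_batch(
        self,
        documents: List[dict],
        cpu_task_fn: Callable,
        gpu_task_fn: Callable,
        on_result: Optional[Callable[[TaskResult], None]] = None,
        on_error: Optional[Callable[[str, Exception], None]] = None,
        timeout: Optional[float] = None,
    ) -> List[TaskResult]:
        """
        Process a batch of documents, routing each one to the CPU or GPU pool.

        Args:
            documents: documents to process
            cpu_task_fn: CPU task function (module-level, picklable)
            gpu_task_fn: GPU task function (module-level, picklable)
            on_result: optional callback for successful results
            on_error: optional callback for failures
            timeout: optional deadline in seconds for the whole batch

        Returns:
            A list with one result per task.
        """
        # Classify the tasks
        tasks = [
            Task(id=doc["id"], task_type=self.dispatcher.classify_task(doc), data=doc)
            for doc in documents
        ]
        cpu_tasks, gpu_tasks = self.dispatcher.partition_tasks(tasks)
        logger.info(f"Task partition: {len(cpu_tasks)} CPU, {len(gpu_tasks)} GPU")

        # Submit each task to its pool and remember (task_id, pool_type) per future
        futures: Dict[Future, Tuple[str, str]] = {}
        for t in cpu_tasks:
            futures[self.cpu_pool.submit(cpu_task_fn, t.data)] = (t.id, "CPU")
        for t in gpu_tasks:
            futures[self.gpu_pool.submit(gpu_task_fn, t.data)] = (t.id, "GPU")

        # Collect results. as_completed() only yields finished futures, so
        # future.result() never blocks here; the deadline belongs on as_completed().
        results: List[TaskResult] = []
        done = set()
        try:
            for future in as_completed(futures, timeout=timeout):
                done.add(future)
                task_id, pool_type = futures[future]
                try:
                    data = future.result()
                    result = TaskResult(task_id=task_id, success=True, data=data)
                    if on_result:
                        on_result(result)
                except Exception as e:
                    logger.error(f"[{pool_type}] Task {task_id} failed: {e}")
                    result = TaskResult(task_id=task_id, success=False, data=None, error=str(e))
                    if on_error:
                        on_error(task_id, e)
                results.append(result)
        except FuturesTimeoutError as exc:
            # Deadline exceeded: report every unfinished task as failed.
            # cancel() only stops tasks that have not started; a running task
            # keeps its worker busy, and shutdown(wait=True) will wait for it.
            for future, (task_id, pool_type) in futures.items():
                if future in done:
                    continue
                future.cancel()
                logger.error(f"[{pool_type}] Task {task_id} timed out")
                results.append(TaskResult(task_id=task_id, success=False, data=None, error="timeout"))
                if on_error:
                    on_error(task_id, exc)
        return results
```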
---
### Phase 3: Integrate into autolabel (1-2 days)
#### 3.1 Modify autolabel.py
```python
# src/cli/autolabel.py
import logging

logger = logging.getLogger(__name__)


def run_autolabel_dual_pool(args):
    """Run auto-labeling in dual-pool mode."""
    from src.processing.dual_pool_coordinator import DualPoolCoordinator
    from src.processing.worker_pool import TaskResult

    # Assumed to exist elsewhere in autolabel: csv_files, load_documents_from_csv,
    # save_documents_batch, process_text_pdf, process_scanned_pdf.

    # Database batching
    db_batch = []
    db_batch_size = 100

    def on_result(result: TaskResult):
        """Handle a successful result (runs in the main process)."""
        db_batch.append(result.data)
        if len(db_batch) >= db_batch_size:
            save_documents_batch(db_batch)
            db_batch.clear()

    def on_error(task_id: str, error: Exception):
        """Handle a failure."""
        logger.error(f"Task {task_id} failed: {error}")

    # Create the dual-pool coordinator
    with DualPoolCoordinator(
        cpu_workers=args.cpu_workers or 4,
        gpu_workers=args.gpu_workers or 1,
        gpu_id=0,
    ) as coordinator:
        # Process every CSV file
        for csv_file in csv_files:
            documents = load_documents_from_csv(csv_file)
            results = coordinator.process_batch(
                documents=documents,
                cpu_task_fn=process_text_pdf,
                gpu_task_fn=process_scanned_pdf,
                on_result=on_result,
                on_error=on_error,
            )
            logger.info(f"CSV {csv_file}: {len(results)} processed")

    # Save the remaining partial batch
    if db_batch:
        save_documents_batch(db_batch)
```
---
### Phase 4: Testing and Validation (1-2 days)
#### 4.1 Unit Tests
```python
# tests/unit/test_dual_pool.py
import pytest

from src.processing.dual_pool_coordinator import DualPoolCoordinator, TaskResult


# Task functions must be module-level so that spawn workers can import them
def cpu_fn(doc: dict) -> dict:
    return {"id": doc["id"], "pool": "cpu"}


def gpu_fn(doc: dict) -> dict:
    return {"id": doc["id"], "pool": "gpu"}


class TestDualPoolCoordinator:
    def test_cpu_only_batch(self):
        """Batch containing only CPU tasks."""
        with DualPoolCoordinator(cpu_workers=2, gpu_workers=1) as coord:
            docs = [{"id": f"doc_{i}", "type": "text"} for i in range(10)]
            results = coord.process_batch(docs, cpu_fn, gpu_fn)
            assert len(results) == 10
            assert all(r.success for r in results)

    def test_mixed_batch(self):
        """Batch mixing text and scanned PDFs."""
        with DualPoolCoordinator(cpu_workers=2, gpu_workers=1) as coord:
            docs = [
                {"id": "text_1", "type": "text"},
                {"id": "scan_1", "type": "scanned"},
                {"id": "text_2", "type": "text"},
            ]
            results = coord.process_batch(docs, cpu_fn, gpu_fn)
            assert len(results) == 3

    def test_timeout_handling(self):
        """Timeout handling."""
        pytest.skip("not implemented yet")

    def test_error_recovery(self):
        """Error recovery."""
        pytest.skip("not implemented yet")
```
#### 4.2 Integration Tests
```python
# tests/integration/test_autolabel_dual_pool.py
import subprocess
import sys


def test_autolabel_with_dual_pool():
    """End-to-end test of dual-pool mode."""
    # Use a small amount of test data
    result = subprocess.run(
        [
            sys.executable, "-m", "src.cli.autolabel",
            "--cpu-workers", "2",
            "--gpu-workers", "1",
            "--limit", "50",
        ],
        capture_output=True,
        text=True,
    )
    assert result.returncode == 0, result.stderr
    # TODO: verify the database records
```
---
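## 4. Key Technical Points
### 4.1 Strategies for Avoiding Deadlocks
```python
import logging
import queue
from concurrent.futures import TimeoutError as FuturesTimeoutError

logger = logging.getLogger(__name__)
# future, result_queue (a multiprocessing.Queue), worker (its Process) and results come from the collector code

# 1. Use timeouts. Before Python 3.11, concurrent.futures.TimeoutError is not
#    the built-in TimeoutError, so catch the futures version explicitly.
try:
    result = future.result(timeout=300)
except FuturesTimeoutError:
    logger.warning("Task timed out")

# 2. Use a sentinel value. Queue items are pickled, so the sentinel must
#    compare equal after a round trip; a bare object() would not.
SENTINEL = "__SENTINEL__"
result_queue.put(SENTINEL)  # producer: signal the end of its output

# 3 + 4. Poll with a timeout, check that the worker is alive, and drain the
#        queue before join() (Queue.empty() is unreliable across processes).
while True:
    try:
        item = result_queue.get(timeout=1.0)
    except queue.Empty:
        if not worker.is_alive():
            logger.error("Worker died unexpectedly")
            break
        continue
    if item == SENTINEL:
        break
    results.append(item)
worker.join(timeout=5.0)
```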
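
A sentinel sent through a `multiprocessing.Queue` must still compare equal after pickling, because every item is pickled on `put()` and unpickled on `get()`. A bare `object()` fails this, since the receiver gets a new instance, while a string constant passes. The round trip can be reproduced in a single process with `pickle` (the names below are illustrative):

```python
import pickle

OBJECT_SENTINEL = object()        # identity-based: breaks across processes
STRING_SENTINEL = "__SENTINEL__"  # value-based: survives pickling


def survives_queue(value: object) -> bool:
    """Simulate the pickle round trip performed by multiprocessing.Queue put/get."""
    return pickle.loads(pickle.dumps(value)) == value
```
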
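### 4.2 PaddleOCR-Specific Handling
```python
# PaddleOCR must be initialized inside the worker process
_ocr = None


def init_paddle_worker(gpu_id: int):
    global _ocr
    import os
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    # Import lazily so the CUDA environment variable takes effect first
    from paddleocr import PaddleOCR
    # These are PaddleOCR 2.x arguments; 3.x replaced use_gpu with a device
    # argument, so check them against the installed version.
    _ocr = PaddleOCR(
        use_angle_cls=True,
        lang="en",
        use_gpu=True,
        show_log=False,
        # Important: GPU memory setting in MB. In 2.x this is the initial GPU
        # memory pool size handed to Paddle Inference rather than a hard cap.
        gpu_mem=2000,
    )
```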
### 4.3 Resource Monitoring
```python
import GPUtil
import psutil


def get_resource_usage():
    """Return the current system resource usage."""
    cpu_percent = psutil.cpu_percent(interval=1)  # blocks for 1 second while sampling
    memory = psutil.virtual_memory()
    gpu_info = []
    for gpu in GPUtil.getGPUs():
        gpu_info.append({
            "id": gpu.id,
            "memory_used": gpu.memoryUsed,    # MB
            "memory_total": gpu.memoryTotal,  # MB
            "utilization": gpu.load * 100,    # load is a 0-1 fraction
        })
    return {
        "cpu_percent": cpu_percent,
        "memory_percent": memory.percent,
        "gpu": gpu_info,
    }
```
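
The dictionary returned by `get_resource_usage()` can drive a simple early warning for GPU memory pressure. A sketch, assuming a 90% threshold chosen only for illustration and a hypothetical function name:

```python
from typing import Any, Dict, List


def gpus_near_memory_limit(usage: Dict[str, Any], threshold: float = 0.9) -> List[int]:
    """Return the ids of GPUs whose used/total memory ratio is at or above threshold."""
    alerts: List[int] = []
    for gpu in usage.get("gpu", []):
        total = gpu["memory_total"]
        if total and gpu["memory_used"] / total >= threshold:
            alerts.append(gpu["id"])
    return alerts
```

The function only reads the dictionary, so it can be unit-tested without GPUtil or a GPU.
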
---
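## 5. Risk Assessment and Mitigation
| Risk | Likelihood | Impact | Mitigation |
|------|--------|------|----------|
| GPU out of memory | Medium | High | Limit GPU workers to 1; set the `gpu_mem` parameter |
| Hung processes | Low | High | Add heartbeat checks; restart automatically on timeout |
| Task misclassification | Medium | Medium | Add a fallback: retry on the GPU when CPU processing fails |
| Database write bottleneck | Low | Medium | Increase the batch size; write asynchronously |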
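
The fallback for misclassified tasks can be sketched as a wrapper around the two task functions. This is a minimal synchronous sketch; in the coordinator the retry would instead be a resubmission of the failed task to the GPU pool, and `run_with_fallback`, `cpu_fn` and `gpu_fn` are placeholder names.

```python
from typing import Any, Callable, Tuple


def run_with_fallback(
    doc: Any,
    cpu_fn: Callable[[Any], Any],
    gpu_fn: Callable[[Any], Any],
) -> Tuple[str, Any]:
    """Try the cheap text path first; fall back to OCR if it raises."""
    try:
        return "CPU", cpu_fn(doc)
    except Exception:
        # Likely a scanned PDF misclassified as text: let OCR handle it
        return "GPU", gpu_fn(doc)
```

Returning the pool name alongside the result makes it easy to count how often the fallback fires, which is a direct measure of classifier quality.
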
---
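## 6. Alternatives
If the approach above still has problems, consider the following options:
### 6.1 Using Ray
```python
import ray

ray.init()

# process_text_pdf / process_scanned_pdf and cpu_docs / gpu_docs as in the dual-pool design


@ray.remote(num_cpus=1)
def cpu_task(data):
    return process_text_pdf(data)


@ray.remote(num_gpus=1)
def gpu_task(data):
    return process_scanned_pdf(data)


# Ray schedules each task according to its declared CPU/GPU resources
futures = [cpu_task.remote(d) for d in cpu_docs]
futures += [gpu_task.remote(d) for d in gpu_docs]
results = ray.get(futures)
```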
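### 6.2 Single Pool + Dynamic GPU Scheduling
Keep the single-pool mode, but decide inside each task whether to use the GPU:
```python
def process_document(doc_data):
    if is_scanned_pdf(doc_data):
        # Use the GPU; concurrency must be limited by a lock or semaphore
        # shared by all worker processes
        with gpu_semaphore:
            return process_with_ocr(doc_data)
    else:
        return process_text_only(doc_data)
```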
---
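## 7. Timeline Summary
| Phase | Task | Estimated effort |
|------|------|------------|
| Phase 1 | Base architecture refactor | 2-3 days |
| Phase 2 | Dual-pool coordinator implementation | 2-3 days |
| Phase 3 | Integration into autolabel | 1-2 days |
| Phase 4 | Testing and validation | 1-2 days |
| **Total** | | **6-10 days** |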
---
## 8. References
1. [Python concurrent.futures documentation](https://docs.python.org/3/library/concurrent.futures.html)
2. [PyTorch Multiprocessing Best Practices](https://docs.pytorch.org/docs/stable/notes/multiprocessing.html)
3. [Super Fast Python - The complete guide to ProcessPoolExecutor](https://superfastpython.com/processpoolexecutor-in-python/)
4. [PaddleOCR parallel inference documentation](http://www.paddleocr.ai/main/en/version3.x/pipeline_usage/instructions/parallel_inference.html)
5. [AWS - Parallelizing ML inference across multiple CPUs/GPUs](https://aws.amazon.com/blogs/machine-learning/parallelizing-across-multiple-cpu-gpus-to-speed-up-deep-learning-inference-at-the-edge/)
6. [Ray distributed multiprocessing](https://docs.ray.io/en/latest/ray-more-libs/multiprocessing.html)