# Multi-Pool Processing Architecture Design Document

## 1. Research Summary

### 1.1 Analysis of Current Problems

Our previous dual-pool implementation suffered from stability problems, mainly for the following reasons:

| Problem | Cause | Solution |
|------|------|----------|
| Processing hangs | Mixing threads with ProcessPoolExecutor causes deadlocks | Use asyncio or a pure queue-based design |
| Queue.get() blocks forever | No timeout mechanism | Add timeouts and sentinel values |
| GPU memory contention | Multiple processes accessing the GPU simultaneously | Limit GPU workers to 1 |
| CUDA fork problem | Linux's default fork start method is incompatible with CUDA | Use the spawn start method |

### 1.2 Recommended Architecture

Based on this research, the approach best suited to our scenario is the **producer-consumer queue pattern**:

```
┌─────────────────┐      ┌─────────────────┐      ┌─────────────────┐
│  Main Process   │      │  CPU Workers    │      │  GPU Worker     │
│                 │      │  (4 processes)  │      │  (1 process)    │
│ ┌───────────┐   │      │                 │      │                 │
│ │ Task      │───┼─────▶│ Text PDF        │      │ Scanned PDF     │
│ │ Dispatcher│   │      │ (no OCR)        │      │ (PaddleOCR)     │
│ └───────────┘   │      │                 │      │                 │
│      ▲          │      │       │         │      │       │         │
│      │          │      │       ▼         │      │       ▼         │
│ ┌───────────┐   │      │  Result Queue   │      │  Result Queue   │
│ │ Result    │◀──┼──────│◀────────────────│──────│◀────────────────│
│ │ Collector │   │      └─────────────────┘      └─────────────────┘
│ └───────────┘   │
│      │          │
│      ▼          │
│ ┌───────────┐   │
│ │ Database  │   │
│ │ Batch     │   │
│ │ Writer    │   │
│ └───────────┘   │
└─────────────────┘
```

---

## 2. Core Design Principles

### 2.1 CUDA Compatibility

```python
# Key point: use the spawn start method
import os
import multiprocessing as mp

ctx = mp.get_context("spawn")

# Set the device when the GPU worker is initialized
def init_gpu_worker(gpu_id: int = 0):
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    global _ocr
    from paddleocr import PaddleOCR
    _ocr = PaddleOCR(use_gpu=True, ...)
```

### 2.2 Worker Initialization Pattern

Use the `initializer` parameter to load the model once per worker instead of reloading it for every task:

```python
import os
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

# Module-level global that holds the model inside each worker process
_ocr = None

def init_worker(use_gpu: bool, gpu_id: int = 0):
    global _ocr
    if use_gpu:
        os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    else:
        os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

    from paddleocr import PaddleOCR
    _ocr = PaddleOCR(use_gpu=use_gpu, ...)

# Pass the initializer when creating the pool
pool = ProcessPoolExecutor(
    max_workers=1,
    initializer=init_worker,
    initargs=(True, 0),  # use_gpu=True, gpu_id=0
    mp_context=mp.get_context("spawn"),
)
```

### 2.3 Queue Pattern vs. as_completed

| Approach | Pros | Cons | Suitable For |
|------|------|------|----------|
| `as_completed()` | Simple; no queue management | No shared queue between pools; futures must be merged by the caller | Single-pool scenarios |
| `multiprocessing.Queue` | High performance, flexible | Manual management; deadlock risk | Multi-pool pipelines |
| `Manager().Queue()` | Picklable; works across pools | Lower performance | Scenarios that require Pool.map |

**Recommendation**: for the dual-pool scenario, use `as_completed()` over each pool's futures, then merge the results.
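
The merging step can be sketched as follows. `ThreadPoolExecutor` is used here only so the snippet runs standalone; `as_completed()` accepts `Future` objects from any mix of executors, so the same loop works unchanged with the two `ProcessPoolExecutor` instances described above (task functions and ids below are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def cpu_work(x):
    return ("cpu", x * 2)

def gpu_work(x):
    return ("gpu", x * 10)

# Two independent pools standing in for the CPU pool and the GPU pool
cpu_pool = ThreadPoolExecutor(max_workers=2)
gpu_pool = ThreadPoolExecutor(max_workers=1)

# Map each future back to its task id, just as a dispatcher would
cpu_futures = {cpu_pool.submit(cpu_work, i): f"cpu_{i}" for i in range(3)}
gpu_futures = {gpu_pool.submit(gpu_work, i): f"gpu_{i}" for i in range(2)}

# A single as_completed() loop drains futures from both pools
results = {}
for future in as_completed(list(cpu_futures) + list(gpu_futures)):
    task_id = cpu_futures.get(future) or gpu_futures.get(future)
    results[task_id] = future.result()

cpu_pool.shutdown()
gpu_pool.shutdown()
print(len(results))  # 5
```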

---

## 3. Detailed Development Plan

### Phase 1: Refactor the Base Architecture (2-3 days)

#### 1.1 Create a WorkerPool Abstract Base Class

```python
# src/processing/worker_pool.py

from __future__ import annotations

import multiprocessing as mp
from abc import ABC, abstractmethod
from concurrent.futures import Future, ProcessPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class TaskResult:
    """Container for a single task's result."""
    task_id: str
    success: bool
    data: Any
    error: Optional[str] = None
    processing_time: float = 0.0


class WorkerPool(ABC):
    """Abstract base class for worker pools."""

    def __init__(self, max_workers: int, use_gpu: bool = False, gpu_id: int = 0):
        self.max_workers = max_workers
        self.use_gpu = use_gpu
        self.gpu_id = gpu_id
        self._executor: Optional[ProcessPoolExecutor] = None

    @abstractmethod
    def get_initializer(self) -> Callable:
        """Return the worker initializer function."""

    @abstractmethod
    def get_init_args(self) -> tuple:
        """Return the initializer arguments."""

    def start(self):
        """Start the worker pool."""
        ctx = mp.get_context("spawn")
        self._executor = ProcessPoolExecutor(
            max_workers=self.max_workers,
            mp_context=ctx,
            initializer=self.get_initializer(),
            initargs=self.get_init_args(),
        )

    def submit(self, fn: Callable, *args, **kwargs) -> Future:
        """Submit a task."""
        if not self._executor:
            raise RuntimeError("Pool not started")
        return self._executor.submit(fn, *args, **kwargs)

    def shutdown(self, wait: bool = True):
        """Shut down the pool."""
        if self._executor:
            self._executor.shutdown(wait=wait)
            self._executor = None

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args):
        self.shutdown()
```

#### 1.2 Implement the CPU and GPU Worker Pools

```python
# src/processing/cpu_pool.py

class CPUWorkerPool(WorkerPool):
    """CPU-only worker pool for text PDF processing."""

    def __init__(self, max_workers: int = 4):
        super().__init__(max_workers=max_workers, use_gpu=False)

    def get_initializer(self) -> Callable:
        return init_cpu_worker  # defined alongside init_gpu_worker (see section 2.2)

    def get_init_args(self) -> tuple:
        return ()


# src/processing/gpu_pool.py

class GPUWorkerPool(WorkerPool):
    """GPU worker pool for OCR processing."""

    def __init__(self, max_workers: int = 1, gpu_id: int = 0):
        super().__init__(max_workers=max_workers, use_gpu=True, gpu_id=gpu_id)

    def get_initializer(self) -> Callable:
        return init_gpu_worker

    def get_init_args(self) -> tuple:
        return (self.gpu_id,)
```

---

### Phase 2: Implement the Dual-Pool Coordinator (2-3 days)

#### 2.1 Task Dispatcher

```python
# src/processing/task_dispatcher.py

from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, List, Tuple


class TaskType(Enum):
    CPU = auto()  # Text PDF
    GPU = auto()  # Scanned PDF


@dataclass
class Task:
    id: str
    task_type: TaskType
    data: Any


class TaskDispatcher:
    """Dispatch tasks to the appropriate pool based on PDF type."""

    def classify_task(self, doc_info: dict) -> TaskType:
        """Decide whether a document needs OCR."""
        # Based on PDF characteristics
        if self._is_scanned_pdf(doc_info):
            return TaskType.GPU
        return TaskType.CPU

    def _is_scanned_pdf(self, doc_info: dict) -> bool:
        """Detect whether the document is a scanned PDF.

        Heuristics (field names below are illustrative placeholders):
        1. Check whether extractable text exists
        2. Check the image-to-page ratio
        3. Check the text density
        """
        if not doc_info.get("has_text_layer", True):
            return True
        return doc_info.get("image_ratio", 0.0) > 0.8

    def partition_tasks(self, tasks: List[Task]) -> Tuple[List[Task], List[Task]]:
        """Split tasks into CPU and GPU groups."""
        cpu_tasks = [t for t in tasks if t.task_type == TaskType.CPU]
        gpu_tasks = [t for t in tasks if t.task_type == TaskType.GPU]
        return cpu_tasks, gpu_tasks
```

#### 2.2 Dual-Pool Coordinator

```python
# src/processing/dual_pool_coordinator.py

import logging
from concurrent.futures import as_completed
from typing import Callable, List, Optional

from src.processing.cpu_pool import CPUWorkerPool
from src.processing.gpu_pool import GPUWorkerPool
from src.processing.task_dispatcher import Task, TaskDispatcher
from src.processing.worker_pool import TaskResult

logger = logging.getLogger(__name__)


class DualPoolCoordinator:
    """Coordinate the CPU and GPU worker pools."""

    def __init__(
        self,
        cpu_workers: int = 4,
        gpu_workers: int = 1,
        gpu_id: int = 0
    ):
        self.cpu_pool = CPUWorkerPool(max_workers=cpu_workers)
        self.gpu_pool = GPUWorkerPool(max_workers=gpu_workers, gpu_id=gpu_id)
        self.dispatcher = TaskDispatcher()

    def __enter__(self):
        self.cpu_pool.start()
        self.gpu_pool.start()
        return self

    def __exit__(self, *args):
        self.cpu_pool.shutdown()
        self.gpu_pool.shutdown()

    def process_batch(
        self,
        documents: List[dict],
        cpu_task_fn: Callable,
        gpu_task_fn: Callable,
        on_result: Optional[Callable[[TaskResult], None]] = None,
        on_error: Optional[Callable[[str, Exception], None]] = None
    ) -> List[TaskResult]:
        """
        Process a batch of documents, dispatching each to the CPU or GPU pool.

        Args:
            documents: Documents to process
            cpu_task_fn: Task function for CPU tasks
            gpu_task_fn: Task function for GPU tasks
            on_result: Callback on success (optional)
            on_error: Callback on error (optional)

        Returns:
            A list of results for all tasks
        """
        # Classify the tasks
        tasks = [
            Task(id=doc['id'], task_type=self.dispatcher.classify_task(doc), data=doc)
            for doc in documents
        ]
        cpu_tasks, gpu_tasks = self.dispatcher.partition_tasks(tasks)

        logger.info(f"Task partition: {len(cpu_tasks)} CPU, {len(gpu_tasks)} GPU")

        # Submit tasks to their respective pools
        cpu_futures = {
            self.cpu_pool.submit(cpu_task_fn, t.data): t.id
            for t in cpu_tasks
        }
        gpu_futures = {
            self.gpu_pool.submit(gpu_task_fn, t.data): t.id
            for t in gpu_tasks
        }

        # Collect results
        results = []
        all_futures = list(cpu_futures.keys()) + list(gpu_futures.keys())

        for future in as_completed(all_futures):
            task_id = cpu_futures.get(future) or gpu_futures.get(future)
            pool_type = "CPU" if future in cpu_futures else "GPU"

            try:
                data = future.result(timeout=300)  # 5-minute timeout
                result = TaskResult(task_id=task_id, success=True, data=data)
                if on_result:
                    on_result(result)
            except Exception as e:
                logger.error(f"[{pool_type}] Task {task_id} failed: {e}")
                result = TaskResult(task_id=task_id, success=False, data=None, error=str(e))
                if on_error:
                    on_error(task_id, e)

            results.append(result)

        return results
```

---

### Phase 3: Integrate into autolabel (1-2 days)

#### 3.1 Modify autolabel.py

```python
# src/cli/autolabel.py

def run_autolabel_dual_pool(args):
    """Run auto-labeling in dual-pool mode."""

    from src.processing.dual_pool_coordinator import DualPoolCoordinator

    # Buffer for batched database writes
    db_batch = []
    db_batch_size = 100

    def on_result(result: TaskResult):
        """Handle a successful result."""
        db_batch.append(result.data)

        if len(db_batch) >= db_batch_size:
            save_documents_batch(db_batch)
            db_batch.clear()

    def on_error(task_id: str, error: Exception):
        """Handle an error."""
        logger.error(f"Task {task_id} failed: {error}")

    # Create the dual-pool coordinator
    with DualPoolCoordinator(
        cpu_workers=args.cpu_workers or 4,
        gpu_workers=args.gpu_workers or 1,
        gpu_id=0
    ) as coordinator:

        # Process every CSV
        for csv_file in csv_files:
            documents = load_documents_from_csv(csv_file)

            results = coordinator.process_batch(
                documents=documents,
                cpu_task_fn=process_text_pdf,
                gpu_task_fn=process_scanned_pdf,
                on_result=on_result,
                on_error=on_error
            )

            logger.info(f"CSV {csv_file}: {len(results)} processed")

    # Flush the remaining batch
    if db_batch:
        save_documents_batch(db_batch)
```

---

### Phase 4: Testing and Validation (1-2 days)

#### 4.1 Unit Tests

```python
# tests/unit/test_dual_pool.py

import pytest
from src.processing.dual_pool_coordinator import DualPoolCoordinator, TaskResult

# cpu_fn / gpu_fn are module-level (hence picklable) test task functions


class TestDualPoolCoordinator:

    def test_cpu_only_batch(self):
        """Batch containing only CPU tasks."""
        with DualPoolCoordinator(cpu_workers=2, gpu_workers=1) as coord:
            docs = [{"id": f"doc_{i}", "type": "text"} for i in range(10)]
            results = coord.process_batch(docs, cpu_fn, gpu_fn)
            assert len(results) == 10
            assert all(r.success for r in results)

    def test_mixed_batch(self):
        """Batch mixing CPU and GPU tasks."""
        with DualPoolCoordinator(cpu_workers=2, gpu_workers=1) as coord:
            docs = [
                {"id": "text_1", "type": "text"},
                {"id": "scan_1", "type": "scanned"},
                {"id": "text_2", "type": "text"},
            ]
            results = coord.process_batch(docs, cpu_fn, gpu_fn)
            assert len(results) == 3

    def test_timeout_handling(self):
        """Timeout handling."""
        pass

    def test_error_recovery(self):
        """Error recovery."""
        pass
```

#### 4.2 Integration Tests

```python
# tests/integration/test_autolabel_dual_pool.py

import subprocess


def test_autolabel_with_dual_pool():
    """End-to-end test of dual-pool mode."""
    # Use a small amount of test data
    result = subprocess.run([
        "python", "-m", "src.cli.autolabel",
        "--cpu-workers", "2",
        "--gpu-workers", "1",
        "--limit", "50"
    ], capture_output=True)

    assert result.returncode == 0
    # Verify the database records
```

---

## 4. Key Technical Points

### 4.1 Deadlock-Avoidance Strategies

```python
# 1. Use timeouts
from concurrent.futures import TimeoutError as FuturesTimeout  # not the builtin before Python 3.11

try:
    result = future.result(timeout=300)
except FuturesTimeout:
    logger.warning("Task timed out")

# 2. Use sentinel values
# Note: an `object()` sentinel does not survive pickling, so identity
# checks fail across processes; use a picklable sentinel such as None.
SENTINEL = None
queue.put(SENTINEL)  # signal the end of the stream

# 3. Check worker liveness
if not worker.is_alive():
    logger.error("Worker died unexpectedly")
    break

# 4. Drain the queue before joining
while not queue.empty():
    results.append(queue.get_nowait())
worker.join(timeout=5.0)
```

### 4.2 PaddleOCR-Specific Handling

```python
# PaddleOCR must be initialized inside the worker process
def init_paddle_worker(gpu_id: int):
    global _ocr
    import os
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)

    # Import lazily so the CUDA environment variable takes effect first
    from paddleocr import PaddleOCR
    _ocr = PaddleOCR(
        use_angle_cls=True,
        lang='en',
        use_gpu=True,
        show_log=False,
        # Important: cap GPU memory usage
        gpu_mem=2000  # GPU memory limit in MB
    )
```

### 4.3 Resource Monitoring

```python
import psutil
import GPUtil


def get_resource_usage():
    """Return current system resource usage."""
    cpu_percent = psutil.cpu_percent(interval=1)
    memory = psutil.virtual_memory()

    gpu_info = []
    for gpu in GPUtil.getGPUs():
        gpu_info.append({
            "id": gpu.id,
            "memory_used": gpu.memoryUsed,
            "memory_total": gpu.memoryTotal,
            "utilization": gpu.load * 100
        })

    return {
        "cpu_percent": cpu_percent,
        "memory_percent": memory.percent,
        "gpu": gpu_info
    }
```

---

## 5. Risk Assessment and Mitigation

| Risk | Likelihood | Impact | Mitigation |
|------|--------|------|----------|
| GPU out of memory | Medium | High | Limit GPU workers to 1; set the gpu_mem parameter |
| Hung/zombie processes | Low | High | Add heartbeat checks; restart automatically on timeout |
| Task misclassification | Medium | Medium | Add a fallback: retry on the GPU pool if CPU processing fails |
| Database write bottleneck | Low | Medium | Increase batch size; write asynchronously |
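
The "restart automatically on timeout" mitigation can be sketched as a resubmit wrapper. This is a minimal illustration with hypothetical names (`run_with_retry`, `flaky`), demonstrated on a thread pool so it runs standalone; a production version against a process pool would also need to kill the hung worker process:

```python
import itertools
import logging
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

def run_with_retry(executor, fn, arg, timeout, retries=2):
    """Resubmit a task whose result does not arrive within `timeout` seconds."""
    for attempt in range(retries + 1):
        future = executor.submit(fn, arg)
        try:
            return future.result(timeout=timeout)
        except FuturesTimeout:
            logger.warning("attempt %d timed out, resubmitting", attempt)
            future.cancel()  # releases the future if it is still pending
    raise RuntimeError(f"task failed after {retries + 1} attempts")

# Demo: a task that hangs on its first call, then succeeds
calls = itertools.count()

def flaky(x):
    if next(calls) == 0:
        time.sleep(1.0)  # simulate a hang
    return x * 2

with ThreadPoolExecutor(max_workers=2) as pool:
    print(run_with_retry(pool, flaky, 21, timeout=0.2))  # 42
```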

---

## 6. Alternative Approaches

If the approach above still runs into problems, consider the following.

### 6.1 Use Ray

```python
import ray

ray.init()

@ray.remote(num_cpus=1)
def cpu_task(data):
    return process_text_pdf(data)

@ray.remote(num_gpus=1)
def gpu_task(data):
    return process_scanned_pdf(data)

# Ray schedules resources automatically
futures = [cpu_task.remote(d) for d in cpu_docs]
futures += [gpu_task.remote(d) for d in gpu_docs]
results = ray.get(futures)
```

### 6.2 Single Pool + Dynamic GPU Scheduling

Keep the single-pool model, but decide inside each task whether to use the GPU:

```python
def process_document(doc_data):
    if is_scanned_pdf(doc_data):
        # Use the GPU; concurrency must be limited with a global
        # lock or semaphore shared across the worker processes
        with gpu_semaphore:
            return process_with_ocr(doc_data)
    else:
        return process_text_only(doc_data)
```

---

## 7. Timeline Summary

| Phase | Task | Estimated Effort |
|------|------|------------|
| Phase 1 | Refactor the base architecture | 2-3 days |
| Phase 2 | Implement the dual-pool coordinator | 2-3 days |
| Phase 3 | Integrate into autolabel | 1-2 days |
| Phase 4 | Testing and validation | 1-2 days |
| **Total** | | **6-10 days** |

---

## 8. References

1. [Python concurrent.futures official documentation](https://docs.python.org/3/library/concurrent.futures.html)
2. [PyTorch Multiprocessing Best Practices](https://docs.pytorch.org/docs/stable/notes/multiprocessing.html)
3. [Super Fast Python - Complete ProcessPoolExecutor Guide](https://superfastpython.com/processpoolexecutor-in-python/)
4. [PaddleOCR parallel inference documentation](http://www.paddleocr.ai/main/en/version3.x/pipeline_usage/instructions/parallel_inference.html)
5. [AWS - Parallelizing ML inference across CPUs/GPUs at the edge](https://aws.amazon.com/blogs/machine-learning/parallelizing-across-multiple-cpu-gpus-to-speed-up-deep-learning-inference-at-the-edge/)
6. [Ray distributed multiprocessing](https://docs.ray.io/en/latest/ray-more-libs/multiprocessing.html)