# Backend Development Patterns

Backend architecture patterns for Python/FastAPI/PostgreSQL applications.

## API Design

### RESTful Structure

```
GET    /api/v1/documents          # List
GET    /api/v1/documents/{id}     # Get
POST   /api/v1/documents          # Create
PUT    /api/v1/documents/{id}     # Replace
PATCH  /api/v1/documents/{id}     # Update
DELETE /api/v1/documents/{id}     # Delete

GET /api/v1/documents?status=processed&sort=created_at&limit=20&offset=0
```

### FastAPI Route Pattern

```python
from fastapi import APIRouter, HTTPException, Depends, Query, File, UploadFile
from pydantic import BaseModel

router = APIRouter(prefix="/api/v1", tags=["inference"])

@router.post("/infer", response_model=ApiResponse[InferenceResult])
async def infer_document(
    file: UploadFile = File(...),
    confidence_threshold: float = Query(0.5, ge=0, le=1),
    service: InferenceService = Depends(get_inference_service)
) -> ApiResponse[InferenceResult]:
    result = await service.process(file, confidence_threshold)
    return ApiResponse(success=True, data=result)
```

### Consistent Response Schema

```python
from typing import Generic, TypeVar

T = TypeVar('T')

class ApiResponse(BaseModel, Generic[T]):
    success: bool
    data: T | None = None
    error: str | None = None
    meta: dict | None = None
```

## Core Patterns

### Repository Pattern

```python
from typing import Protocol

class DocumentRepository(Protocol):
    def find_all(self, filters: dict | None = None) -> list[Document]: ...
    def find_by_id(self, id: str) -> Document | None: ...
    def create(self, data: dict) -> Document: ...
    def update(self, id: str, data: dict) -> Document: ...
    def delete(self, id: str) -> None: ...
```

### Service Layer

```python
class InferenceService:
    def __init__(self, model_path: str, use_gpu: bool = True):
        self.pipeline = InferencePipeline(model_path=model_path, use_gpu=use_gpu)

    async def process(self, file: UploadFile, confidence_threshold: float) -> InferenceResult:
        temp_path = self._save_temp_file(file)
        try:
            return self.pipeline.process_pdf(temp_path)
        finally:
            temp_path.unlink(missing_ok=True)
```

### Dependency Injection

```python
from functools import lru_cache
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    db_host: str = "localhost"
    db_password: str
    model_path: str = "runs/train/invoice_fields/weights/best.pt"

    class Config:
        env_file = ".env"

@lru_cache()
def get_settings() -> Settings:
    return Settings()

def get_inference_service(settings: Settings = Depends(get_settings)) -> InferenceService:
    return InferenceService(model_path=settings.model_path)
```

## Database Patterns

### Connection Pooling

```python
from psycopg2 import pool
from contextlib import contextmanager

db_pool = pool.ThreadedConnectionPool(minconn=2, maxconn=10, **db_config)

@contextmanager
def get_db_connection():
    conn = db_pool.getconn()
    try:
        yield conn
    finally:
        db_pool.putconn(conn)
```

### Query Optimization

```python
# GOOD: Select only needed columns
cur.execute("""
    SELECT id, status, fields->>'InvoiceNumber' as invoice_number
    FROM documents WHERE status = %s
    ORDER BY created_at DESC LIMIT %s
""", ('processed', 10))

# BAD: SELECT * FROM documents
```

### N+1 Prevention

```python
# BAD: N+1 queries
for doc in documents:
    doc.labels = get_labels(doc.id)  # N queries

# GOOD: Batch fetch with JOIN
cur.execute("""
    SELECT d.id, d.status, array_agg(l.label) as labels
    FROM documents d
    LEFT JOIN document_labels l ON d.id = l.document_id
    GROUP BY d.id, d.status
""")
```

### Transaction Pattern

```python
def create_document_with_labels(doc_data: dict, labels: list[dict]) -> str:
    with get_db_connection() as conn:
        try:
            with conn.cursor() as cur:
                cur.execute("INSERT INTO documents ... RETURNING id", ...)
                doc_id = cur.fetchone()[0]
                for label in labels:
                    cur.execute("INSERT INTO document_labels ...", ...)
            conn.commit()
            return doc_id
        except Exception:
            conn.rollback()
            raise
```

## Caching

```python
from cachetools import TTLCache

_cache = TTLCache(maxsize=1000, ttl=300)

def get_document_cached(doc_id: str) -> Document | None:
    if doc_id in _cache:
        return _cache[doc_id]
    doc = repo.find_by_id(doc_id)
    if doc:
        _cache[doc_id] = doc
    return doc
```

## Error Handling

### Exception Hierarchy

```python
class AppError(Exception):
    def __init__(self, message: str, status_code: int = 500):
        self.message = message
        self.status_code = status_code

class NotFoundError(AppError):
    def __init__(self, resource: str, id: str):
        super().__init__(f"{resource} not found: {id}", 404)

class ValidationError(AppError):
    def __init__(self, message: str):
        super().__init__(message, 400)
```

### FastAPI Exception Handler

```python
@app.exception_handler(AppError)
async def app_error_handler(request: Request, exc: AppError):
    return JSONResponse(status_code=exc.status_code, content={"success": False, "error": exc.message})

@app.exception_handler(Exception)
async def generic_error_handler(request: Request, exc: Exception):
    logger.error(f"Unexpected error: {exc}", exc_info=True)
    return JSONResponse(status_code=500, content={"success": False, "error": "Internal server error"})
```

### Retry with Backoff

```python
async def retry_with_backoff(fn, max_retries: int = 3, base_delay: float = 1.0):
    last_error = None
    for attempt in range(max_retries):
        try:
            return await fn() if asyncio.iscoroutinefunction(fn) else fn()
        except Exception as e:
            last_error = e
            if attempt < max_retries - 1:
                await asyncio.sleep(base_delay * (2 ** attempt))
    raise last_error
```

## Rate Limiting

```python
from time import time
from collections import defaultdict

class RateLimiter:
    def __init__(self):
        self.requests: dict[str, list[float]] = defaultdict(list)

    def check_limit(self, identifier: str, max_requests: int, window_sec: int) -> bool:
        now = time()
        self.requests[identifier] = [t for t in self.requests[identifier] if now - t < window_sec]
        if len(self.requests[identifier]) >= max_requests:
            return False
        self.requests[identifier].append(now)
        return True

limiter = RateLimiter()

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    ip = request.client.host
    if not limiter.check_limit(ip, max_requests=100, window_sec=60):
        return JSONResponse(status_code=429, content={"error": "Rate limit exceeded"})
    return await call_next(request)
```

## Logging & Middleware

### Request Logging

```python
@app.middleware("http")
async def log_requests(request: Request, call_next):
    request_id = str(uuid.uuid4())[:8]
    start_time = time.time()
    logger.info(f"[{request_id}] {request.method} {request.url.path}")
    response = await call_next(request)
    duration_ms = (time.time() - start_time) * 1000
    logger.info(f"[{request_id}] Completed {response.status_code} in {duration_ms:.2f}ms")
    return response
```

### Structured Logging

```python
class JSONFormatter(logging.Formatter):
    def format(self, record):
        return json.dumps({
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
        })
```

## Background Tasks

```python
from fastapi import BackgroundTasks

def send_notification(document_id: str, status: str):
    logger.info(f"Notification: {document_id} -> {status}")

@router.post("/infer")
async def infer(file: UploadFile, background_tasks: BackgroundTasks):
    result = await process_document(file)
    background_tasks.add_task(send_notification, result.document_id, "completed")
    return result
```

## Key Principles

- Repository pattern: Abstract data access
- Service layer: Business logic separated from routes
- Dependency injection via `Depends()`
- Connection pooling for database
- Parameterized queries only (no f-strings in SQL)
- Batch fetch to prevent N+1
- Consistent `ApiResponse[T]` format
- Exception hierarchy with proper status codes
- Rate limit by IP
- Structured logging with request ID