Files
2026-01-25 16:17:23 +01:00

8.4 KiB

Backend Development Patterns

Backend architecture patterns for Python/FastAPI/PostgreSQL applications.

API Design

RESTful Structure

GET    /api/v1/documents              # List
GET    /api/v1/documents/{id}         # Get
POST   /api/v1/documents              # Create
PUT    /api/v1/documents/{id}         # Replace
PATCH  /api/v1/documents/{id}         # Update
DELETE /api/v1/documents/{id}         # Delete

GET /api/v1/documents?status=processed&sort=created_at&limit=20&offset=0

FastAPI Route Pattern

from fastapi import APIRouter, HTTPException, Depends, Query, File, UploadFile
from pydantic import BaseModel

router = APIRouter(prefix="/api/v1", tags=["inference"])

@router.post("/infer", response_model=ApiResponse[InferenceResult])
async def infer_document(
    file: UploadFile = File(...),  # File(...) = required multipart upload
    confidence_threshold: float = Query(0.5, ge=0, le=1),  # validated into [0, 1] by FastAPI
    service: InferenceService = Depends(get_inference_service)
) -> ApiResponse[InferenceResult]:
    """Run inference on an uploaded document and wrap the result in the standard envelope."""
    result = await service.process(file, confidence_threshold)
    return ApiResponse(success=True, data=result)

Consistent Response Schema

from typing import Generic, TypeVar
T = TypeVar('T')

class ApiResponse(BaseModel, Generic[T]):
    """Uniform envelope for every API response; T is the payload type."""
    success: bool
    # By convention, `data` is set on success and `error` on failure.
    data: T | None = None
    error: str | None = None
    meta: dict | None = None  # optional extras, e.g. pagination info

Core Patterns

Repository Pattern

from typing import Protocol

class DocumentRepository(Protocol):
    # Structural interface for document persistence: any class with these
    # methods satisfies it — no inheritance required (typing.Protocol).
    def find_all(self, filters: dict | None = None) -> list[Document]: ...  # None filters = no filtering
    def find_by_id(self, id: str) -> Document | None: ...  # None when the id is unknown
    def create(self, data: dict) -> Document: ...
    def update(self, id: str, data: dict) -> Document: ...
    def delete(self, id: str) -> None: ...

Service Layer

class InferenceService:
    """Wraps the inference pipeline and manages temp-file lifecycle for uploads."""

    def __init__(self, model_path: str, use_gpu: bool = True):
        # Pipeline (and its model) is loaded once per service instance.
        self.pipeline = InferencePipeline(model_path=model_path, use_gpu=use_gpu)

    async def process(self, file: UploadFile, confidence_threshold: float) -> InferenceResult:
        """Persist the upload to a temp file, run the pipeline, always clean up."""
        # NOTE(review): confidence_threshold is accepted but never forwarded to
        # process_pdf — confirm whether the pipeline should receive it.
        temp_path = self._save_temp_file(file)
        try:
            return self.pipeline.process_pdf(temp_path)
        finally:
            # Remove the temp file even when the pipeline raises.
            temp_path.unlink(missing_ok=True)

Dependency Injection

from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    """Application configuration read from environment variables (and .env)."""

    db_host: str = "localhost"
    # No default on purpose: startup fails fast if the password is not set.
    db_password: str
    model_path: str = "runs/train/invoice_fields/weights/best.pt"

    # pydantic-settings v2 configuration style; the nested `class Config`
    # form is the deprecated pydantic-v1 idiom.
    model_config = SettingsConfigDict(env_file=".env")

@lru_cache
def get_settings() -> Settings:
    """Build Settings once; every later call reuses the cached instance."""
    return Settings()

def get_inference_service(settings: Settings = Depends(get_settings)) -> InferenceService:
    return InferenceService(model_path=settings.model_path)

Database Patterns

Connection Pooling

from psycopg2 import pool
from contextlib import contextmanager

db_pool = pool.ThreadedConnectionPool(minconn=2, maxconn=10, **db_config)

@contextmanager
def get_db_connection():
    """Yield a pooled psycopg2 connection.

    The connection is always handed back to the pool via `finally`,
    even if the caller's block raises.
    """
    conn = db_pool.getconn()
    try:
        yield conn
    finally:
        db_pool.putconn(conn)

Query Optimization

# GOOD: Select only needed columns
cur.execute("""
    SELECT id, status, fields->>'InvoiceNumber' as invoice_number
    FROM documents WHERE status = %s
    ORDER BY created_at DESC LIMIT %s
""", ('processed', 10))

# BAD: SELECT * FROM documents

N+1 Prevention

# BAD: N+1 queries
for doc in documents:
    doc.labels = get_labels(doc.id)  # N queries

# GOOD: Batch fetch with JOIN
cur.execute("""
    SELECT d.id, d.status, array_agg(l.label) as labels
    FROM documents d
    LEFT JOIN document_labels l ON d.id = l.document_id
    GROUP BY d.id, d.status
""")

Transaction Pattern

def create_document_with_labels(doc_data: dict, labels: list[dict]) -> str:
    """Insert a document and its labels in one transaction; return the new id.

    Commit happens only after every label insert succeeds; any failure rolls
    the whole transaction back and re-raises.
    """
    with get_db_connection() as conn:
        try:
            with conn.cursor() as cur:
                cur.execute("INSERT INTO documents ... RETURNING id", ...)
                doc_id = cur.fetchone()[0]
                # NOTE(review): per-row inserts; consider executemany or a
                # multi-VALUES statement for large label lists.
                for label in labels:
                    cur.execute("INSERT INTO document_labels ...", ...)
                conn.commit()
                return doc_id
        except Exception:
            conn.rollback()
            raise

Caching

from cachetools import TTLCache

_cache = TTLCache(maxsize=1000, ttl=300)

def get_document_cached(doc_id: str) -> Document | None:
    """Look up a document, serving repeat hits from the 5-minute TTL cache.

    Only found documents are cached, so an unknown id is re-checked against
    the repository on every call.
    """
    # Single .get() instead of `in` + indexing: one lookup, and no window for
    # the entry to TTL-expire between the membership test and the read
    # (`doc_id in _cache` followed by `_cache[doc_id]` can raise KeyError).
    cached = _cache.get(doc_id)
    if cached is not None:
        return cached
    doc = repo.find_by_id(doc_id)
    if doc:
        _cache[doc_id] = doc
    return doc

Error Handling

Exception Hierarchy

class AppError(Exception):
    """Base class for application errors that map to an HTTP status code."""

    def __init__(self, message: str, status_code: int = 500):
        # Forward the message to Exception so str(exc) and exc.args work;
        # the original stored it only on self.message, leaving str(exc) empty.
        super().__init__(message)
        self.message = message
        self.status_code = status_code

class NotFoundError(AppError):
    """Requested resource does not exist (HTTP 404)."""

    def __init__(self, resource: str, id: str):
        super().__init__(f"{resource} not found: {id}", 404)

class ValidationError(AppError):
    """Client-supplied input is invalid (HTTP 400)."""

    def __init__(self, message: str):
        super().__init__(message, 400)

FastAPI Exception Handler

@app.exception_handler(AppError)
async def app_error_handler(request: Request, exc: AppError):
    """Map known application errors to their HTTP status using the standard envelope."""
    return JSONResponse(status_code=exc.status_code, content={"success": False, "error": exc.message})

@app.exception_handler(Exception)
async def generic_error_handler(request: Request, exc: Exception):
    """Catch-all: log the full traceback, return an opaque 500 so internals never leak to clients."""
    logger.error(f"Unexpected error: {exc}", exc_info=True)
    return JSONResponse(status_code=500, content={"success": False, "error": "Internal server error"})

Retry with Backoff

async def retry_with_backoff(fn, max_retries: int = 3, base_delay: float = 1.0):
    """Call `fn` (sync or async) until it succeeds, with exponential backoff.

    Args:
        fn: Zero-argument callable. If calling it returns a coroutine, the
            coroutine is awaited — this also covers wrappers (e.g. a lambda or
            functools.partial around an async def) that the original
            iscoroutinefunction() dispatch missed.
        max_retries: Total number of attempts; must be >= 1.
        base_delay: Sleep before retry k is base_delay * 2**k seconds.

    Returns:
        The first successful result of `fn`.

    Raises:
        ValueError: If max_retries < 1.
        Exception: The last error from `fn` once all attempts are exhausted.
    """
    if max_retries < 1:
        # Guard: the original fell through the loop and executed `raise None`,
        # which surfaces as a confusing TypeError.
        raise ValueError("max_retries must be >= 1")
    last_error: Exception | None = None
    for attempt in range(max_retries):
        try:
            result = fn()
            # Dispatch on the returned value, not on fn's type.
            if asyncio.iscoroutine(result):
                result = await result
            return result
        except Exception as e:
            last_error = e
            if attempt < max_retries - 1:
                await asyncio.sleep(base_delay * (2 ** attempt))
    raise last_error

Rate Limiting

from time import time
from collections import defaultdict

class RateLimiter:
    """In-memory sliding-window rate limiter keyed by an arbitrary identifier."""

    def __init__(self):
        # identifier -> timestamps (seconds) of requests accepted within the window
        self.requests: dict[str, list[float]] = defaultdict(list)

    def check_limit(self, identifier: str, max_requests: int, window_sec: int) -> bool:
        """Record the request and return True if `identifier` is under its limit.

        Expired timestamps are pruned on every call. A rejected request is not
        recorded, so being throttled does not extend the caller's penalty.
        """
        current = time()
        window_start = current - window_sec
        recent = [stamp for stamp in self.requests[identifier] if stamp > window_start]
        self.requests[identifier] = recent
        if len(recent) >= max_requests:
            return False
        recent.append(current)
        return True

limiter = RateLimiter()

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Reject clients over 100 requests/minute with a 429 before any route runs."""
    # NOTE(review): behind a reverse proxy this is the proxy's IP —
    # confirm X-Forwarded-For handling if one is deployed.
    ip = request.client.host
    if not limiter.check_limit(ip, max_requests=100, window_sec=60):
        return JSONResponse(status_code=429, content={"error": "Rate limit exceeded"})
    return await call_next(request)

Logging & Middleware

Request Logging

@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log start and completion of every request with a correlation id and duration."""
    request_id = str(uuid.uuid4())[:8]  # short id is enough to pair the two log lines
    start_time = time.time()
    logger.info(f"[{request_id}] {request.method} {request.url.path}")
    response = await call_next(request)
    duration_ms = (time.time() - start_time) * 1000
    logger.info(f"[{request_id}] Completed {response.status_code} in {duration_ms:.2f}ms")
    return response

Structured Logging

class JSONFormatter(logging.Formatter):
    """Format log records as single-line JSON objects for log shippers/aggregators."""

    def format(self, record: logging.LogRecord) -> str:
        """Return the record serialized as a JSON string."""
        from datetime import datetime, timezone  # local import keeps the snippet self-contained

        return json.dumps({
            # Timezone-aware UTC timestamp. datetime.utcnow() is deprecated
            # (Python 3.12) and returned a naive value with no offset; this
            # emits an explicit +00:00 suffix.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
        })

Background Tasks

from fastapi import BackgroundTasks

def send_notification(document_id: str, status: str):
    """Notification hook run as a background task; currently it only logs the status change."""
    logger.info(f"Notification: {document_id} -> {status}")

@router.post("/infer")
async def infer(file: UploadFile, background_tasks: BackgroundTasks):
    """Process the upload, then notify asynchronously once the response is sent."""
    result = await process_document(file)
    # add_task runs after the response is returned — the client never waits on it.
    background_tasks.add_task(send_notification, result.document_id, "completed")
    return result

Key Principles

  • Repository pattern: Abstract data access
  • Service layer: Business logic separated from routes
  • Dependency injection via Depends()
  • Connection pooling for database
  • Parameterized queries only (no f-strings in SQL)
  • Batch fetch to prevent N+1
  • Consistent ApiResponse[T] format
  • Exception hierarchy with proper status codes
  • Rate limit by IP
  • Structured logging with request ID