invoice-master-poc-v2/.claude/skills/security-review/SKILL.md
2026-01-25 16:17:23 +01:00

name: security-review
description: Use this skill when adding authentication, handling user input, working with secrets, creating API endpoints, or implementing payment/sensitive features. Provides comprehensive security checklist and patterns.

Security Review Skill

Security best practices for Python/FastAPI applications handling sensitive invoice data.

When to Activate

  • Implementing authentication or authorization
  • Handling user input or file uploads
  • Creating new API endpoints
  • Working with secrets or credentials
  • Processing sensitive invoice data
  • Integrating third-party APIs
  • Database operations with user data

Security Checklist

1. Secrets Management

NEVER Do This

# Hardcoded secrets - CRITICAL VULNERABILITY
api_key = "sk-proj-xxxxx"
db_password = "password123"

ALWAYS Do This

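from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # pydantic-settings v2 style; replaces the deprecated inner `class Config`
    model_config = SettingsConfigDict(env_file=".env")

    db_password: str
    api_key: str
    model_path: str = "runs/train/invoice_fields/weights/best.pt"

settings = Settings()

# Missing values already raise a ValidationError above;
# this check also rejects empty strings
if not settings.db_password:
    raise RuntimeError("DB_PASSWORD not configured")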

Verification Steps

  • No hardcoded API keys, tokens, or passwords
  • All secrets in environment variables
  • .env in .gitignore
  • No secrets in git history
  • .env.example with placeholder values
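
The `.env.example` item above can be generated instead of maintained by hand. The following is a minimal sketch, assuming a plain `KEY=value` file format; `make_env_example` and the `changeme` placeholder are hypothetical names, not part of this project.

```python
def make_env_example(env_text: str, placeholder: str = "changeme") -> str:
    """Return a copy of .env content with every value replaced by a placeholder."""
    out = []
    for line in env_text.splitlines():
        stripped = line.strip()
        # Keep blank lines, comments, and lines without "=" unchanged
        if not stripped or stripped.startswith("#") or "=" not in stripped:
            out.append(line)
            continue
        key, _, _value = stripped.partition("=")
        out.append(f"{key.strip()}={placeholder}")
    return "\n".join(out) + "\n"
```

Comments are copied verbatim, so review the output before committing it in case a comment itself contains a secret.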

2. Input Validation

Always Validate User Input

from pydantic import BaseModel, Field, field_validator
from fastapi import HTTPException
import re

class InvoiceRequest(BaseModel):
    invoice_number: str = Field(..., min_length=1, max_length=50)
    amount: float = Field(..., gt=0, le=1_000_000)
    bankgiro: str | None = None

    @field_validator("invoice_number")
    @classmethod
    def validate_invoice_number(cls, v: str) -> str:
        # Whitelist validation - only allow safe characters
        # (fullmatch: "$" with re.match would also accept a trailing newline)
        if not re.fullmatch(r"[A-Za-z0-9\-_]+", v):
            raise ValueError("Invalid invoice number format")
        return v

    @field_validator("bankgiro")
    @classmethod
    def validate_bankgiro(cls, v: str | None) -> str | None:
        if v is None:
            return None
        cleaned = re.sub(r"[^0-9]", "", v)
        if not (7 <= len(cleaned) <= 8):
            raise ValueError("Bankgiro must be 7-8 digits")
        return cleaned

File Upload Validation

from fastapi import UploadFile, HTTPException
from pathlib import Path

ALLOWED_EXTENSIONS = {".pdf"}
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB

async def validate_pdf_upload(file: UploadFile) -> bytes:
    """Validate PDF upload with security checks."""
    # Extension check
    ext = Path(file.filename or "").suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(400, f"Only PDF files allowed, got {ext}")

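    # Read at most one byte past the limit so an oversized upload
    # is never loaded into memory in full
    content = await file.read(MAX_FILE_SIZE + 1)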

    # Size check
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(400, f"File too large (max {MAX_FILE_SIZE // 1024 // 1024}MB)")

    # Magic bytes check (PDF signature)
    if not content.startswith(b"%PDF"):
        raise HTTPException(400, "Invalid PDF file format")

    return content

Verification Steps

  • All user inputs validated with Pydantic
  • File uploads restricted (size, type, extension, magic bytes)
  • No direct use of user input in queries
  • Whitelist validation (not blacklist)
  • Error messages don't leak sensitive info
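
The length check in `validate_bankgiro` accepts any 7-8 digit string. A checksum would catch mistyped numbers as well. The sketch below implements the Luhn (mod 10) checksum; that Bankgiro numbers carry a Luhn check digit is an assumption here and should be confirmed against the Bankgirot specification before relying on it. `luhn_valid` is a hypothetical helper name.

```python
def luhn_valid(digits: str) -> bool:
    """Return True if the digit string passes the Luhn (mod 10) checksum."""
    if not (digits.isascii() and digits.isdigit()):
        return False
    total = 0
    # Walk from the rightmost digit and double every second digit
    for i, ch in enumerate(reversed(digits)):
        d = int(ch)
        if i % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0
```

If the assumption holds, the validator could call `luhn_valid(cleaned)` after the length check and raise `ValueError` on failure.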

3. SQL Injection Prevention

NEVER Concatenate SQL

# DANGEROUS - SQL Injection vulnerability
query = f"SELECT * FROM documents WHERE id = '{user_input}'"
cur.execute(query)

ALWAYS Use Parameterized Queries

import psycopg2

# Safe - parameterized query with %s placeholders
cur.execute(
    "SELECT * FROM documents WHERE id = %s AND status = %s",
    (document_id, status)
)

# Safe - named parameters
cur.execute(
    "SELECT * FROM documents WHERE id = %(id)s",
    {"id": document_id}
)

# Safe - psycopg2.sql for dynamic identifiers
from psycopg2 import sql

cur.execute(
    sql.SQL("SELECT {} FROM {} WHERE id = %s").format(
        sql.Identifier("invoice_number"),
        sql.Identifier("documents")
    ),
    (document_id,)
)

Verification Steps

  • All database queries use parameterized queries (%s or %(name)s)
  • No string concatenation or f-strings in SQL
  • psycopg2.sql module used for dynamic identifiers
  • No user input in table/column names
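
One way to keep user input out of table and column names is to map the untrusted value onto a fixed allowlist, so that only known identifiers ever reach the query. This is a minimal sketch; the `ALLOWED_SORT_COLUMNS` mapping, the column names in it, and `resolve_sort_column` are illustrative assumptions.

```python
# Hypothetical mapping from public sort keys to real column names
ALLOWED_SORT_COLUMNS = {
    "created": "created_at",
    "number": "invoice_number",
    "status": "status",
}

def resolve_sort_column(user_value: str, default: str = "created_at") -> str:
    """Map an untrusted sort key to a known column name, or fall back to default."""
    return ALLOWED_SORT_COLUMNS.get(user_value.strip().lower(), default)
```

The returned name comes from the allowlist, never from the request, and can then be wrapped in `sql.Identifier(...)` as in the example above.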

4. Path Traversal Prevention

NEVER Trust User Paths

# DANGEROUS - Path traversal vulnerability
filename = request.query_params.get("file")
with open(f"/data/{filename}", "r") as f:  # Attacker: ../../../etc/passwd
    return f.read()

ALWAYS Validate Paths

import re
from pathlib import Path

from fastapi import HTTPException

ALLOWED_DIR = Path("/data/uploads").resolve()

def get_safe_path(filename: str) -> Path:
    """Get safe file path, preventing path traversal."""
    # Remove any path components
    safe_name = Path(filename).name

    # Validate filename characters (fullmatch also rejects a trailing newline)
    if not re.fullmatch(r"[A-Za-z0-9_\-\.]+", safe_name):
        raise HTTPException(400, "Invalid filename")

    # Resolve and verify within allowed directory
    full_path = (ALLOWED_DIR / safe_name).resolve()

    # Path.is_relative_to requires Python 3.9+
    if not full_path.is_relative_to(ALLOWED_DIR):
        raise HTTPException(400, "Invalid file path")

    return full_path

Verification Steps

  • User-provided filenames sanitized
  • Paths resolved and validated against allowed directory
  • No direct concatenation of user input into paths
  • Whitelist characters in filenames

5. Authentication & Authorization

API Key Validation

import hmac

from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

async def verify_api_key(api_key: str | None = Security(api_key_header)) -> str:
    if not api_key:
        raise HTTPException(401, "API key required")

    # Constant-time comparison to prevent timing attacks.
    # Compare bytes: compare_digest raises TypeError on non-ASCII str input.
    if not hmac.compare_digest(api_key.encode(), settings.api_key.encode()):
        raise HTTPException(403, "Invalid API key")

    return api_key

@router.post("/infer")
async def infer(
    file: UploadFile,
    api_key: str = Depends(verify_api_key)
):
    ...

Role-Based Access Control

from enum import Enum

class UserRole(str, Enum):
    USER = "user"
    ADMIN = "admin"

def require_role(required_role: UserRole):
    async def role_checker(current_user: User = Depends(get_current_user)):
        if current_user.role != required_role:
            raise HTTPException(403, "Insufficient permissions")
        return current_user
    return role_checker

@router.delete("/documents/{doc_id}")
async def delete_document(
    doc_id: str,
    user: User = Depends(require_role(UserRole.ADMIN))
):
    ...
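
Note that `require_role` above compares roles for exact equality, so an admin would be rejected from a route that requires `UserRole.USER`. If roles are meant to be hierarchical, a rank comparison is one option. This is a sketch under that assumption; `ROLE_RANK` and `has_required_role` are hypothetical names.

```python
from enum import Enum

class UserRole(str, Enum):
    USER = "user"
    ADMIN = "admin"

# Hypothetical ranking: a higher number means more privileges
ROLE_RANK = {UserRole.USER: 1, UserRole.ADMIN: 2}

def has_required_role(actual: UserRole, required: UserRole) -> bool:
    """Return True if `actual` is at least as privileged as `required`."""
    return ROLE_RANK[actual] >= ROLE_RANK[required]
```

Inside `role_checker`, the condition would then become `if not has_required_role(current_user.role, required_role)`.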

Verification Steps

  • API keys validated with constant-time comparison
  • Authorization checks before sensitive operations
  • Role-based access control implemented
  • Session/token validation on protected routes
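
The constant-time comparison can also be done against a stored hash, so the plaintext key never has to be kept alongside the application. This goes beyond what the examples above do and is offered only as a sketch; `hash_api_key` and `api_key_matches` are hypothetical helpers.

```python
import hashlib
import hmac

def hash_api_key(api_key: str) -> str:
    """Return the SHA-256 hex digest of an API key, suitable for storage."""
    return hashlib.sha256(api_key.encode("utf-8")).hexdigest()

def api_key_matches(presented: str, stored_hash: str) -> bool:
    """Compare a presented key against a stored hash in constant time."""
    # Both operands are ASCII hex digests, so compare_digest accepts them as str
    return hmac.compare_digest(hash_api_key(presented), stored_hash)
```

Plain SHA-256 is reasonable only for long, randomly generated keys; it is not a substitute for a password hashing function.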

6. Rate Limiting

Rate Limiter Implementation

from time import time
from collections import defaultdict
from fastapi import Request, HTTPException

class RateLimiter:
    def __init__(self):
        self.requests: dict[str, list[float]] = defaultdict(list)

    def check_limit(
        self,
        identifier: str,
        max_requests: int,
        window_seconds: int
    ) -> bool:
        now = time()
        # Clean old requests
        self.requests[identifier] = [
            t for t in self.requests[identifier]
            if now - t < window_seconds
        ]
        # Check limit
        if len(self.requests[identifier]) >= max_requests:
            return False
        self.requests[identifier].append(now)
        return True

limiter = RateLimiter()

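from fastapi.responses import JSONResponse

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    client_ip = request.client.host if request.client else "unknown"

    # 100 requests per minute for general endpoints
    if not limiter.check_limit(client_ip, max_requests=100, window_seconds=60):
        # Return the response directly: an HTTPException raised inside
        # middleware bypasses FastAPI's exception handlers and becomes a 500
        return JSONResponse(
            status_code=429,
            content={"detail": "Rate limit exceeded. Try again later."},
        )

    return await call_next(request)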

Stricter Limits for Expensive Operations

# Inference endpoint: 10 requests per minute
async def check_inference_rate_limit(request: Request):
    client_ip = request.client.host if request.client else "unknown"
    if not limiter.check_limit(f"infer:{client_ip}", max_requests=10, window_seconds=60):
        raise HTTPException(429, "Inference rate limit exceeded")

@router.post("/infer")
async def infer(
    file: UploadFile,
    _: None = Depends(check_inference_rate_limit)
):
    ...

Verification Steps

  • Rate limiting on all API endpoints
  • Stricter limits on expensive operations (inference, OCR)
  • IP-based rate limiting
  • Clear error messages for rate-limited requests
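
The `RateLimiter` above never removes an identifier once it has been seen, so its dictionary grows with every distinct client IP. The sketch below is a variant that uses a `deque` per identifier, accepts an injectable clock for testing, and adds a `purge` method. The class name and its parameters are assumptions, and like the original it is in-memory and per-process only.

```python
from collections import deque
from time import monotonic
from typing import Callable

class SlidingWindowLimiter:
    def __init__(self, max_requests: int, window_seconds: float,
                 clock: Callable[[], float] = monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock
        self.hits: dict[str, deque] = {}

    def _expire(self, q: deque, now: float) -> None:
        # Drop timestamps that have left the window (oldest first)
        while q and now - q[0] >= self.window_seconds:
            q.popleft()

    def allow(self, identifier: str) -> bool:
        """Record a request and return False if the limit is exceeded."""
        now = self.clock()
        q = self.hits.setdefault(identifier, deque())
        self._expire(q, now)
        if len(q) >= self.max_requests:
            return False
        q.append(now)
        return True

    def purge(self) -> None:
        """Remove identifiers whose timestamps have all expired."""
        now = self.clock()
        for key in list(self.hits):
            self._expire(self.hits[key], now)
            if not self.hits[key]:
                del self.hits[key]
```

`purge` could be called periodically from a background task. For multiple worker processes a shared store would be needed, which this sketch does not cover.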

7. Sensitive Data Exposure

Logging

import logging

logger = logging.getLogger(__name__)

# WRONG: Logging sensitive data
logger.info(f"Processing invoice: {invoice_data}")  # May contain sensitive info
logger.error(f"DB error with password: {db_password}")

# CORRECT: Redact sensitive data
logger.info(f"Processing invoice: id={doc_id}")
logger.error(f"DB connection failed to {db_host}:{db_port}")

# CORRECT: Structured logging with safe fields only
logger.info(
    "Invoice processed",
    extra={
        "document_id": doc_id,
        "field_count": len(fields),
        "processing_time_ms": elapsed_ms
    }
)

Error Messages

import traceback

from fastapi import Request
from fastapi.responses import JSONResponse

# WRONG: Exposing internal details
@app.exception_handler(Exception)
async def error_handler(request: Request, exc: Exception):
    return JSONResponse(
        status_code=500,
        content={
            "error": str(exc),
            "traceback": traceback.format_exc()  # NEVER expose!
        }
    )

# CORRECT: Generic error messages
@app.exception_handler(Exception)
async def error_handler(request: Request, exc: Exception):
    logger.error(f"Unhandled error: {exc}", exc_info=True)  # Log internally
    return JSONResponse(
        status_code=500,
        content={"success": False, "error": "An error occurred"}
    )

Verification Steps

  • No passwords, tokens, or secrets in logs
  • Error messages generic for users
  • Detailed errors only in server logs
  • No stack traces exposed to users
  • Invoice data (amounts, account numbers) not logged
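
As a safety net for the last item, a logging filter can mask strings that look like account numbers before a record is emitted. This is a sketch only: the pattern assumes account numbers appear as 7-8 digits with an optional hyphen (for example `NNN-NNNN` or `NNNN-NNNN`), and `RedactAccountNumbers` is a hypothetical class name. It complements the practice of not logging such values in the first place.

```python
import logging
import re

# Assumption: 7-8 digits, optionally split by a single hyphen
ACCOUNT_PATTERN = re.compile(r"\b\d{3,4}-?\d{4}\b")

class RedactAccountNumbers(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        # Render the final message first so values passed as args are covered
        message = record.getMessage()
        record.msg = ACCOUNT_PATTERN.sub("[REDACTED]", message)
        record.args = ()  # args are already merged into msg
        return True  # never drop the record, only rewrite it
```

The filter is attached with `logger.addFilter(RedactAccountNumbers())` or on a handler, depending on where records should be rewritten.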

8. CORS Configuration

from fastapi.middleware.cors import CORSMiddleware

# WRONG: Allow all origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # DANGEROUS in production
    allow_credentials=True,
)

# CORRECT: Specific origins
ALLOWED_ORIGINS = [
    "http://localhost:8000",
    "https://your-domain.com",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)

Verification Steps

  • CORS origins explicitly listed
  • No wildcard origins in production
  • Credentials only with specific origins
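
If the origin list comes from configuration rather than a hardcoded constant, it helps to validate it at startup so a wildcard cannot slip into production. A minimal sketch, assuming a comma-separated string from a hypothetical `CORS_ALLOWED_ORIGINS` environment variable; `parse_allowed_origins` is also a hypothetical helper.

```python
def parse_allowed_origins(raw: str) -> list[str]:
    """Parse a comma-separated origin list, rejecting wildcards."""
    origins = [o.strip().rstrip("/") for o in raw.split(",") if o.strip()]
    for origin in origins:
        if "*" in origin:
            raise ValueError("Wildcard origins are not allowed")
        if not origin.startswith(("http://", "https://")):
            raise ValueError(f"Origin must include a scheme: {origin}")
    return origins
```

The result can be passed as `allow_origins` to `CORSMiddleware` in place of the `ALLOWED_ORIGINS` constant.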

9. Temporary File Security

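import os
import tempfile
from pathlib import Path
from contextlib import contextmanager

TEMP_DIR = Path("/tmp/invoice-master")  # Dedicated temp directory

@contextmanager
def secure_temp_file(suffix: str = ".pdf"):
    """Create secure temporary file that is always cleaned up."""
    # Create the directory if missing; new directories are owner-only
    TEMP_DIR.mkdir(mode=0o700, parents=True, exist_ok=True)
    # mkstemp creates the file readable and writable only by the owner
    fd, name = tempfile.mkstemp(suffix=suffix, dir=TEMP_DIR)
    os.close(fd)  # Close the handle so callers can reopen the file by path
    tmp_path = Path(name)
    try:
        yield tmp_path
    finally:
        tmp_path.unlink(missing_ok=True)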

# Usage
async def process_upload(file: UploadFile):
    with secure_temp_file(".pdf") as tmp_path:
        content = await validate_pdf_upload(file)
        tmp_path.write_bytes(content)
        result = pipeline.process(tmp_path)
    # File automatically cleaned up
    return result

Verification Steps

  • Temporary files always cleaned up (use context managers)
  • Temp directory has restricted permissions
  • No leftover files after processing errors
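
The restricted-permissions item can be checked in code on POSIX systems. The helpers below are a sketch: `ensure_private_dir` and `is_private` are hypothetical names, and the permission bits have no equivalent meaning on Windows.

```python
import os
import stat
from pathlib import Path

def ensure_private_dir(path: Path) -> Path:
    """Create `path` if needed and restrict it to the owning user (0o700)."""
    path.mkdir(parents=True, exist_ok=True)
    # An existing directory keeps its old mode, so set it explicitly
    os.chmod(path, 0o700)
    return path

def is_private(path: Path) -> bool:
    """Return True if group and others have no permissions on `path`."""
    mode = stat.S_IMODE(path.stat().st_mode)
    return mode & 0o077 == 0
```

A startup check such as `assert is_private(ensure_private_dir(Path("/tmp/invoice-master")))` would fail fast if the directory were shared.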

10. Dependency Security

Regular Updates

# Check for vulnerabilities
pip-audit

# Update dependencies
pip install --upgrade -r requirements.txt

# Check for outdated packages
pip list --outdated

Lock Files

# Create requirements lock file
pip freeze > requirements.lock

# Install from lock file for reproducible builds
pip install -r requirements.lock

Verification Steps

  • Dependencies up to date
  • No known vulnerabilities (pip-audit clean)
  • Versions pinned in requirements.txt
  • Regular security updates scheduled
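
Pinning can be checked automatically, for example in CI. The sketch below flags requirement lines that are not pinned with `==`. It assumes a simple requirements file (no URLs or environment markers), and `unpinned_requirements` is a hypothetical helper.

```python
import re

# A package name (optionally with extras) followed by "==<version>"
PINNED = re.compile(r"^[A-Za-z0-9._\-\[\],]+==[^=\s]+")

def unpinned_requirements(text: str) -> list[str]:
    """Return requirement lines that are not pinned with `==`."""
    bad = []
    for line in text.splitlines():
        entry = line.split("#", 1)[0].strip()  # drop trailing comments
        if not entry or entry.startswith("-"):  # skip blanks and pip options
            continue
        if not PINNED.match(entry):
            bad.append(entry)
    return bad
```

A CI step could read `requirements.txt`, call this function, and fail the build if the returned list is not empty.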

Security Testing

Automated Security Tests

import pytest
from fastapi.testclient import TestClient

def test_requires_api_key(client: TestClient):
    """Test authentication required."""
    response = client.post("/api/v1/infer")
    assert response.status_code == 401

def test_invalid_api_key_rejected(client: TestClient):
    """Test invalid API key rejected."""
    response = client.post(
        "/api/v1/infer",
        headers={"X-API-Key": "invalid-key"}
    )
    assert response.status_code == 403

def test_sql_injection_prevented(client: TestClient):
    """Test SQL injection attempt rejected."""
    response = client.get(
        "/api/v1/documents",
        params={"id": "'; DROP TABLE documents; --"}
    )
    # Should return validation error, not execute SQL
    assert response.status_code in (400, 422)

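def test_path_traversal_prevented(client: TestClient):
    """Test path traversal attempt rejected."""
    # The test client may normalize "../" before sending, so also try an
    # encoded variant; neither request may succeed
    for path in (
        "/api/v1/results/../../etc/passwd",
        "/api/v1/results/..%2F..%2Fetc%2Fpasswd",
    ):
        response = client.get(path)
        assert response.status_code in (400, 404)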

def test_rate_limit_enforced(client: TestClient):
    """Test rate limiting works."""
    responses = [
        client.post("/api/v1/infer", files={"file": b"test"})
        for _ in range(15)
    ]
    rate_limited = [r for r in responses if r.status_code == 429]
    assert len(rate_limited) > 0

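def test_large_file_rejected(client: TestClient):
    """Test file size limit enforced."""
    large_content = b"x" * (11 * 1024 * 1024)  # 11MB
    response = client.post(
        "/api/v1/infer",
        # Authenticate so the size check is reached instead of a 401;
        # assumes `settings` is importable in the test module
        headers={"X-API-Key": settings.api_key},
        files={"file": ("test.pdf", large_content)}
    )
    assert response.status_code == 400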

Pre-Deployment Security Checklist

Before ANY production deployment:

  • Secrets: No hardcoded secrets, all in env vars
  • Input Validation: All user inputs validated with Pydantic
  • SQL Injection: All queries use parameterized queries
  • Path Traversal: File paths validated and sanitized
  • Authentication: API key or token validation
  • Authorization: Role checks in place
  • Rate Limiting: Enabled on all endpoints
  • HTTPS: Enforced in production
  • CORS: Properly configured (no wildcards)
  • Error Handling: No sensitive data in errors
  • Logging: No sensitive data logged
  • File Uploads: Validated (size, type, magic bytes)
  • Temp Files: Always cleaned up
  • Dependencies: Up to date, no vulnerabilities


Remember: Security is not optional. One vulnerability can compromise sensitive invoice data. When in doubt, err on the side of caution.