--- name: security-review description: Use this skill when adding authentication, handling user input, working with secrets, creating API endpoints, or implementing payment/sensitive features. Provides comprehensive security checklist and patterns. --- # Security Review Skill Security best practices for Python/FastAPI applications handling sensitive invoice data. ## When to Activate - Implementing authentication or authorization - Handling user input or file uploads - Creating new API endpoints - Working with secrets or credentials - Processing sensitive invoice data - Integrating third-party APIs - Database operations with user data ## Security Checklist ### 1. Secrets Management #### NEVER Do This ```python # Hardcoded secrets - CRITICAL VULNERABILITY api_key = "sk-proj-xxxxx" db_password = "password123" ``` #### ALWAYS Do This ```python import os from pydantic_settings import BaseSettings class Settings(BaseSettings): db_password: str api_key: str model_path: str = "runs/train/invoice_fields/weights/best.pt" class Config: env_file = ".env" settings = Settings() # Verify secrets exist if not settings.db_password: raise RuntimeError("DB_PASSWORD not configured") ``` #### Verification Steps - [ ] No hardcoded API keys, tokens, or passwords - [ ] All secrets in environment variables - [ ] `.env` in .gitignore - [ ] No secrets in git history - [ ] `.env.example` with placeholder values ### 2. Input Validation #### Always Validate User Input ```python from pydantic import BaseModel, Field, field_validator from fastapi import HTTPException import re class InvoiceRequest(BaseModel): invoice_number: str = Field(..., min_length=1, max_length=50) amount: float = Field(..., gt=0, le=1_000_000) bankgiro: str | None = None @field_validator("invoice_number") @classmethod def validate_invoice_number(cls, v: str) -> str: # Whitelist validation - only allow safe characters if not re.match(r"^[A-Za-z0-9\-_]+$", v): raise ValueError("Invalid invoice number format") return v @field_validator("bankgiro") @classmethod def validate_bankgiro(cls, v: str | None) -> str | None: if v is None: return None cleaned = re.sub(r"[^0-9]", "", v) if not (7 <= len(cleaned) <= 8): raise ValueError("Bankgiro must be 7-8 digits") return cleaned ``` #### File Upload Validation ```python from fastapi import UploadFile, HTTPException from pathlib import Path ALLOWED_EXTENSIONS = {".pdf"} MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB async def validate_pdf_upload(file: UploadFile) -> bytes: """Validate PDF upload with security checks.""" # Extension check ext = Path(file.filename or "").suffix.lower() if ext not in ALLOWED_EXTENSIONS: raise HTTPException(400, f"Only PDF files allowed, got {ext}") # Read content content = await file.read() # Size check if len(content) > MAX_FILE_SIZE: raise HTTPException(400, f"File too large (max {MAX_FILE_SIZE // 1024 // 1024}MB)") # Magic bytes check (PDF signature) if not content.startswith(b"%PDF"): raise HTTPException(400, "Invalid PDF file format") return content ``` #### Verification Steps - [ ] All user inputs validated with Pydantic - [ ] File uploads restricted (size, type, extension, magic bytes) - [ ] No direct use of user input in queries - [ ] Whitelist validation (not blacklist) - [ ] Error messages don't leak sensitive info ### 3. SQL Injection Prevention #### NEVER Concatenate SQL ```python # DANGEROUS - SQL Injection vulnerability query = f"SELECT * FROM documents WHERE id = '{user_input}'" cur.execute(query) ``` #### ALWAYS Use Parameterized Queries ```python import psycopg2 # Safe - parameterized query with %s placeholders cur.execute( "SELECT * FROM documents WHERE id = %s AND status = %s", (document_id, status) ) # Safe - named parameters cur.execute( "SELECT * FROM documents WHERE id = %(id)s", {"id": document_id} ) # Safe - psycopg2.sql for dynamic identifiers from psycopg2 import sql cur.execute( sql.SQL("SELECT {} FROM {} WHERE id = %s").format( sql.Identifier("invoice_number"), sql.Identifier("documents") ), (document_id,) ) ``` #### Verification Steps - [ ] All database queries use parameterized queries (%s or %(name)s) - [ ] No string concatenation or f-strings in SQL - [ ] psycopg2.sql module used for dynamic identifiers - [ ] No user input in table/column names ### 4. Path Traversal Prevention #### NEVER Trust User Paths ```python # DANGEROUS - Path traversal vulnerability filename = request.query_params.get("file") with open(f"/data/{filename}", "r") as f: # Attacker: ../../../etc/passwd return f.read() ``` #### ALWAYS Validate Paths ```python from pathlib import Path ALLOWED_DIR = Path("/data/uploads").resolve() def get_safe_path(filename: str) -> Path: """Get safe file path, preventing path traversal.""" # Remove any path components safe_name = Path(filename).name # Validate filename characters if not re.match(r"^[A-Za-z0-9_\-\.]+$", safe_name): raise HTTPException(400, "Invalid filename") # Resolve and verify within allowed directory full_path = (ALLOWED_DIR / safe_name).resolve() if not full_path.is_relative_to(ALLOWED_DIR): raise HTTPException(400, "Invalid file path") return full_path ``` #### Verification Steps - [ ] User-provided filenames sanitized - [ ] Paths resolved and validated against allowed directory - [ ] No direct concatenation of user input into paths - [ ] Whitelist characters in filenames ### 5. Authentication & Authorization #### API Key Validation ```python from fastapi import Depends, HTTPException, Security from fastapi.security import APIKeyHeader api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) async def verify_api_key(api_key: str = Security(api_key_header)) -> str: if not api_key: raise HTTPException(401, "API key required") # Constant-time comparison to prevent timing attacks import hmac if not hmac.compare_digest(api_key, settings.api_key): raise HTTPException(403, "Invalid API key") return api_key @router.post("/infer") async def infer( file: UploadFile, api_key: str = Depends(verify_api_key) ): ... ``` #### Role-Based Access Control ```python from enum import Enum class UserRole(str, Enum): USER = "user" ADMIN = "admin" def require_role(required_role: UserRole): async def role_checker(current_user: User = Depends(get_current_user)): if current_user.role != required_role: raise HTTPException(403, "Insufficient permissions") return current_user return role_checker @router.delete("/documents/{doc_id}") async def delete_document( doc_id: str, user: User = Depends(require_role(UserRole.ADMIN)) ): ... ``` #### Verification Steps - [ ] API keys validated with constant-time comparison - [ ] Authorization checks before sensitive operations - [ ] Role-based access control implemented - [ ] Session/token validation on protected routes ### 6. Rate Limiting #### Rate Limiter Implementation ```python from time import time from collections import defaultdict from fastapi import Request, HTTPException class RateLimiter: def __init__(self): self.requests: dict[str, list[float]] = defaultdict(list) def check_limit( self, identifier: str, max_requests: int, window_seconds: int ) -> bool: now = time() # Clean old requests self.requests[identifier] = [ t for t in self.requests[identifier] if now - t < window_seconds ] # Check limit if len(self.requests[identifier]) >= max_requests: return False self.requests[identifier].append(now) return True limiter = RateLimiter() @app.middleware("http") async def rate_limit_middleware(request: Request, call_next): client_ip = request.client.host if request.client else "unknown" # 100 requests per minute for general endpoints if not limiter.check_limit(client_ip, max_requests=100, window_seconds=60): raise HTTPException(429, "Rate limit exceeded. Try again later.") return await call_next(request) ``` #### Stricter Limits for Expensive Operations ```python # Inference endpoint: 10 requests per minute async def check_inference_rate_limit(request: Request): client_ip = request.client.host if request.client else "unknown" if not limiter.check_limit(f"infer:{client_ip}", max_requests=10, window_seconds=60): raise HTTPException(429, "Inference rate limit exceeded") @router.post("/infer") async def infer( file: UploadFile, _: None = Depends(check_inference_rate_limit) ): ... ``` #### Verification Steps - [ ] Rate limiting on all API endpoints - [ ] Stricter limits on expensive operations (inference, OCR) - [ ] IP-based rate limiting - [ ] Clear error messages for rate-limited requests ### 7. Sensitive Data Exposure #### Logging ```python import logging logger = logging.getLogger(__name__) # WRONG: Logging sensitive data logger.info(f"Processing invoice: {invoice_data}") # May contain sensitive info logger.error(f"DB error with password: {db_password}") # CORRECT: Redact sensitive data logger.info(f"Processing invoice: id={doc_id}") logger.error(f"DB connection failed to {db_host}:{db_port}") # CORRECT: Structured logging with safe fields only logger.info( "Invoice processed", extra={ "document_id": doc_id, "field_count": len(fields), "processing_time_ms": elapsed_ms } ) ``` #### Error Messages ```python # WRONG: Exposing internal details @app.exception_handler(Exception) async def error_handler(request: Request, exc: Exception): return JSONResponse( status_code=500, content={ "error": str(exc), "traceback": traceback.format_exc() # NEVER expose! } ) # CORRECT: Generic error messages @app.exception_handler(Exception) async def error_handler(request: Request, exc: Exception): logger.error(f"Unhandled error: {exc}", exc_info=True) # Log internally return JSONResponse( status_code=500, content={"success": False, "error": "An error occurred"} ) ``` #### Verification Steps - [ ] No passwords, tokens, or secrets in logs - [ ] Error messages generic for users - [ ] Detailed errors only in server logs - [ ] No stack traces exposed to users - [ ] Invoice data (amounts, account numbers) not logged ### 8. CORS Configuration ```python from fastapi.middleware.cors import CORSMiddleware # WRONG: Allow all origins app.add_middleware( CORSMiddleware, allow_origins=["*"], # DANGEROUS in production allow_credentials=True, ) # CORRECT: Specific origins ALLOWED_ORIGINS = [ "http://localhost:8000", "https://your-domain.com", ] app.add_middleware( CORSMiddleware, allow_origins=ALLOWED_ORIGINS, allow_credentials=True, allow_methods=["GET", "POST"], allow_headers=["*"], ) ``` #### Verification Steps - [ ] CORS origins explicitly listed - [ ] No wildcard origins in production - [ ] Credentials only with specific origins ### 9. Temporary File Security ```python import tempfile from pathlib import Path from contextlib import contextmanager @contextmanager def secure_temp_file(suffix: str = ".pdf"): """Create secure temporary file that is always cleaned up.""" tmp_path = None try: with tempfile.NamedTemporaryFile( suffix=suffix, delete=False, dir="/tmp/invoice-master" # Dedicated temp directory ) as tmp: tmp_path = Path(tmp.name) yield tmp_path finally: if tmp_path and tmp_path.exists(): tmp_path.unlink() # Usage async def process_upload(file: UploadFile): with secure_temp_file(".pdf") as tmp_path: content = await validate_pdf_upload(file) tmp_path.write_bytes(content) result = pipeline.process(tmp_path) # File automatically cleaned up return result ``` #### Verification Steps - [ ] Temporary files always cleaned up (use context managers) - [ ] Temp directory has restricted permissions - [ ] No leftover files after processing errors ### 10. Dependency Security #### Regular Updates ```bash # Check for vulnerabilities pip-audit # Update dependencies pip install --upgrade -r requirements.txt # Check for outdated packages pip list --outdated ``` #### Lock Files ```bash # Create requirements lock file pip freeze > requirements.lock # Install from lock file for reproducible builds pip install -r requirements.lock ``` #### Verification Steps - [ ] Dependencies up to date - [ ] No known vulnerabilities (pip-audit clean) - [ ] requirements.txt pinned versions - [ ] Regular security updates scheduled ## Security Testing ### Automated Security Tests ```python import pytest from fastapi.testclient import TestClient def test_requires_api_key(client: TestClient): """Test authentication required.""" response = client.post("/api/v1/infer") assert response.status_code == 401 def test_invalid_api_key_rejected(client: TestClient): """Test invalid API key rejected.""" response = client.post( "/api/v1/infer", headers={"X-API-Key": "invalid-key"} ) assert response.status_code == 403 def test_sql_injection_prevented(client: TestClient): """Test SQL injection attempt rejected.""" response = client.get( "/api/v1/documents", params={"id": "'; DROP TABLE documents; --"} ) # Should return validation error, not execute SQL assert response.status_code in (400, 422) def test_path_traversal_prevented(client: TestClient): """Test path traversal attempt rejected.""" response = client.get("/api/v1/results/../../etc/passwd") assert response.status_code == 400 def test_rate_limit_enforced(client: TestClient): """Test rate limiting works.""" responses = [ client.post("/api/v1/infer", files={"file": b"test"}) for _ in range(15) ] rate_limited = [r for r in responses if r.status_code == 429] assert len(rate_limited) > 0 def test_large_file_rejected(client: TestClient): """Test file size limit enforced.""" large_content = b"x" * (11 * 1024 * 1024) # 11MB response = client.post( "/api/v1/infer", files={"file": ("test.pdf", large_content)} ) assert response.status_code == 400 ``` ## Pre-Deployment Security Checklist Before ANY production deployment: - [ ] **Secrets**: No hardcoded secrets, all in env vars - [ ] **Input Validation**: All user inputs validated with Pydantic - [ ] **SQL Injection**: All queries use parameterized queries - [ ] **Path Traversal**: File paths validated and sanitized - [ ] **Authentication**: API key or token validation - [ ] **Authorization**: Role checks in place - [ ] **Rate Limiting**: Enabled on all endpoints - [ ] **HTTPS**: Enforced in production - [ ] **CORS**: Properly configured (no wildcards) - [ ] **Error Handling**: No sensitive data in errors - [ ] **Logging**: No sensitive data logged - [ ] **File Uploads**: Validated (size, type, magic bytes) - [ ] **Temp Files**: Always cleaned up - [ ] **Dependencies**: Up to date, no vulnerabilities ## Resources - [OWASP Top 10](https://owasp.org/www-project-top-ten/) - [FastAPI Security](https://fastapi.tiangolo.com/tutorial/security/) - [Bandit (Python Security Linter)](https://bandit.readthedocs.io/) - [pip-audit](https://pypi.org/project/pip-audit/) --- **Remember**: Security is not optional. One vulnerability can compromise sensitive invoice data. When in doubt, err on the side of caution.