Add claude config

This commit is contained in:
Yaojia Wang
2026-01-25 16:17:23 +01:00
parent e599424a92
commit d5101e3604
40 changed files with 5559 additions and 1378 deletions

View File

@@ -0,0 +1,568 @@
---
name: security-review
description: Use this skill when adding authentication, handling user input, working with secrets, creating API endpoints, or implementing payment/sensitive features. Provides comprehensive security checklist and patterns.
---
# Security Review Skill
Security best practices for Python/FastAPI applications handling sensitive invoice data.
## When to Activate
- Implementing authentication or authorization
- Handling user input or file uploads
- Creating new API endpoints
- Working with secrets or credentials
- Processing sensitive invoice data
- Integrating third-party APIs
- Database operations with user data
## Security Checklist
### 1. Secrets Management
#### NEVER Do This
```python
# Hardcoded secrets - CRITICAL VULNERABILITY
api_key = "sk-proj-xxxxx"
db_password = "password123"
```
#### ALWAYS Do This
```python
import os
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
db_password: str
api_key: str
model_path: str = "runs/train/invoice_fields/weights/best.pt"
class Config:
env_file = ".env"
settings = Settings()
# Verify secrets exist
if not settings.db_password:
raise RuntimeError("DB_PASSWORD not configured")
```
#### Verification Steps
- [ ] No hardcoded API keys, tokens, or passwords
- [ ] All secrets in environment variables
- [ ] `.env` in .gitignore
- [ ] No secrets in git history
- [ ] `.env.example` with placeholder values
### 2. Input Validation
#### Always Validate User Input
```python
from pydantic import BaseModel, Field, field_validator
from fastapi import HTTPException
import re
class InvoiceRequest(BaseModel):
invoice_number: str = Field(..., min_length=1, max_length=50)
amount: float = Field(..., gt=0, le=1_000_000)
bankgiro: str | None = None
@field_validator("invoice_number")
@classmethod
def validate_invoice_number(cls, v: str) -> str:
# Whitelist validation - only allow safe characters
if not re.match(r"^[A-Za-z0-9\-_]+$", v):
raise ValueError("Invalid invoice number format")
return v
@field_validator("bankgiro")
@classmethod
def validate_bankgiro(cls, v: str | None) -> str | None:
if v is None:
return None
cleaned = re.sub(r"[^0-9]", "", v)
if not (7 <= len(cleaned) <= 8):
raise ValueError("Bankgiro must be 7-8 digits")
return cleaned
```
#### File Upload Validation
```python
from fastapi import UploadFile, HTTPException
from pathlib import Path
ALLOWED_EXTENSIONS = {".pdf"}
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
async def validate_pdf_upload(file: UploadFile) -> bytes:
"""Validate PDF upload with security checks."""
# Extension check
ext = Path(file.filename or "").suffix.lower()
if ext not in ALLOWED_EXTENSIONS:
raise HTTPException(400, f"Only PDF files allowed, got {ext}")
# Read content
content = await file.read()
# Size check
if len(content) > MAX_FILE_SIZE:
raise HTTPException(400, f"File too large (max {MAX_FILE_SIZE // 1024 // 1024}MB)")
# Magic bytes check (PDF signature)
if not content.startswith(b"%PDF"):
raise HTTPException(400, "Invalid PDF file format")
return content
```
#### Verification Steps
- [ ] All user inputs validated with Pydantic
- [ ] File uploads restricted (size, type, extension, magic bytes)
- [ ] No direct use of user input in queries
- [ ] Whitelist validation (not blacklist)
- [ ] Error messages don't leak sensitive info
### 3. SQL Injection Prevention
#### NEVER Concatenate SQL
```python
# DANGEROUS - SQL Injection vulnerability
query = f"SELECT * FROM documents WHERE id = '{user_input}'"
cur.execute(query)
```
#### ALWAYS Use Parameterized Queries
```python
import psycopg2
# Safe - parameterized query with %s placeholders
cur.execute(
"SELECT * FROM documents WHERE id = %s AND status = %s",
(document_id, status)
)
# Safe - named parameters
cur.execute(
"SELECT * FROM documents WHERE id = %(id)s",
{"id": document_id}
)
# Safe - psycopg2.sql for dynamic identifiers
from psycopg2 import sql
cur.execute(
sql.SQL("SELECT {} FROM {} WHERE id = %s").format(
sql.Identifier("invoice_number"),
sql.Identifier("documents")
),
(document_id,)
)
```
#### Verification Steps
- [ ] All database queries use parameterized queries (%s or %(name)s)
- [ ] No string concatenation or f-strings in SQL
- [ ] psycopg2.sql module used for dynamic identifiers
- [ ] No user input in table/column names
### 4. Path Traversal Prevention
#### NEVER Trust User Paths
```python
# DANGEROUS - Path traversal vulnerability
filename = request.query_params.get("file")
with open(f"/data/{filename}", "r") as f: # Attacker: ../../../etc/passwd
return f.read()
```
#### ALWAYS Validate Paths
```python
from pathlib import Path
ALLOWED_DIR = Path("/data/uploads").resolve()
def get_safe_path(filename: str) -> Path:
"""Get safe file path, preventing path traversal."""
# Remove any path components
safe_name = Path(filename).name
# Validate filename characters
if not re.match(r"^[A-Za-z0-9_\-\.]+$", safe_name):
raise HTTPException(400, "Invalid filename")
# Resolve and verify within allowed directory
full_path = (ALLOWED_DIR / safe_name).resolve()
if not full_path.is_relative_to(ALLOWED_DIR):
raise HTTPException(400, "Invalid file path")
return full_path
```
#### Verification Steps
- [ ] User-provided filenames sanitized
- [ ] Paths resolved and validated against allowed directory
- [ ] No direct concatenation of user input into paths
- [ ] Whitelist characters in filenames
### 5. Authentication & Authorization
#### API Key Validation
```python
from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def verify_api_key(api_key: str = Security(api_key_header)) -> str:
if not api_key:
raise HTTPException(401, "API key required")
# Constant-time comparison to prevent timing attacks
import hmac
if not hmac.compare_digest(api_key, settings.api_key):
raise HTTPException(403, "Invalid API key")
return api_key
@router.post("/infer")
async def infer(
file: UploadFile,
api_key: str = Depends(verify_api_key)
):
...
```
#### Role-Based Access Control
```python
from enum import Enum
class UserRole(str, Enum):
USER = "user"
ADMIN = "admin"
def require_role(required_role: UserRole):
async def role_checker(current_user: User = Depends(get_current_user)):
if current_user.role != required_role:
raise HTTPException(403, "Insufficient permissions")
return current_user
return role_checker
@router.delete("/documents/{doc_id}")
async def delete_document(
doc_id: str,
user: User = Depends(require_role(UserRole.ADMIN))
):
...
```
#### Verification Steps
- [ ] API keys validated with constant-time comparison
- [ ] Authorization checks before sensitive operations
- [ ] Role-based access control implemented
- [ ] Session/token validation on protected routes
### 6. Rate Limiting
#### Rate Limiter Implementation
```python
from time import time
from collections import defaultdict
from fastapi import Request, HTTPException
class RateLimiter:
def __init__(self):
self.requests: dict[str, list[float]] = defaultdict(list)
def check_limit(
self,
identifier: str,
max_requests: int,
window_seconds: int
) -> bool:
now = time()
# Clean old requests
self.requests[identifier] = [
t for t in self.requests[identifier]
if now - t < window_seconds
]
# Check limit
if len(self.requests[identifier]) >= max_requests:
return False
self.requests[identifier].append(now)
return True
limiter = RateLimiter()
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
client_ip = request.client.host if request.client else "unknown"
# 100 requests per minute for general endpoints
if not limiter.check_limit(client_ip, max_requests=100, window_seconds=60):
raise HTTPException(429, "Rate limit exceeded. Try again later.")
return await call_next(request)
```
#### Stricter Limits for Expensive Operations
```python
# Inference endpoint: 10 requests per minute
async def check_inference_rate_limit(request: Request):
client_ip = request.client.host if request.client else "unknown"
if not limiter.check_limit(f"infer:{client_ip}", max_requests=10, window_seconds=60):
raise HTTPException(429, "Inference rate limit exceeded")
@router.post("/infer")
async def infer(
file: UploadFile,
_: None = Depends(check_inference_rate_limit)
):
...
```
#### Verification Steps
- [ ] Rate limiting on all API endpoints
- [ ] Stricter limits on expensive operations (inference, OCR)
- [ ] IP-based rate limiting
- [ ] Clear error messages for rate-limited requests
### 7. Sensitive Data Exposure
#### Logging
```python
import logging
logger = logging.getLogger(__name__)
# WRONG: Logging sensitive data
logger.info(f"Processing invoice: {invoice_data}") # May contain sensitive info
logger.error(f"DB error with password: {db_password}")
# CORRECT: Redact sensitive data
logger.info(f"Processing invoice: id={doc_id}")
logger.error(f"DB connection failed to {db_host}:{db_port}")
# CORRECT: Structured logging with safe fields only
logger.info(
"Invoice processed",
extra={
"document_id": doc_id,
"field_count": len(fields),
"processing_time_ms": elapsed_ms
}
)
```
#### Error Messages
```python
# WRONG: Exposing internal details
@app.exception_handler(Exception)
async def error_handler(request: Request, exc: Exception):
return JSONResponse(
status_code=500,
content={
"error": str(exc),
"traceback": traceback.format_exc() # NEVER expose!
}
)
# CORRECT: Generic error messages
@app.exception_handler(Exception)
async def error_handler(request: Request, exc: Exception):
logger.error(f"Unhandled error: {exc}", exc_info=True) # Log internally
return JSONResponse(
status_code=500,
content={"success": False, "error": "An error occurred"}
)
```
#### Verification Steps
- [ ] No passwords, tokens, or secrets in logs
- [ ] Error messages generic for users
- [ ] Detailed errors only in server logs
- [ ] No stack traces exposed to users
- [ ] Invoice data (amounts, account numbers) not logged
### 8. CORS Configuration
```python
from fastapi.middleware.cors import CORSMiddleware
# WRONG: Allow all origins
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # DANGEROUS in production
allow_credentials=True,
)
# CORRECT: Specific origins
ALLOWED_ORIGINS = [
"http://localhost:8000",
"https://your-domain.com",
]
app.add_middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["GET", "POST"],
allow_headers=["*"],
)
```
#### Verification Steps
- [ ] CORS origins explicitly listed
- [ ] No wildcard origins in production
- [ ] Credentials only with specific origins
### 9. Temporary File Security
```python
import tempfile
from pathlib import Path
from contextlib import contextmanager
@contextmanager
def secure_temp_file(suffix: str = ".pdf"):
"""Create secure temporary file that is always cleaned up."""
tmp_path = None
try:
with tempfile.NamedTemporaryFile(
suffix=suffix,
delete=False,
dir="/tmp/invoice-master" # Dedicated temp directory
) as tmp:
tmp_path = Path(tmp.name)
yield tmp_path
finally:
if tmp_path and tmp_path.exists():
tmp_path.unlink()
# Usage
async def process_upload(file: UploadFile):
with secure_temp_file(".pdf") as tmp_path:
content = await validate_pdf_upload(file)
tmp_path.write_bytes(content)
result = pipeline.process(tmp_path)
# File automatically cleaned up
return result
```
#### Verification Steps
- [ ] Temporary files always cleaned up (use context managers)
- [ ] Temp directory has restricted permissions
- [ ] No leftover files after processing errors
### 10. Dependency Security
#### Regular Updates
```bash
# Check for vulnerabilities
pip-audit
# Update dependencies
pip install --upgrade -r requirements.txt
# Check for outdated packages
pip list --outdated
```
#### Lock Files
```bash
# Create requirements lock file
pip freeze > requirements.lock
# Install from lock file for reproducible builds
pip install -r requirements.lock
```
#### Verification Steps
- [ ] Dependencies up to date
- [ ] No known vulnerabilities (pip-audit clean)
- [ ] requirements.txt pinned versions
- [ ] Regular security updates scheduled
## Security Testing
### Automated Security Tests
```python
import pytest
from fastapi.testclient import TestClient
def test_requires_api_key(client: TestClient):
"""Test authentication required."""
response = client.post("/api/v1/infer")
assert response.status_code == 401
def test_invalid_api_key_rejected(client: TestClient):
"""Test invalid API key rejected."""
response = client.post(
"/api/v1/infer",
headers={"X-API-Key": "invalid-key"}
)
assert response.status_code == 403
def test_sql_injection_prevented(client: TestClient):
"""Test SQL injection attempt rejected."""
response = client.get(
"/api/v1/documents",
params={"id": "'; DROP TABLE documents; --"}
)
# Should return validation error, not execute SQL
assert response.status_code in (400, 422)
def test_path_traversal_prevented(client: TestClient):
"""Test path traversal attempt rejected."""
response = client.get("/api/v1/results/../../etc/passwd")
assert response.status_code == 400
def test_rate_limit_enforced(client: TestClient):
"""Test rate limiting works."""
responses = [
client.post("/api/v1/infer", files={"file": b"test"})
for _ in range(15)
]
rate_limited = [r for r in responses if r.status_code == 429]
assert len(rate_limited) > 0
def test_large_file_rejected(client: TestClient):
"""Test file size limit enforced."""
large_content = b"x" * (11 * 1024 * 1024) # 11MB
response = client.post(
"/api/v1/infer",
files={"file": ("test.pdf", large_content)}
)
assert response.status_code == 400
```
## Pre-Deployment Security Checklist
Before ANY production deployment:
- [ ] **Secrets**: No hardcoded secrets, all in env vars
- [ ] **Input Validation**: All user inputs validated with Pydantic
- [ ] **SQL Injection**: All queries use parameterized queries
- [ ] **Path Traversal**: File paths validated and sanitized
- [ ] **Authentication**: API key or token validation
- [ ] **Authorization**: Role checks in place
- [ ] **Rate Limiting**: Enabled on all endpoints
- [ ] **HTTPS**: Enforced in production
- [ ] **CORS**: Properly configured (no wildcards)
- [ ] **Error Handling**: No sensitive data in errors
- [ ] **Logging**: No sensitive data logged
- [ ] **File Uploads**: Validated (size, type, magic bytes)
- [ ] **Temp Files**: Always cleaned up
- [ ] **Dependencies**: Up to date, no vulnerabilities
## Resources
- [OWASP Top 10](https://owasp.org/www-project-top-ten/)
- [FastAPI Security](https://fastapi.tiangolo.com/tutorial/security/)
- [Bandit (Python Security Linter)](https://bandit.readthedocs.io/)
- [pip-audit](https://pypi.org/project/pip-audit/)
---
**Remember**: Security is not optional. One vulnerability can compromise sensitive invoice data. When in doubt, err on the side of caution.