Fix the skill

This commit is contained in:
Yaojia Wang
2026-02-04 23:30:06 +01:00
parent fa996683c3
commit 15533285c6
4 changed files with 788 additions and 1730 deletions

View File

@@ -1,314 +1,274 @@
# Backend Development Patterns # .NET Development Best Practices
Backend architecture patterns for Python/FastAPI/PostgreSQL applications. ## Project Structure
## API Design
### RESTful Structure
``` ```
GET /api/v1/documents # List src/
GET /api/v1/documents/{id} # Get Domain/ # Entities, value objects, domain events
POST /api/v1/documents # Create Application/ # Use cases, DTOs, interfaces
PUT /api/v1/documents/{id} # Replace Infrastructure/ # EF Core, external services
PATCH /api/v1/documents/{id} # Update Api/ # Controllers, middleware, filters
DELETE /api/v1/documents/{id} # Delete tests/
Unit/
GET /api/v1/documents?status=processed&sort=created_at&limit=20&offset=0 Integration/
``` ```
### FastAPI Route Pattern ## Code Style
```python ```csharp
from fastapi import APIRouter, HTTPException, Depends, Query, File, UploadFile // Use records for DTOs and value objects
from pydantic import BaseModel public sealed record CreateDocumentRequest(string Name, string Type);
router = APIRouter(prefix="/api/v1", tags=["inference"]) // Use primary constructors
public class DocumentService(IRepository<Document> repo, ILogger<DocumentService> logger)
{
public async Task<Document?> GetAsync(Guid id, CancellationToken ct) =>
await repo.GetByIdAsync(id, ct);
}
@router.post("/infer", response_model=ApiResponse[InferenceResult]) // Prefer expression body for simple methods
async def infer_document( public Document? FindById(Guid id) => _documents.FirstOrDefault(d => d.Id == id);
file: UploadFile = File(...),
confidence_threshold: float = Query(0.5, ge=0, le=1), // Use collection expressions
service: InferenceService = Depends(get_inference_service) int[] numbers = [1, 2, 3];
) -> ApiResponse[InferenceResult]: List<string> names = ["Alice", "Bob"];
result = await service.process(file, confidence_threshold)
return ApiResponse(success=True, data=result)
``` ```
### Consistent Response Schema ## Async/Await
```python ```csharp
from typing import Generic, TypeVar // Always pass CancellationToken
T = TypeVar('T') public async Task<Document> GetAsync(Guid id, CancellationToken ct)
class ApiResponse(BaseModel, Generic[T]): // Use ConfigureAwait(false) in libraries
success: bool await _httpClient.GetAsync(url, ct).ConfigureAwait(false);
data: T | None = None
error: str | None = None // Avoid async void (except event handlers)
meta: dict | None = None public async Task ProcessAsync() { } // Good
public async void Process() { } // Bad
// Use ValueTask for hot paths with frequent sync completion
public ValueTask<int> GetCachedCountAsync()
``` ```
## Core Patterns ## Dependency Injection
### Repository Pattern ```csharp
// Register by interface
builder.Services.AddScoped<IDocumentService, DocumentService>();
```python // Use Options pattern for configuration
from typing import Protocol builder.Services.Configure<AppSettings>(builder.Configuration.GetSection("App"));
class DocumentRepository(Protocol): public class MyService(IOptions<AppSettings> options)
def find_all(self, filters: dict | None = None) -> list[Document]: ... {
def find_by_id(self, id: str) -> Document | None: ... private readonly AppSettings _settings = options.Value;
def create(self, data: dict) -> Document: ... }
def update(self, id: str, data: dict) -> Document: ...
def delete(self, id: str) -> None: ... // Avoid service locator pattern
// Bad: var service = serviceProvider.GetService<IMyService>();
// Good: Constructor injection
``` ```
### Service Layer ## Entity Framework Core
```python ```csharp
class InferenceService: // Always use AsNoTracking for read-only queries
def __init__(self, model_path: str, use_gpu: bool = True): await _context.Documents.AsNoTracking().ToListAsync(ct);
self.pipeline = InferencePipeline(model_path=model_path, use_gpu=use_gpu)
async def process(self, file: UploadFile, confidence_threshold: float) -> InferenceResult: // Use projection to select only needed fields
temp_path = self._save_temp_file(file) await _context.Documents
try: .Where(d => d.Status == "Active")
return self.pipeline.process_pdf(temp_path) .Select(d => new DocumentDto(d.Id, d.Name))
finally: .ToListAsync(ct);
temp_path.unlink(missing_ok=True)
```
### Dependency Injection // Prevent N+1 with Include or projection
await _context.Documents.Include(d => d.Labels).ToListAsync(ct);
```python // Use explicit transactions for multiple operations
from functools import lru_cache await using var tx = await _context.Database.BeginTransactionAsync(ct);
from pydantic_settings import BaseSettings
class Settings(BaseSettings): // Configure entities with IEntityTypeConfiguration
db_host: str = "localhost" public class DocumentConfiguration : IEntityTypeConfiguration<Document>
db_password: str {
model_path: str = "runs/train/invoice_fields/weights/best.pt" public void Configure(EntityTypeBuilder<Document> builder)
class Config: {
env_file = ".env" builder.HasKey(d => d.Id);
builder.Property(d => d.Name).HasMaxLength(200).IsRequired();
@lru_cache() builder.HasIndex(d => d.Status);
def get_settings() -> Settings: }
return Settings() }
def get_inference_service(settings: Settings = Depends(get_settings)) -> InferenceService:
return InferenceService(model_path=settings.model_path)
```
## Database Patterns
### Connection Pooling
```python
from psycopg2 import pool
from contextlib import contextmanager
db_pool = pool.ThreadedConnectionPool(minconn=2, maxconn=10, **db_config)
@contextmanager
def get_db_connection():
conn = db_pool.getconn()
try:
yield conn
finally:
db_pool.putconn(conn)
```
### Query Optimization
```python
# GOOD: Select only needed columns
cur.execute("""
SELECT id, status, fields->>'InvoiceNumber' as invoice_number
FROM documents WHERE status = %s
ORDER BY created_at DESC LIMIT %s
""", ('processed', 10))
# BAD: SELECT * FROM documents
```
### N+1 Prevention
```python
# BAD: N+1 queries
for doc in documents:
doc.labels = get_labels(doc.id) # N queries
# GOOD: Batch fetch with JOIN
cur.execute("""
SELECT d.id, d.status, array_agg(l.label) as labels
FROM documents d
LEFT JOIN document_labels l ON d.id = l.document_id
GROUP BY d.id, d.status
""")
```
### Transaction Pattern
```python
def create_document_with_labels(doc_data: dict, labels: list[dict]) -> str:
with get_db_connection() as conn:
try:
with conn.cursor() as cur:
cur.execute("INSERT INTO documents ... RETURNING id", ...)
doc_id = cur.fetchone()[0]
for label in labels:
cur.execute("INSERT INTO document_labels ...", ...)
conn.commit()
return doc_id
except Exception:
conn.rollback()
raise
```
## Caching
```python
from cachetools import TTLCache
_cache = TTLCache(maxsize=1000, ttl=300)
def get_document_cached(doc_id: str) -> Document | None:
if doc_id in _cache:
return _cache[doc_id]
doc = repo.find_by_id(doc_id)
if doc:
_cache[doc_id] = doc
return doc
``` ```
## Error Handling ## Error Handling
### Exception Hierarchy ```csharp
// Create domain-specific exceptions
public class NotFoundException(string resource, Guid id)
: Exception($"{resource} not found: {id}");
```python // Use global exception handler
class AppError(Exception): public class GlobalExceptionHandler(ILogger<GlobalExceptionHandler> logger) : IExceptionHandler
def __init__(self, message: str, status_code: int = 500): {
self.message = message public async ValueTask<bool> TryHandleAsync(HttpContext ctx, Exception ex, CancellationToken ct)
self.status_code = status_code {
logger.LogError(ex, "Error: {Message}", ex.Message);
ctx.Response.StatusCode = ex is NotFoundException ? 404 : 500;
await ctx.Response.WriteAsJsonAsync(new { error = ex.Message }, ct);
return true;
}
}
class NotFoundError(AppError): // Use Result pattern for expected failures
def __init__(self, resource: str, id: str): public Result<Document> Validate(CreateRequest request) =>
super().__init__(f"{resource} not found: {id}", 404) string.IsNullOrEmpty(request.Name)
? Result<Document>.Fail("Name is required")
class ValidationError(AppError): : Result<Document>.Ok(new Document(request.Name));
def __init__(self, message: str):
super().__init__(message, 400)
``` ```
### FastAPI Exception Handler ## Validation
```python ```csharp
@app.exception_handler(AppError) // Use FluentValidation
async def app_error_handler(request: Request, exc: AppError): public class CreateDocumentValidator : AbstractValidator<CreateDocumentRequest>
return JSONResponse(status_code=exc.status_code, content={"success": False, "error": exc.message}) {
public CreateDocumentValidator()
{
RuleFor(x => x.Name).NotEmpty().MaximumLength(200);
RuleFor(x => x.Type).Must(BeValidType).WithMessage("Invalid document type");
}
}
@app.exception_handler(Exception) // Or use Data Annotations for simple cases
async def generic_error_handler(request: Request, exc: Exception): public record CreateRequest(
logger.error(f"Unexpected error: {exc}", exc_info=True) [Required, MaxLength(200)] string Name,
return JSONResponse(status_code=500, content={"success": False, "error": "Internal server error"}) [Range(1, 100)] int Quantity);
``` ```
### Retry with Backoff ## Logging
```python ```csharp
async def retry_with_backoff(fn, max_retries: int = 3, base_delay: float = 1.0): // Use structured logging with templates
last_error = None logger.LogInformation("Processing document {DocumentId} for user {UserId}", docId, userId);
for attempt in range(max_retries):
try: // Use appropriate log levels
return await fn() if asyncio.iscoroutinefunction(fn) else fn() logger.LogDebug("Cache hit for key {Key}", key); // Development details
except Exception as e: logger.LogInformation("Document {Id} created", id); // Normal operations
last_error = e logger.LogWarning("Retry attempt {Attempt} for {Op}", n, op); // Potential issues
if attempt < max_retries - 1: logger.LogError(ex, "Failed to process {DocumentId}", id); // Errors
await asyncio.sleep(base_delay * (2 ** attempt))
raise last_error // Configure log filtering in appsettings
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning",
"Microsoft.EntityFrameworkCore": "Warning"
}
}
}
``` ```
## Rate Limiting ## API Design
```python ```csharp
from time import time [ApiController]
from collections import defaultdict [Route("api/v1/[controller]")]
public class DocumentsController(IDocumentService service) : ControllerBase
{
[HttpGet("{id:guid}")]
[ProducesResponseType<Document>(200)]
[ProducesResponseType(404)]
public async Task<IActionResult> Get(Guid id, CancellationToken ct)
{
var doc = await service.GetAsync(id, ct);
return doc is null ? NotFound() : Ok(doc);
}
class RateLimiter: [HttpPost]
def __init__(self): public async Task<IActionResult> Create(CreateRequest request, CancellationToken ct)
self.requests: dict[str, list[float]] = defaultdict(list) {
var doc = await service.CreateAsync(request, ct);
def check_limit(self, identifier: str, max_requests: int, window_sec: int) -> bool: return CreatedAtAction(nameof(Get), new { id = doc.Id }, doc);
now = time() }
self.requests[identifier] = [t for t in self.requests[identifier] if now - t < window_sec] }
if len(self.requests[identifier]) >= max_requests:
return False
self.requests[identifier].append(now)
return True
limiter = RateLimiter()
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
ip = request.client.host
if not limiter.check_limit(ip, max_requests=100, window_sec=60):
return JSONResponse(status_code=429, content={"error": "Rate limit exceeded"})
return await call_next(request)
``` ```
## Logging & Middleware ## Testing
### Request Logging ```csharp
// Use descriptive test names
[Fact]
public async Task GetById_WithValidId_ReturnsDocument()
{
// Arrange
var repo = Substitute.For<IRepository<Document>>();
repo.GetByIdAsync(Arg.Any<Guid>(), Arg.Any<CancellationToken>())
.Returns(new Document("Test"));
var service = new DocumentService(repo);
```python // Act
@app.middleware("http") var result = await service.GetAsync(Guid.NewGuid(), CancellationToken.None);
async def log_requests(request: Request, call_next):
request_id = str(uuid.uuid4())[:8] // Assert
start_time = time.time() result.Should().NotBeNull();
logger.info(f"[{request_id}] {request.method} {request.url.path}") result!.Name.Should().Be("Test");
response = await call_next(request) }
duration_ms = (time.time() - start_time) * 1000
logger.info(f"[{request_id}] Completed {response.status_code} in {duration_ms:.2f}ms") // Use WebApplicationFactory for integration tests
return response public class ApiTests(WebApplicationFactory<Program> factory) : IClassFixture<WebApplicationFactory<Program>>
{
[Fact]
public async Task GetDocuments_ReturnsSuccess()
{
var client = factory.CreateClient();
var response = await client.GetAsync("/api/v1/documents");
response.StatusCode.Should().Be(HttpStatusCode.OK);
}
}
``` ```
### Structured Logging ## Performance
```python ```csharp
class JSONFormatter(logging.Formatter): // Use IMemoryCache for frequently accessed data
def format(self, record): public class CachedService(IMemoryCache cache, IRepository<Document> repo)
return json.dumps({ {
"timestamp": datetime.utcnow().isoformat(), public async Task<Document?> GetAsync(Guid id, CancellationToken ct) =>
"level": record.levelname, await cache.GetOrCreateAsync($"doc:{id}", async entry =>
"message": record.getMessage(), {
"module": record.module, entry.AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(5);
}) return await repo.GetByIdAsync(id, ct);
});
}
// Use pagination for large collections
public async Task<PagedResult<Document>> GetPagedAsync(int page, int size, CancellationToken ct) =>
new(
await _context.Documents.Skip((page - 1) * size).Take(size).ToListAsync(ct),
await _context.Documents.CountAsync(ct)
);
// Use IAsyncEnumerable for streaming large datasets
public async IAsyncEnumerable<Document> StreamAllAsync([EnumeratorCancellation] CancellationToken ct)
{
await foreach (var doc in _context.Documents.AsAsyncEnumerable().WithCancellation(ct))
yield return doc;
}
``` ```
## Background Tasks ## Security
```python ```csharp
from fastapi import BackgroundTasks // Never hardcode secrets
var apiKey = builder.Configuration["ApiKey"]; // From environment/secrets
def send_notification(document_id: str, status: str): // Use parameterized queries (EF Core does this automatically)
logger.info(f"Notification: {document_id} -> {status}") // Bad: $"SELECT * FROM Users WHERE Id = {id}"
// Good: _context.Users.Where(u => u.Id == id)
@router.post("/infer") // Validate and sanitize all inputs
async def infer(file: UploadFile, background_tasks: BackgroundTasks): // Use HTTPS in production
result = await process_document(file) // Implement rate limiting
background_tasks.add_task(send_notification, result.document_id, "completed") builder.Services.AddRateLimiter(options => { ... });
return result
``` ```
## Key Principles
- Repository pattern: Abstract data access
- Service layer: Business logic separated from routes
- Dependency injection via `Depends()`
- Connection pooling for database
- Parameterized queries only (no f-strings in SQL)
- Batch fetch to prevent N+1
- Consistent `ApiResponse[T]` format
- Exception hierarchy with proper status codes
- Rate limit by IP
- Structured logging with request ID

View File

@@ -1,665 +1,234 @@
--- ---
name: coding-standards name: coding-standards
description: Universal coding standards, best practices, and patterns for Python, FastAPI, and data processing development. description: .NET/C# coding standards and best practices.
--- ---
# Coding Standards & Best Practices # .NET Coding Standards
Python coding standards for the Invoice Master project. ## Core Principles
## Code Quality Principles - **Readability First** - Clear names, self-documenting code
- **KISS** - Simplest solution that works
- **DRY** - Extract common logic, avoid copy-paste
- **YAGNI** - Don't build features before needed
### 1. Readability First ## Naming Conventions
- Code is read more than written
- Clear variable and function names
- Self-documenting code preferred over comments
- Consistent formatting (follow PEP 8)
### 2. KISS (Keep It Simple, Stupid) ```csharp
- Simplest solution that works // PascalCase: Types, methods, properties, public fields
- Avoid over-engineering public class DocumentService { }
- No premature optimization public async Task<Document> GetByIdAsync(Guid id) { }
- Easy to understand > clever code public string InvoiceNumber { get; init; }
### 3. DRY (Don't Repeat Yourself) // camelCase: Parameters, local variables, private fields with underscore
- Extract common logic into functions private readonly ILogger<DocumentService> _logger;
- Create reusable utilities public void Process(string documentId, int pageCount) { }
- Share modules across the codebase
- Avoid copy-paste programming
### 4. YAGNI (You Aren't Gonna Need It) // Interfaces: I prefix
- Don't build features before they're needed public interface IDocumentRepository { }
- Avoid speculative generality
- Add complexity only when required
- Start simple, refactor when needed
## Python Standards // Async methods: Async suffix
public async Task<Document> LoadAsync(CancellationToken ct)
### Variable Naming
```python
# GOOD: Descriptive names
invoice_number = "INV-2024-001"
is_valid_document = True
total_confidence_score = 0.95
# BAD: Unclear names
inv = "INV-2024-001"
flag = True
x = 0.95
``` ```
### Function Naming ## Modern C# Features
```python ```csharp
# GOOD: Verb-noun pattern with type hints // Records for DTOs and value objects
def extract_invoice_fields(pdf_path: Path) -> dict[str, str]: public sealed record CreateDocumentRequest(string Name, string Type);
"""Extract fields from invoice PDF.""" public sealed record DocumentDto(Guid Id, string Name, DateTime CreatedAt);
...
def calculate_confidence(predictions: list[float]) -> float: // Primary constructors
"""Calculate average confidence score.""" public class DocumentService(IRepository<Document> repo, ILogger<DocumentService> logger)
... {
public async Task<Document?> GetAsync(Guid id, CancellationToken ct) =>
await repo.GetByIdAsync(id, ct);
}
def is_valid_bankgiro(value: str) -> bool: // Pattern matching
"""Check if value is valid Bankgiro number.""" var message = result switch
... {
{ IsSuccess: true, Value: var doc } => $"Found: {doc.Name}",
{ Error: var err } => $"Error: {err}",
_ => "Unknown"
};
# BAD: Unclear or noun-only // Collection expressions
def invoice(path): int[] numbers = [1, 2, 3];
... List<string> names = ["Alice", "Bob"];
def confidence(p): // Null coalescing
... var name = user?.Name ?? "Unknown";
list ??= [];
def bankgiro(v):
...
``` ```
### Type Hints (REQUIRED) ## Immutability (Critical)
```python ```csharp
# GOOD: Full type annotations // GOOD: Create new objects
from typing import Optional public record User(string Name, int Age)
from pathlib import Path {
from dataclasses import dataclass public User WithName(string newName) => this with { Name = newName };
}
@dataclass // GOOD: Immutable collections
class InferenceResult: public IReadOnlyList<string> GetNames() => _names.AsReadOnly();
document_id: str
fields: dict[str, str]
confidence: dict[str, float]
processing_time_ms: float
def process_document( // BAD: Mutation
pdf_path: Path, public void UpdateUser(User user, string name)
confidence_threshold: float = 0.5 {
) -> InferenceResult: user.Name = name; // MUTATION!
"""Process PDF and return extracted fields.""" }
...
# BAD: No type hints
def process_document(pdf_path, confidence_threshold=0.5):
...
``` ```
### Immutability Pattern (CRITICAL) ## Error Handling
```python ```csharp
# GOOD: Create new objects, don't mutate // Domain exceptions
def update_fields(fields: dict[str, str], updates: dict[str, str]) -> dict[str, str]: public class NotFoundException(string resource, Guid id)
return {**fields, **updates} : Exception($"{resource} not found: {id}");
def add_item(items: list[str], new_item: str) -> list[str]: // Comprehensive handling
return [*items, new_item] public async Task<Document> LoadAsync(Guid id, CancellationToken ct)
{
try
{
var doc = await _repo.GetByIdAsync(id, ct);
return doc ?? throw new NotFoundException("Document", id);
}
catch (Exception ex) when (ex is not NotFoundException)
{
_logger.LogError(ex, "Failed to load document {Id}", id);
throw;
}
}
# BAD: Direct mutation // Result pattern for expected failures
def update_fields(fields: dict[str, str], updates: dict[str, str]) -> dict[str, str]: public Result<Document> Validate(CreateRequest request) =>
fields.update(updates) # MUTATION! string.IsNullOrEmpty(request.Name)
return fields ? Result<Document>.Fail("Name required")
: Result<Document>.Ok(new Document(request.Name));
def add_item(items: list[str], new_item: str) -> list[str]:
items.append(new_item) # MUTATION!
return items
``` ```
### Error Handling ## Async/Await
```python ```csharp
import logging // Always pass CancellationToken
public async Task<Document> GetAsync(Guid id, CancellationToken ct)
logger = logging.getLogger(__name__) // Use ConfigureAwait(false) in libraries
await _client.GetAsync(url, ct).ConfigureAwait(false);
# GOOD: Comprehensive error handling with logging // Avoid async void
def load_model(model_path: Path) -> Model: public async Task ProcessAsync() { } // Good
"""Load YOLO model from path.""" public async void Process() { } // Bad
try:
if not model_path.exists():
raise FileNotFoundError(f"Model not found: {model_path}")
model = YOLO(str(model_path)) // Parallel when independent
logger.info(f"Model loaded: {model_path}") var tasks = ids.Select(id => GetAsync(id, ct));
return model var results = await Task.WhenAll(tasks);
except Exception as e:
logger.error(f"Failed to load model: {e}")
raise RuntimeError(f"Model loading failed: {model_path}") from e
# BAD: No error handling
def load_model(model_path):
return YOLO(str(model_path))
# BAD: Bare except
def load_model(model_path):
try:
return YOLO(str(model_path))
except: # Never use bare except!
return None
``` ```
### Async Best Practices ## LINQ Best Practices
```python ```csharp
import asyncio // Prefer method syntax for complex queries
var result = documents
.Where(d => d.Status == "Active")
.OrderByDescending(d => d.CreatedAt)
.Select(d => new DocumentDto(d.Id, d.Name, d.CreatedAt))
.Take(10);
# GOOD: Parallel execution when possible // Use Any() instead of Count() > 0
async def process_batch(pdf_paths: list[Path]) -> list[InferenceResult]: if (documents.Any(d => d.IsValid)) { }
tasks = [process_document(path) for path in pdf_paths]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle exceptions // Avoid multiple enumerations
valid_results = [] var list = documents.ToList(); // Materialize once
for path, result in zip(pdf_paths, results): var count = list.Count;
if isinstance(result, Exception): var first = list.FirstOrDefault();
logger.error(f"Failed to process {path}: {result}")
else:
valid_results.append(result)
return valid_results
# BAD: Sequential when unnecessary
async def process_batch(pdf_paths: list[Path]) -> list[InferenceResult]:
results = []
for path in pdf_paths:
result = await process_document(path)
results.append(result)
return results
```
### Context Managers
```python
from contextlib import contextmanager
from pathlib import Path
import tempfile
# GOOD: Proper resource management
@contextmanager
def temp_pdf_copy(pdf_path: Path):
"""Create temporary copy of PDF for processing."""
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
tmp.write(pdf_path.read_bytes())
tmp_path = Path(tmp.name)
try:
yield tmp_path
finally:
tmp_path.unlink(missing_ok=True)
# Usage
with temp_pdf_copy(original_pdf) as tmp_pdf:
result = process_pdf(tmp_pdf)
```
## FastAPI Best Practices
### Route Structure
```python
from fastapi import APIRouter, HTTPException, Depends, Query, File, UploadFile
from pydantic import BaseModel
router = APIRouter(prefix="/api/v1", tags=["inference"])
class InferenceResponse(BaseModel):
success: bool
document_id: str
fields: dict[str, str]
confidence: dict[str, float]
processing_time_ms: float
@router.post("/infer", response_model=InferenceResponse)
async def infer_document(
file: UploadFile = File(...),
confidence_threshold: float = Query(0.5, ge=0.0, le=1.0)
) -> InferenceResponse:
"""Process invoice PDF and extract fields."""
if not file.filename.endswith(".pdf"):
raise HTTPException(status_code=400, detail="Only PDF files accepted")
result = await inference_service.process(file, confidence_threshold)
return InferenceResponse(
success=True,
document_id=result.document_id,
fields=result.fields,
confidence=result.confidence,
processing_time_ms=result.processing_time_ms
)
```
### Input Validation with Pydantic
```python
from pydantic import BaseModel, Field, field_validator
from datetime import date
import re
class InvoiceData(BaseModel):
invoice_number: str = Field(..., min_length=1, max_length=50)
invoice_date: date
amount: float = Field(..., gt=0)
bankgiro: str | None = None
ocr_number: str | None = None
@field_validator("bankgiro")
@classmethod
def validate_bankgiro(cls, v: str | None) -> str | None:
if v is None:
return None
# Bankgiro: 7-8 digits
cleaned = re.sub(r"[^0-9]", "", v)
if not (7 <= len(cleaned) <= 8):
raise ValueError("Bankgiro must be 7-8 digits")
return cleaned
@field_validator("ocr_number")
@classmethod
def validate_ocr(cls, v: str | None) -> str | None:
if v is None:
return None
# OCR: 2-25 digits
cleaned = re.sub(r"[^0-9]", "", v)
if not (2 <= len(cleaned) <= 25):
raise ValueError("OCR must be 2-25 digits")
return cleaned
```
### Response Format
```python
from pydantic import BaseModel
from typing import Generic, TypeVar
T = TypeVar("T")
class ApiResponse(BaseModel, Generic[T]):
success: bool
data: T | None = None
error: str | None = None
meta: dict | None = None
# Success response
return ApiResponse(
success=True,
data=result,
meta={"processing_time_ms": elapsed_ms}
)
# Error response
return ApiResponse(
success=False,
error="Invalid PDF format"
)
``` ```
## File Organization ## File Organization
### Project Structure
``` ```
src/ src/
├── cli/ # Command-line interfaces Domain/ # Entities, value objects
│ ├── autolabel.py Application/ # Use cases, DTOs, interfaces
├── train.py Infrastructure/ # EF Core, external services
│ └── infer.py Api/ # Controllers, middleware
├── pdf/ # PDF processing tests/
├── extractor.py Unit/
└── renderer.py Integration/
├── ocr/ # OCR processing
│ ├── paddle_ocr.py
│ └── machine_code_parser.py
├── inference/ # Inference pipeline
│ ├── pipeline.py
│ ├── yolo_detector.py
│ └── field_extractor.py
├── normalize/ # Field normalization
│ ├── base.py
│ ├── date_normalizer.py
│ └── amount_normalizer.py
├── web/ # FastAPI application
│ ├── app.py
│ ├── routes.py
│ ├── services.py
│ └── schemas.py
└── utils/ # Shared utilities
├── validators.py
├── text_cleaner.py
└── logging.py
tests/ # Mirror of src structure
├── test_pdf/
├── test_ocr/
└── test_inference/
``` ```
### File Naming **Guidelines:**
- Max 800 lines per file (typical 200-400)
- Max 50 lines per method
- One class per file (except nested)
- Group by feature, not by type
``` ## Code Smells
src/ocr/paddle_ocr.py # snake_case for modules
src/inference/yolo_detector.py # snake_case for modules ```csharp
tests/test_paddle_ocr.py # test_ prefix for tests // BAD: Deep nesting
config.py # snake_case for config if (doc != null)
if (doc.IsValid)
if (doc.HasFields)
// ...
// GOOD: Early returns
if (doc is null) return null;
if (!doc.IsValid) return null;
if (!doc.HasFields) return null;
// ...
// BAD: Magic numbers
if (confidence > 0.5) { }
// GOOD: Named constants
private const double ConfidenceThreshold = 0.5;
if (confidence > ConfidenceThreshold) { }
``` ```
### Module Size Guidelines ## Logging
- **Maximum**: 800 lines per file ```csharp
- **Typical**: 200-400 lines per file // Structured logging with templates
- **Functions**: Max 50 lines each _logger.LogInformation("Processing document {DocumentId}", docId);
- Extract utilities when modules grow too large _logger.LogError(ex, "Failed to process {DocumentId}", docId);
## Comments & Documentation // Appropriate levels
LogDebug // Development details
### When to Comment LogInformation // Normal operations
LogWarning // Potential issues
```python LogError // Errors with exceptions
# GOOD: Explain WHY, not WHAT
# Swedish Bankgiro uses Luhn algorithm with weight [1,2,1,2...]
def validate_bankgiro_checksum(bankgiro: str) -> bool:
...
# Payment line format: 7 groups separated by #, checksum at end
def parse_payment_line(line: str) -> PaymentLineData:
...
# BAD: Stating the obvious
# Increment counter by 1
count += 1
# Set name to user's name
name = user.name
``` ```
### Docstrings for Public APIs ## Testing (AAA Pattern)
```python ```csharp
def extract_invoice_fields( [Fact]
pdf_path: Path, public async Task GetById_WithValidId_ReturnsDocument()
confidence_threshold: float = 0.5, {
use_gpu: bool = True // Arrange
) -> InferenceResult: var repo = Substitute.For<IRepository<Document>>();
"""Extract structured fields from Swedish invoice PDF. repo.GetByIdAsync(Arg.Any<Guid>(), Arg.Any<CancellationToken>())
.Returns(new Document("Test"));
var service = new DocumentService(repo);
Uses YOLOv11 for field detection and PaddleOCR for text extraction. // Act
Applies field-specific normalization and validation. var result = await service.GetAsync(Guid.NewGuid(), CancellationToken.None);
Args: // Assert
pdf_path: Path to the invoice PDF file. result.Should().NotBeNull();
confidence_threshold: Minimum confidence for field detection (0.0-1.0). result!.Name.Should().Be("Test");
use_gpu: Whether to use GPU acceleration.
Returns:
InferenceResult containing extracted fields and confidence scores.
Raises:
FileNotFoundError: If PDF file doesn't exist.
ProcessingError: If OCR or detection fails.
Example:
>>> result = extract_invoice_fields(Path("invoice.pdf"))
>>> print(result.fields["invoice_number"])
"INV-2024-001"
"""
...
```
## Performance Best Practices
### Caching
```python
from functools import lru_cache
from cachetools import TTLCache
# Static data: LRU cache
@lru_cache(maxsize=100)
def get_field_config(field_name: str) -> FieldConfig:
"""Load field configuration (cached)."""
return load_config(field_name)
# Dynamic data: TTL cache
_document_cache = TTLCache(maxsize=1000, ttl=300) # 5 minutes
def get_document_cached(doc_id: str) -> Document | None:
if doc_id in _document_cache:
return _document_cache[doc_id]
doc = repo.find_by_id(doc_id)
if doc:
_document_cache[doc_id] = doc
return doc
```
### Database Queries
```python
# GOOD: Select only needed columns
cur.execute("""
SELECT id, status, fields->>'invoice_number'
FROM documents
WHERE status = %s
LIMIT %s
""", ('processed', 10))
# BAD: Select everything
cur.execute("SELECT * FROM documents")
# GOOD: Batch operations
cur.executemany(
"INSERT INTO labels (doc_id, field, value) VALUES (%s, %s, %s)",
[(doc_id, f, v) for f, v in fields.items()]
)
# BAD: Individual inserts in loop
for field, value in fields.items():
cur.execute("INSERT INTO labels ...", (doc_id, field, value))
```
### Lazy Loading
```python
class InferencePipeline:
def __init__(self, model_path: Path):
self.model_path = model_path
self._model: YOLO | None = None
self._ocr: PaddleOCR | None = None
@property
def model(self) -> YOLO:
"""Lazy load YOLO model."""
if self._model is None:
self._model = YOLO(str(self.model_path))
return self._model
@property
def ocr(self) -> PaddleOCR:
"""Lazy load PaddleOCR."""
if self._ocr is None:
self._ocr = PaddleOCR(use_angle_cls=True, lang="latin")
return self._ocr
```
## Testing Standards
### Test Structure (AAA Pattern)
```python
def test_extract_bankgiro_valid():
# Arrange
text = "Bankgiro: 123-4567"
# Act
result = extract_bankgiro(text)
# Assert
assert result == "1234567"
def test_extract_bankgiro_invalid_returns_none():
# Arrange
text = "No bankgiro here"
# Act
result = extract_bankgiro(text)
# Assert
assert result is None
```
### Test Naming
```python
# GOOD: Descriptive test names
def test_parse_payment_line_extracts_all_fields(): ...
def test_parse_payment_line_handles_missing_checksum(): ...
def test_validate_ocr_returns_false_for_invalid_checksum(): ...
# BAD: Vague test names
def test_parse(): ...
def test_works(): ...
def test_payment_line(): ...
```
### Fixtures
```python
import pytest
from pathlib import Path
@pytest.fixture
def sample_invoice_pdf(tmp_path: Path) -> Path:
"""Create sample invoice PDF for testing."""
pdf_path = tmp_path / "invoice.pdf"
# Create test PDF...
return pdf_path
@pytest.fixture
def inference_pipeline(sample_model_path: Path) -> InferencePipeline:
"""Create inference pipeline with test model."""
return InferencePipeline(sample_model_path)
def test_process_invoice(inference_pipeline, sample_invoice_pdf):
result = inference_pipeline.process(sample_invoice_pdf)
assert result.fields.get("invoice_number") is not None
```
## Code Smell Detection
### 1. Long Functions
```python
# BAD: Function > 50 lines
def process_document():
# 100 lines of code...
# GOOD: Split into smaller functions
def process_document(pdf_path: Path) -> InferenceResult:
image = render_pdf(pdf_path)
detections = detect_fields(image)
ocr_results = extract_text(image, detections)
fields = normalize_fields(ocr_results)
return build_result(fields)
```
### 2. Deep Nesting
```python
# BAD: 5+ levels of nesting
if document:
if document.is_valid:
if document.has_fields:
if field in document.fields:
if document.fields[field]:
# Do something
# GOOD: Early returns
if not document:
return None
if not document.is_valid:
return None
if not document.has_fields:
return None
if field not in document.fields:
return None
if not document.fields[field]:
return None
# Do something
```
### 3. Magic Numbers
```python
# BAD: Unexplained numbers
if confidence > 0.5:
...
time.sleep(3)
# GOOD: Named constants
CONFIDENCE_THRESHOLD = 0.5
RETRY_DELAY_SECONDS = 3
if confidence > CONFIDENCE_THRESHOLD:
...
time.sleep(RETRY_DELAY_SECONDS)
```
### 4. Mutable Default Arguments
```python
# BAD: Mutable default argument
def process_fields(fields: list = []): # DANGEROUS!
fields.append("new_field")
return fields
# GOOD: Use None as default
def process_fields(fields: list | None = None) -> list:
if fields is None:
fields = []
return [*fields, "new_field"]
```
## Logging Standards
```python
import logging
# Module-level logger
logger = logging.getLogger(__name__)
# GOOD: Appropriate log levels
logger.debug("Processing document: %s", doc_id)
logger.info("Document processed successfully: %s", doc_id)
logger.warning("Low confidence score: %.2f", confidence)
logger.error("Failed to process document: %s", error)
# GOOD: Structured logging with extra data
logger.info(
"Inference complete",
extra={
"document_id": doc_id,
"field_count": len(fields),
"processing_time_ms": elapsed_ms
} }
)
# BAD: Using print()
print(f"Processing {doc_id}") # Never in production!
``` ```
**Remember**: Code quality is not negotiable. Clear, maintainable Python code with proper type hints enables confident development and refactoring. ## Key Rules
- Always use `CancellationToken` for async methods
- Prefer `records` for DTOs and immutable data
- Use `IReadOnlyList<T>` for return types
- Never use `async void` (except event handlers)
- Always handle `null` with pattern matching or null operators
- Use structured logging, never `Console.WriteLine`

View File

@@ -1,314 +1,274 @@
# Backend Development Patterns # .NET Development Best Practices
Backend architecture patterns for Python/FastAPI/PostgreSQL applications. ## Project Structure
## API Design
### RESTful Structure
``` ```
GET /api/v1/documents # List src/
GET /api/v1/documents/{id} # Get Domain/ # Entities, value objects, domain events
POST /api/v1/documents # Create Application/ # Use cases, DTOs, interfaces
PUT /api/v1/documents/{id} # Replace Infrastructure/ # EF Core, external services
PATCH /api/v1/documents/{id} # Update Api/ # Controllers, middleware, filters
DELETE /api/v1/documents/{id} # Delete tests/
Unit/
GET /api/v1/documents?status=processed&sort=created_at&limit=20&offset=0 Integration/
``` ```
### FastAPI Route Pattern ## Code Style
```python ```csharp
from fastapi import APIRouter, HTTPException, Depends, Query, File, UploadFile // Use records for DTOs and value objects
from pydantic import BaseModel public sealed record CreateDocumentRequest(string Name, string Type);
router = APIRouter(prefix="/api/v1", tags=["inference"]) // Use primary constructors
public class DocumentService(IRepository<Document> repo, ILogger<DocumentService> logger)
{
public async Task<Document?> GetAsync(Guid id, CancellationToken ct) =>
await repo.GetByIdAsync(id, ct);
}
@router.post("/infer", response_model=ApiResponse[InferenceResult]) // Prefer expression body for simple methods
async def infer_document( public Document? FindById(Guid id) => _documents.FirstOrDefault(d => d.Id == id);
file: UploadFile = File(...),
confidence_threshold: float = Query(0.5, ge=0, le=1), // Use collection expressions
service: InferenceService = Depends(get_inference_service) int[] numbers = [1, 2, 3];
) -> ApiResponse[InferenceResult]: List<string> names = ["Alice", "Bob"];
result = await service.process(file, confidence_threshold)
return ApiResponse(success=True, data=result)
``` ```
### Consistent Response Schema ## Async/Await
```python ```csharp
from typing import Generic, TypeVar // Always pass CancellationToken
T = TypeVar('T') public async Task<Document> GetAsync(Guid id, CancellationToken ct)
class ApiResponse(BaseModel, Generic[T]): // Use ConfigureAwait(false) in libraries
success: bool await _httpClient.GetAsync(url, ct).ConfigureAwait(false);
data: T | None = None
error: str | None = None // Avoid async void (except event handlers)
meta: dict | None = None public async Task ProcessAsync() { } // Good
public async void Process() { } // Bad
// Use ValueTask for hot paths with frequent sync completion
public ValueTask<int> GetCachedCountAsync()
``` ```
## Core Patterns ## Dependency Injection
### Repository Pattern ```csharp
// Register by interface
builder.Services.AddScoped<IDocumentService, DocumentService>();
```python // Use Options pattern for configuration
from typing import Protocol builder.Services.Configure<AppSettings>(builder.Configuration.GetSection("App"));
class DocumentRepository(Protocol): public class MyService(IOptions<AppSettings> options)
def find_all(self, filters: dict | None = None) -> list[Document]: ... {
def find_by_id(self, id: str) -> Document | None: ... private readonly AppSettings _settings = options.Value;
def create(self, data: dict) -> Document: ... }
def update(self, id: str, data: dict) -> Document: ...
def delete(self, id: str) -> None: ... // Avoid service locator pattern
// Bad: var service = serviceProvider.GetService<IMyService>();
// Good: Constructor injection
``` ```
### Service Layer ## Entity Framework Core
```python ```csharp
class InferenceService: // Always use AsNoTracking for read-only queries
def __init__(self, model_path: str, use_gpu: bool = True): await _context.Documents.AsNoTracking().ToListAsync(ct);
self.pipeline = InferencePipeline(model_path=model_path, use_gpu=use_gpu)
async def process(self, file: UploadFile, confidence_threshold: float) -> InferenceResult: // Use projection to select only needed fields
temp_path = self._save_temp_file(file) await _context.Documents
try: .Where(d => d.Status == "Active")
return self.pipeline.process_pdf(temp_path) .Select(d => new DocumentDto(d.Id, d.Name))
finally: .ToListAsync(ct);
temp_path.unlink(missing_ok=True)
```
### Dependency Injection // Prevent N+1 with Include or projection
await _context.Documents.Include(d => d.Labels).ToListAsync(ct);
```python // Use explicit transactions for multiple operations
from functools import lru_cache await using var tx = await _context.Database.BeginTransactionAsync(ct);
from pydantic_settings import BaseSettings
class Settings(BaseSettings): // Configure entities with IEntityTypeConfiguration
db_host: str = "localhost" public class DocumentConfiguration : IEntityTypeConfiguration<Document>
db_password: str {
model_path: str = "runs/train/invoice_fields/weights/best.pt" public void Configure(EntityTypeBuilder<Document> builder)
class Config: {
env_file = ".env" builder.HasKey(d => d.Id);
builder.Property(d => d.Name).HasMaxLength(200).IsRequired();
@lru_cache() builder.HasIndex(d => d.Status);
def get_settings() -> Settings: }
return Settings() }
def get_inference_service(settings: Settings = Depends(get_settings)) -> InferenceService:
return InferenceService(model_path=settings.model_path)
```
## Database Patterns
### Connection Pooling
```python
from psycopg2 import pool
from contextlib import contextmanager
db_pool = pool.ThreadedConnectionPool(minconn=2, maxconn=10, **db_config)
@contextmanager
def get_db_connection():
conn = db_pool.getconn()
try:
yield conn
finally:
db_pool.putconn(conn)
```
### Query Optimization
```python
# GOOD: Select only needed columns
cur.execute("""
SELECT id, status, fields->>'InvoiceNumber' as invoice_number
FROM documents WHERE status = %s
ORDER BY created_at DESC LIMIT %s
""", ('processed', 10))
# BAD: SELECT * FROM documents
```
### N+1 Prevention
```python
# BAD: N+1 queries
for doc in documents:
doc.labels = get_labels(doc.id) # N queries
# GOOD: Batch fetch with JOIN
cur.execute("""
SELECT d.id, d.status, array_agg(l.label) as labels
FROM documents d
LEFT JOIN document_labels l ON d.id = l.document_id
GROUP BY d.id, d.status
""")
```
### Transaction Pattern
```python
def create_document_with_labels(doc_data: dict, labels: list[dict]) -> str:
with get_db_connection() as conn:
try:
with conn.cursor() as cur:
cur.execute("INSERT INTO documents ... RETURNING id", ...)
doc_id = cur.fetchone()[0]
for label in labels:
cur.execute("INSERT INTO document_labels ...", ...)
conn.commit()
return doc_id
except Exception:
conn.rollback()
raise
```
## Caching
```python
from cachetools import TTLCache
_cache = TTLCache(maxsize=1000, ttl=300)
def get_document_cached(doc_id: str) -> Document | None:
if doc_id in _cache:
return _cache[doc_id]
doc = repo.find_by_id(doc_id)
if doc:
_cache[doc_id] = doc
return doc
``` ```
## Error Handling ## Error Handling
### Exception Hierarchy ```csharp
// Create domain-specific exceptions
public class NotFoundException(string resource, Guid id)
: Exception($"{resource} not found: {id}");
```python // Use global exception handler
class AppError(Exception): public class GlobalExceptionHandler(ILogger<GlobalExceptionHandler> logger) : IExceptionHandler
def __init__(self, message: str, status_code: int = 500): {
self.message = message public async ValueTask<bool> TryHandleAsync(HttpContext ctx, Exception ex, CancellationToken ct)
self.status_code = status_code {
logger.LogError(ex, "Error: {Message}", ex.Message);
ctx.Response.StatusCode = ex is NotFoundException ? 404 : 500;
await ctx.Response.WriteAsJsonAsync(new { error = ex.Message }, ct);
return true;
}
}
class NotFoundError(AppError): // Use Result pattern for expected failures
def __init__(self, resource: str, id: str): public Result<Document> Validate(CreateRequest request) =>
super().__init__(f"{resource} not found: {id}", 404) string.IsNullOrEmpty(request.Name)
? Result<Document>.Fail("Name is required")
class ValidationError(AppError): : Result<Document>.Ok(new Document(request.Name));
def __init__(self, message: str):
super().__init__(message, 400)
``` ```
### FastAPI Exception Handler ## Validation
```python ```csharp
@app.exception_handler(AppError) // Use FluentValidation
async def app_error_handler(request: Request, exc: AppError): public class CreateDocumentValidator : AbstractValidator<CreateDocumentRequest>
return JSONResponse(status_code=exc.status_code, content={"success": False, "error": exc.message}) {
public CreateDocumentValidator()
{
RuleFor(x => x.Name).NotEmpty().MaximumLength(200);
RuleFor(x => x.Type).Must(BeValidType).WithMessage("Invalid document type");
}
}
@app.exception_handler(Exception) // Or use Data Annotations for simple cases
async def generic_error_handler(request: Request, exc: Exception): public record CreateRequest(
logger.error(f"Unexpected error: {exc}", exc_info=True) [Required, MaxLength(200)] string Name,
return JSONResponse(status_code=500, content={"success": False, "error": "Internal server error"}) [Range(1, 100)] int Quantity);
``` ```
### Retry with Backoff ## Logging
```python ```csharp
async def retry_with_backoff(fn, max_retries: int = 3, base_delay: float = 1.0): // Use structured logging with templates
last_error = None logger.LogInformation("Processing document {DocumentId} for user {UserId}", docId, userId);
for attempt in range(max_retries):
try: // Use appropriate log levels
return await fn() if asyncio.iscoroutinefunction(fn) else fn() logger.LogDebug("Cache hit for key {Key}", key); // Development details
except Exception as e: logger.LogInformation("Document {Id} created", id); // Normal operations
last_error = e logger.LogWarning("Retry attempt {Attempt} for {Op}", n, op); // Potential issues
if attempt < max_retries - 1: logger.LogError(ex, "Failed to process {DocumentId}", id); // Errors
await asyncio.sleep(base_delay * (2 ** attempt))
raise last_error // Configure log filtering in appsettings
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning",
"Microsoft.EntityFrameworkCore": "Warning"
}
}
}
``` ```
## Rate Limiting ## API Design
```python ```csharp
from time import time [ApiController]
from collections import defaultdict [Route("api/v1/[controller]")]
public class DocumentsController(IDocumentService service) : ControllerBase
{
[HttpGet("{id:guid}")]
[ProducesResponseType<Document>(200)]
[ProducesResponseType(404)]
public async Task<IActionResult> Get(Guid id, CancellationToken ct)
{
var doc = await service.GetAsync(id, ct);
return doc is null ? NotFound() : Ok(doc);
}
class RateLimiter: [HttpPost]
def __init__(self): public async Task<IActionResult> Create(CreateRequest request, CancellationToken ct)
self.requests: dict[str, list[float]] = defaultdict(list) {
var doc = await service.CreateAsync(request, ct);
def check_limit(self, identifier: str, max_requests: int, window_sec: int) -> bool: return CreatedAtAction(nameof(Get), new { id = doc.Id }, doc);
now = time() }
self.requests[identifier] = [t for t in self.requests[identifier] if now - t < window_sec] }
if len(self.requests[identifier]) >= max_requests:
return False
self.requests[identifier].append(now)
return True
limiter = RateLimiter()
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
ip = request.client.host
if not limiter.check_limit(ip, max_requests=100, window_sec=60):
return JSONResponse(status_code=429, content={"error": "Rate limit exceeded"})
return await call_next(request)
``` ```
## Logging & Middleware ## Testing
### Request Logging ```csharp
// Use descriptive test names
[Fact]
public async Task GetById_WithValidId_ReturnsDocument()
{
// Arrange
var repo = Substitute.For<IRepository<Document>>();
repo.GetByIdAsync(Arg.Any<Guid>(), Arg.Any<CancellationToken>())
.Returns(new Document("Test"));
var service = new DocumentService(repo);
```python // Act
@app.middleware("http") var result = await service.GetAsync(Guid.NewGuid(), CancellationToken.None);
async def log_requests(request: Request, call_next):
request_id = str(uuid.uuid4())[:8] // Assert
start_time = time.time() result.Should().NotBeNull();
logger.info(f"[{request_id}] {request.method} {request.url.path}") result!.Name.Should().Be("Test");
response = await call_next(request) }
duration_ms = (time.time() - start_time) * 1000
logger.info(f"[{request_id}] Completed {response.status_code} in {duration_ms:.2f}ms") // Use WebApplicationFactory for integration tests
return response public class ApiTests(WebApplicationFactory<Program> factory) : IClassFixture<WebApplicationFactory<Program>>
{
[Fact]
public async Task GetDocuments_ReturnsSuccess()
{
var client = factory.CreateClient();
var response = await client.GetAsync("/api/v1/documents");
response.StatusCode.Should().Be(HttpStatusCode.OK);
}
}
``` ```
### Structured Logging ## Performance
```python ```csharp
class JSONFormatter(logging.Formatter): // Use IMemoryCache for frequently accessed data
def format(self, record): public class CachedService(IMemoryCache cache, IRepository<Document> repo)
return json.dumps({ {
"timestamp": datetime.utcnow().isoformat(), public async Task<Document?> GetAsync(Guid id, CancellationToken ct) =>
"level": record.levelname, await cache.GetOrCreateAsync($"doc:{id}", async entry =>
"message": record.getMessage(), {
"module": record.module, entry.AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(5);
}) return await repo.GetByIdAsync(id, ct);
});
}
// Use pagination for large collections
public async Task<PagedResult<Document>> GetPagedAsync(int page, int size, CancellationToken ct) =>
new(
await _context.Documents.Skip((page - 1) * size).Take(size).ToListAsync(ct),
await _context.Documents.CountAsync(ct)
);
// Use IAsyncEnumerable for streaming large datasets
public async IAsyncEnumerable<Document> StreamAllAsync([EnumeratorCancellation] CancellationToken ct)
{
await foreach (var doc in _context.Documents.AsAsyncEnumerable().WithCancellation(ct))
yield return doc;
}
``` ```
## Background Tasks ## Security
```python ```csharp
from fastapi import BackgroundTasks // Never hardcode secrets
var apiKey = builder.Configuration["ApiKey"]; // From environment/secrets
def send_notification(document_id: str, status: str): // Use parameterized queries (EF Core does this automatically)
logger.info(f"Notification: {document_id} -> {status}") // Bad: $"SELECT * FROM Users WHERE Id = {id}"
// Good: _context.Users.Where(u => u.Id == id)
@router.post("/infer") // Validate and sanitize all inputs
async def infer(file: UploadFile, background_tasks: BackgroundTasks): // Use HTTPS in production
result = await process_document(file) // Implement rate limiting
background_tasks.add_task(send_notification, result.document_id, "completed") builder.Services.AddRateLimiter(options => { ... });
return result
``` ```
## Key Principles
- Repository pattern: Abstract data access
- Service layer: Business logic separated from routes
- Dependency injection via `Depends()`
- Connection pooling for database
- Parameterized queries only (no f-strings in SQL)
- Batch fetch to prevent N+1
- Consistent `ApiResponse[T]` format
- Exception hierarchy with proper status codes
- Rate limit by IP
- Structured logging with request ID

View File

@@ -1,665 +1,234 @@
--- ---
name: coding-standards name: coding-standards
description: Universal coding standards, best practices, and patterns for Python, FastAPI, and data processing development. description: .NET/C# coding standards and best practices.
--- ---
# Coding Standards & Best Practices # .NET Coding Standards
Python coding standards for the Invoice Master project. ## Core Principles
## Code Quality Principles - **Readability First** - Clear names, self-documenting code
- **KISS** - Simplest solution that works
- **DRY** - Extract common logic, avoid copy-paste
- **YAGNI** - Don't build features before needed
### 1. Readability First ## Naming Conventions
- Code is read more than written
- Clear variable and function names
- Self-documenting code preferred over comments
- Consistent formatting (follow PEP 8)
### 2. KISS (Keep It Simple, Stupid) ```csharp
- Simplest solution that works // PascalCase: Types, methods, properties, public fields
- Avoid over-engineering public class DocumentService { }
- No premature optimization public async Task<Document> GetByIdAsync(Guid id) { }
- Easy to understand > clever code public string InvoiceNumber { get; init; }
### 3. DRY (Don't Repeat Yourself) // camelCase: Parameters, local variables, private fields with underscore
- Extract common logic into functions private readonly ILogger<DocumentService> _logger;
- Create reusable utilities public void Process(string documentId, int pageCount) { }
- Share modules across the codebase
- Avoid copy-paste programming
### 4. YAGNI (You Aren't Gonna Need It) // Interfaces: I prefix
- Don't build features before they're needed public interface IDocumentRepository { }
- Avoid speculative generality
- Add complexity only when required
- Start simple, refactor when needed
## Python Standards // Async methods: Async suffix
public async Task<Document> LoadAsync(CancellationToken ct)
### Variable Naming
```python
# GOOD: Descriptive names
invoice_number = "INV-2024-001"
is_valid_document = True
total_confidence_score = 0.95
# BAD: Unclear names
inv = "INV-2024-001"
flag = True
x = 0.95
``` ```
### Function Naming ## Modern C# Features
```python ```csharp
# GOOD: Verb-noun pattern with type hints // Records for DTOs and value objects
def extract_invoice_fields(pdf_path: Path) -> dict[str, str]: public sealed record CreateDocumentRequest(string Name, string Type);
"""Extract fields from invoice PDF.""" public sealed record DocumentDto(Guid Id, string Name, DateTime CreatedAt);
...
def calculate_confidence(predictions: list[float]) -> float: // Primary constructors
"""Calculate average confidence score.""" public class DocumentService(IRepository<Document> repo, ILogger<DocumentService> logger)
... {
public async Task<Document?> GetAsync(Guid id, CancellationToken ct) =>
await repo.GetByIdAsync(id, ct);
}
def is_valid_bankgiro(value: str) -> bool: // Pattern matching
"""Check if value is valid Bankgiro number.""" var message = result switch
... {
{ IsSuccess: true, Value: var doc } => $"Found: {doc.Name}",
{ Error: var err } => $"Error: {err}",
_ => "Unknown"
};
# BAD: Unclear or noun-only // Collection expressions
def invoice(path): int[] numbers = [1, 2, 3];
... List<string> names = ["Alice", "Bob"];
def confidence(p): // Null coalescing
... var name = user?.Name ?? "Unknown";
list ??= [];
def bankgiro(v):
...
``` ```
### Type Hints (REQUIRED) ## Immutability (Critical)
```python ```csharp
# GOOD: Full type annotations // GOOD: Create new objects
from typing import Optional public record User(string Name, int Age)
from pathlib import Path {
from dataclasses import dataclass public User WithName(string newName) => this with { Name = newName };
}
@dataclass // GOOD: Immutable collections
class InferenceResult: public IReadOnlyList<string> GetNames() => _names.AsReadOnly();
document_id: str
fields: dict[str, str]
confidence: dict[str, float]
processing_time_ms: float
def process_document( // BAD: Mutation
pdf_path: Path, public void UpdateUser(User user, string name)
confidence_threshold: float = 0.5 {
) -> InferenceResult: user.Name = name; // MUTATION!
"""Process PDF and return extracted fields.""" }
...
# BAD: No type hints
def process_document(pdf_path, confidence_threshold=0.5):
...
``` ```
### Immutability Pattern (CRITICAL) ## Error Handling
```python ```csharp
# GOOD: Create new objects, don't mutate // Domain exceptions
def update_fields(fields: dict[str, str], updates: dict[str, str]) -> dict[str, str]: public class NotFoundException(string resource, Guid id)
return {**fields, **updates} : Exception($"{resource} not found: {id}");
def add_item(items: list[str], new_item: str) -> list[str]: // Comprehensive handling
return [*items, new_item] public async Task<Document> LoadAsync(Guid id, CancellationToken ct)
{
try
{
var doc = await _repo.GetByIdAsync(id, ct);
return doc ?? throw new NotFoundException("Document", id);
}
catch (Exception ex) when (ex is not NotFoundException)
{
_logger.LogError(ex, "Failed to load document {Id}", id);
throw;
}
}
# BAD: Direct mutation // Result pattern for expected failures
def update_fields(fields: dict[str, str], updates: dict[str, str]) -> dict[str, str]: public Result<Document> Validate(CreateRequest request) =>
fields.update(updates) # MUTATION! string.IsNullOrEmpty(request.Name)
return fields ? Result<Document>.Fail("Name required")
: Result<Document>.Ok(new Document(request.Name));
def add_item(items: list[str], new_item: str) -> list[str]:
items.append(new_item) # MUTATION!
return items
``` ```
### Error Handling ## Async/Await
```python ```csharp
import logging // Always pass CancellationToken
public async Task<Document> GetAsync(Guid id, CancellationToken ct)
logger = logging.getLogger(__name__) // Use ConfigureAwait(false) in libraries
await _client.GetAsync(url, ct).ConfigureAwait(false);
# GOOD: Comprehensive error handling with logging // Avoid async void
def load_model(model_path: Path) -> Model: public async Task ProcessAsync() { } // Good
"""Load YOLO model from path.""" public async void Process() { } // Bad
try:
if not model_path.exists():
raise FileNotFoundError(f"Model not found: {model_path}")
model = YOLO(str(model_path)) // Parallel when independent
logger.info(f"Model loaded: {model_path}") var tasks = ids.Select(id => GetAsync(id, ct));
return model var results = await Task.WhenAll(tasks);
except Exception as e:
logger.error(f"Failed to load model: {e}")
raise RuntimeError(f"Model loading failed: {model_path}") from e
# BAD: No error handling
def load_model(model_path):
return YOLO(str(model_path))
# BAD: Bare except
def load_model(model_path):
try:
return YOLO(str(model_path))
except: # Never use bare except!
return None
``` ```
### Async Best Practices ## LINQ Best Practices
```python ```csharp
import asyncio // Prefer method syntax for complex queries
var result = documents
.Where(d => d.Status == "Active")
.OrderByDescending(d => d.CreatedAt)
.Select(d => new DocumentDto(d.Id, d.Name, d.CreatedAt))
.Take(10);
# GOOD: Parallel execution when possible // Use Any() instead of Count() > 0
async def process_batch(pdf_paths: list[Path]) -> list[InferenceResult]: if (documents.Any(d => d.IsValid)) { }
tasks = [process_document(path) for path in pdf_paths]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle exceptions // Avoid multiple enumerations
valid_results = [] var list = documents.ToList(); // Materialize once
for path, result in zip(pdf_paths, results): var count = list.Count;
if isinstance(result, Exception): var first = list.FirstOrDefault();
logger.error(f"Failed to process {path}: {result}")
else:
valid_results.append(result)
return valid_results
# BAD: Sequential when unnecessary
async def process_batch(pdf_paths: list[Path]) -> list[InferenceResult]:
results = []
for path in pdf_paths:
result = await process_document(path)
results.append(result)
return results
```
### Context Managers
```python
from contextlib import contextmanager
from pathlib import Path
import tempfile
# GOOD: Proper resource management
@contextmanager
def temp_pdf_copy(pdf_path: Path):
"""Create temporary copy of PDF for processing."""
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
tmp.write(pdf_path.read_bytes())
tmp_path = Path(tmp.name)
try:
yield tmp_path
finally:
tmp_path.unlink(missing_ok=True)
# Usage
with temp_pdf_copy(original_pdf) as tmp_pdf:
result = process_pdf(tmp_pdf)
```
## FastAPI Best Practices
### Route Structure
```python
from fastapi import APIRouter, HTTPException, Depends, Query, File, UploadFile
from pydantic import BaseModel
router = APIRouter(prefix="/api/v1", tags=["inference"])
class InferenceResponse(BaseModel):
success: bool
document_id: str
fields: dict[str, str]
confidence: dict[str, float]
processing_time_ms: float
@router.post("/infer", response_model=InferenceResponse)
async def infer_document(
file: UploadFile = File(...),
confidence_threshold: float = Query(0.5, ge=0.0, le=1.0)
) -> InferenceResponse:
"""Process invoice PDF and extract fields."""
if not file.filename.endswith(".pdf"):
raise HTTPException(status_code=400, detail="Only PDF files accepted")
result = await inference_service.process(file, confidence_threshold)
return InferenceResponse(
success=True,
document_id=result.document_id,
fields=result.fields,
confidence=result.confidence,
processing_time_ms=result.processing_time_ms
)
```
### Input Validation with Pydantic
```python
from pydantic import BaseModel, Field, field_validator
from datetime import date
import re
class InvoiceData(BaseModel):
invoice_number: str = Field(..., min_length=1, max_length=50)
invoice_date: date
amount: float = Field(..., gt=0)
bankgiro: str | None = None
ocr_number: str | None = None
@field_validator("bankgiro")
@classmethod
def validate_bankgiro(cls, v: str | None) -> str | None:
if v is None:
return None
# Bankgiro: 7-8 digits
cleaned = re.sub(r"[^0-9]", "", v)
if not (7 <= len(cleaned) <= 8):
raise ValueError("Bankgiro must be 7-8 digits")
return cleaned
@field_validator("ocr_number")
@classmethod
def validate_ocr(cls, v: str | None) -> str | None:
if v is None:
return None
# OCR: 2-25 digits
cleaned = re.sub(r"[^0-9]", "", v)
if not (2 <= len(cleaned) <= 25):
raise ValueError("OCR must be 2-25 digits")
return cleaned
```
### Response Format
```python
from pydantic import BaseModel
from typing import Generic, TypeVar
T = TypeVar("T")
class ApiResponse(BaseModel, Generic[T]):
success: bool
data: T | None = None
error: str | None = None
meta: dict | None = None
# Success response
return ApiResponse(
success=True,
data=result,
meta={"processing_time_ms": elapsed_ms}
)
# Error response
return ApiResponse(
success=False,
error="Invalid PDF format"
)
``` ```
## File Organization ## File Organization
### Project Structure
``` ```
src/ src/
├── cli/ # Command-line interfaces Domain/ # Entities, value objects
│ ├── autolabel.py Application/ # Use cases, DTOs, interfaces
├── train.py Infrastructure/ # EF Core, external services
│ └── infer.py Api/ # Controllers, middleware
├── pdf/ # PDF processing tests/
├── extractor.py Unit/
└── renderer.py Integration/
├── ocr/ # OCR processing
│ ├── paddle_ocr.py
│ └── machine_code_parser.py
├── inference/ # Inference pipeline
│ ├── pipeline.py
│ ├── yolo_detector.py
│ └── field_extractor.py
├── normalize/ # Field normalization
│ ├── base.py
│ ├── date_normalizer.py
│ └── amount_normalizer.py
├── web/ # FastAPI application
│ ├── app.py
│ ├── routes.py
│ ├── services.py
│ └── schemas.py
└── utils/ # Shared utilities
├── validators.py
├── text_cleaner.py
└── logging.py
tests/ # Mirror of src structure
├── test_pdf/
├── test_ocr/
└── test_inference/
``` ```
### File Naming **Guidelines:**
- Max 800 lines per file (typical 200-400)
- Max 50 lines per method
- One class per file (except nested)
- Group by feature, not by type
``` ## Code Smells
src/ocr/paddle_ocr.py # snake_case for modules
src/inference/yolo_detector.py # snake_case for modules ```csharp
tests/test_paddle_ocr.py # test_ prefix for tests // BAD: Deep nesting
config.py # snake_case for config if (doc != null)
if (doc.IsValid)
if (doc.HasFields)
// ...
// GOOD: Early returns
if (doc is null) return null;
if (!doc.IsValid) return null;
if (!doc.HasFields) return null;
// ...
// BAD: Magic numbers
if (confidence > 0.5) { }
// GOOD: Named constants
private const double ConfidenceThreshold = 0.5;
if (confidence > ConfidenceThreshold) { }
``` ```
### Module Size Guidelines ## Logging
- **Maximum**: 800 lines per file ```csharp
- **Typical**: 200-400 lines per file // Structured logging with templates
- **Functions**: Max 50 lines each _logger.LogInformation("Processing document {DocumentId}", docId);
- Extract utilities when modules grow too large _logger.LogError(ex, "Failed to process {DocumentId}", docId);
## Comments & Documentation // Appropriate levels
LogDebug // Development details
### When to Comment LogInformation // Normal operations
LogWarning // Potential issues
```python LogError // Errors with exceptions
# GOOD: Explain WHY, not WHAT
# Swedish Bankgiro uses Luhn algorithm with weight [1,2,1,2...]
def validate_bankgiro_checksum(bankgiro: str) -> bool:
...
# Payment line format: 7 groups separated by #, checksum at end
def parse_payment_line(line: str) -> PaymentLineData:
...
# BAD: Stating the obvious
# Increment counter by 1
count += 1
# Set name to user's name
name = user.name
``` ```
### Docstrings for Public APIs ## Testing (AAA Pattern)
```python ```csharp
def extract_invoice_fields( [Fact]
pdf_path: Path, public async Task GetById_WithValidId_ReturnsDocument()
confidence_threshold: float = 0.5, {
use_gpu: bool = True // Arrange
) -> InferenceResult: var repo = Substitute.For<IRepository<Document>>();
"""Extract structured fields from Swedish invoice PDF. repo.GetByIdAsync(Arg.Any<Guid>(), Arg.Any<CancellationToken>())
.Returns(new Document("Test"));
var service = new DocumentService(repo);
Uses YOLOv11 for field detection and PaddleOCR for text extraction. // Act
Applies field-specific normalization and validation. var result = await service.GetAsync(Guid.NewGuid(), CancellationToken.None);
Args: // Assert
pdf_path: Path to the invoice PDF file. result.Should().NotBeNull();
confidence_threshold: Minimum confidence for field detection (0.0-1.0). result!.Name.Should().Be("Test");
use_gpu: Whether to use GPU acceleration.
Returns:
InferenceResult containing extracted fields and confidence scores.
Raises:
FileNotFoundError: If PDF file doesn't exist.
ProcessingError: If OCR or detection fails.
Example:
>>> result = extract_invoice_fields(Path("invoice.pdf"))
>>> print(result.fields["invoice_number"])
"INV-2024-001"
"""
...
```
## Performance Best Practices
### Caching
```python
from functools import lru_cache
from cachetools import TTLCache
# Static data: LRU cache
@lru_cache(maxsize=100)
def get_field_config(field_name: str) -> FieldConfig:
"""Load field configuration (cached)."""
return load_config(field_name)
# Dynamic data: TTL cache
_document_cache = TTLCache(maxsize=1000, ttl=300) # 5 minutes
def get_document_cached(doc_id: str) -> Document | None:
if doc_id in _document_cache:
return _document_cache[doc_id]
doc = repo.find_by_id(doc_id)
if doc:
_document_cache[doc_id] = doc
return doc
```
### Database Queries
```python
# GOOD: Select only needed columns
cur.execute("""
SELECT id, status, fields->>'invoice_number'
FROM documents
WHERE status = %s
LIMIT %s
""", ('processed', 10))
# BAD: Select everything
cur.execute("SELECT * FROM documents")
# GOOD: Batch operations
cur.executemany(
"INSERT INTO labels (doc_id, field, value) VALUES (%s, %s, %s)",
[(doc_id, f, v) for f, v in fields.items()]
)
# BAD: Individual inserts in loop
for field, value in fields.items():
cur.execute("INSERT INTO labels ...", (doc_id, field, value))
```
### Lazy Loading
```python
class InferencePipeline:
def __init__(self, model_path: Path):
self.model_path = model_path
self._model: YOLO | None = None
self._ocr: PaddleOCR | None = None
@property
def model(self) -> YOLO:
"""Lazy load YOLO model."""
if self._model is None:
self._model = YOLO(str(self.model_path))
return self._model
@property
def ocr(self) -> PaddleOCR:
"""Lazy load PaddleOCR."""
if self._ocr is None:
self._ocr = PaddleOCR(use_angle_cls=True, lang="latin")
return self._ocr
```
## Testing Standards
### Test Structure (AAA Pattern)
```python
def test_extract_bankgiro_valid():
# Arrange
text = "Bankgiro: 123-4567"
# Act
result = extract_bankgiro(text)
# Assert
assert result == "1234567"
def test_extract_bankgiro_invalid_returns_none():
# Arrange
text = "No bankgiro here"
# Act
result = extract_bankgiro(text)
# Assert
assert result is None
```
### Test Naming
```python
# GOOD: Descriptive test names
def test_parse_payment_line_extracts_all_fields(): ...
def test_parse_payment_line_handles_missing_checksum(): ...
def test_validate_ocr_returns_false_for_invalid_checksum(): ...
# BAD: Vague test names
def test_parse(): ...
def test_works(): ...
def test_payment_line(): ...
```
### Fixtures
```python
import pytest
from pathlib import Path
@pytest.fixture
def sample_invoice_pdf(tmp_path: Path) -> Path:
"""Create sample invoice PDF for testing."""
pdf_path = tmp_path / "invoice.pdf"
# Create test PDF...
return pdf_path
@pytest.fixture
def inference_pipeline(sample_model_path: Path) -> InferencePipeline:
"""Create inference pipeline with test model."""
return InferencePipeline(sample_model_path)
def test_process_invoice(inference_pipeline, sample_invoice_pdf):
result = inference_pipeline.process(sample_invoice_pdf)
assert result.fields.get("invoice_number") is not None
```
## Code Smell Detection
### 1. Long Functions
```python
# BAD: Function > 50 lines
def process_document():
# 100 lines of code...
# GOOD: Split into smaller functions
def process_document(pdf_path: Path) -> InferenceResult:
image = render_pdf(pdf_path)
detections = detect_fields(image)
ocr_results = extract_text(image, detections)
fields = normalize_fields(ocr_results)
return build_result(fields)
```
### 2. Deep Nesting
```python
# BAD: 5+ levels of nesting
if document:
if document.is_valid:
if document.has_fields:
if field in document.fields:
if document.fields[field]:
# Do something
# GOOD: Early returns
if not document:
return None
if not document.is_valid:
return None
if not document.has_fields:
return None
if field not in document.fields:
return None
if not document.fields[field]:
return None
# Do something
```
### 3. Magic Numbers
```python
# BAD: Unexplained numbers
if confidence > 0.5:
...
time.sleep(3)
# GOOD: Named constants
CONFIDENCE_THRESHOLD = 0.5
RETRY_DELAY_SECONDS = 3
if confidence > CONFIDENCE_THRESHOLD:
...
time.sleep(RETRY_DELAY_SECONDS)
```
### 4. Mutable Default Arguments
```python
# BAD: Mutable default argument
def process_fields(fields: list = []): # DANGEROUS!
fields.append("new_field")
return fields
# GOOD: Use None as default
def process_fields(fields: list | None = None) -> list:
if fields is None:
fields = []
return [*fields, "new_field"]
```
## Logging Standards
```python
import logging
# Module-level logger
logger = logging.getLogger(__name__)
# GOOD: Appropriate log levels
logger.debug("Processing document: %s", doc_id)
logger.info("Document processed successfully: %s", doc_id)
logger.warning("Low confidence score: %.2f", confidence)
logger.error("Failed to process document: %s", error)
# GOOD: Structured logging with extra data
logger.info(
"Inference complete",
extra={
"document_id": doc_id,
"field_count": len(fields),
"processing_time_ms": elapsed_ms
} }
)
# BAD: Using print()
print(f"Processing {doc_id}") # Never in production!
``` ```
**Remember**: Code quality is not negotiable. Clear, maintainable Python code with proper type hints enables confident development and refactoring. ## Key Rules
- Always use `CancellationToken` for async methods
- Prefer `records` for DTOs and immutable data
- Use `IReadOnlyList<T>` for return types
- Never use `async void` (except event handlers)
- Always handle `null` with pattern matching or null operators
- Use structured logging, never `Console.WriteLine`