"""Migration script: JSON files -> PostgreSQL.

Reads staging and archived release JSON files from a directory tree and
inserts them into the staging_releases and archived_releases tables.

All business logic is implemented as pure functions so it can be tested
without a real database. The main() entry point wires together the pure
functions with actual I/O.

Usage:
    python scripts/migrate_json_to_db.py --source /path/to/releases \\
        --dsn "postgresql://user:pass@localhost/db" [--dry-run]

Pure functions (testable without DB):
    collect_json_files(directory) -> list[Path]
    is_staging_filename(name) -> bool
    is_archived_filename(name) -> bool
    parse_staging_json(data) -> MigrationRecord
    parse_archived_json(data) -> MigrationRecord
    build_staging_insert_sql(record) -> tuple[str, tuple]
    build_archived_insert_sql(record) -> tuple[str, tuple]
"""

from __future__ import annotations

import argparse
import json
import re
import sys
from dataclasses import dataclass, field
from datetime import date
from pathlib import Path


# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------


@dataclass(frozen=True)
class MigrationRecord:
    """Parsed record ready for database insertion.

    released_at is None for staging records, set for archived records.

    Attributes:
        repo: Repository name, taken verbatim from the JSON "repo" key.
        version: Version string, taken verbatim from the JSON "version" key.
        started_at: Date parsed from the ISO-format "started_at" key.
        tickets: Ticket entries from the "tickets" key (empty list if the
            key is missing or null).
        released_at: Date parsed from "released_at"; None for staging.
    """

    repo: str
    version: str
    started_at: date
    tickets: list[dict]
    released_at: date | None = None


# ---------------------------------------------------------------------------
# File classification
# ---------------------------------------------------------------------------

# Archived filenames match: <repo>_v<major>.<minor>.<patch>_<YYYY-MM-DD>.json
# e.g. Billo.Platform.Payment_v1.0.1_2026-03-23.json
_ARCHIVED_PATTERN = re.compile(
    r"^.+_v\d+\.\d+\.\d+_\d{4}-\d{2}-\d{2}\.json$"
)


def is_staging_filename(name: str) -> bool:
    """Return True if the filename looks like a staging JSON file.

    Staging files end in .json and do not match the archived pattern.
    """
    if not name.endswith(".json"):
        return False
    return not _ARCHIVED_PATTERN.match(name)


def is_archived_filename(name: str) -> bool:
    """Return True if the filename looks like an archived release JSON file."""
    return bool(_ARCHIVED_PATTERN.match(name))


# ---------------------------------------------------------------------------
# File collection
# ---------------------------------------------------------------------------


def collect_json_files(directory: Path) -> list[Path]:
    """Recursively collect all .json files under directory.

    Returns a sorted list of Path objects.
    """
    return sorted(directory.rglob("*.json"))


# ---------------------------------------------------------------------------
# Parsing pure functions
# ---------------------------------------------------------------------------


def parse_staging_json(data: dict) -> MigrationRecord:
    """Parse a staging release JSON dict into a MigrationRecord.

    Args:
        data: Parsed JSON dict with keys: version, repo, started_at, tickets.

    Returns:
        MigrationRecord with released_at=None.

    Raises:
        KeyError: If repo, version or started_at is missing.
        ValueError: If started_at is not a valid ISO date string.
        TypeError: If started_at is not a string (e.g. JSON null).
    """
    return MigrationRecord(
        repo=data["repo"],
        version=data["version"],
        started_at=date.fromisoformat(data["started_at"]),
        # A missing or null "tickets" key becomes an empty list.
        tickets=list(data.get("tickets") or []),
        released_at=None,
    )


def parse_archived_json(data: dict) -> MigrationRecord:
    """Parse an archived release JSON dict into a MigrationRecord.

    Args:
        data: Parsed JSON dict with keys: version, repo, started_at,
            tickets, released_at.

    Returns:
        MigrationRecord with released_at set.

    Raises:
        KeyError: If repo, version, started_at or released_at is missing.
        ValueError: If a date field is not a valid ISO date string.
        TypeError: If a date field is not a string (e.g. JSON null).
    """
    return MigrationRecord(
        repo=data["repo"],
        version=data["version"],
        started_at=date.fromisoformat(data["started_at"]),
        # A missing or null "tickets" key becomes an empty list.
        tickets=list(data.get("tickets") or []),
        released_at=date.fromisoformat(data["released_at"]),
    )


# ---------------------------------------------------------------------------
# SQL builder pure functions
# ---------------------------------------------------------------------------

_STAGING_INSERT_SQL = """
INSERT INTO staging_releases (repo, version, started_at, tickets)
VALUES (%s, %s, %s, %s)
ON CONFLICT (repo) DO NOTHING
""".strip()

_ARCHIVED_INSERT_SQL = """
INSERT INTO archived_releases (repo, version, started_at, tickets, released_at)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (repo, version) DO NOTHING
""".strip()


def build_staging_insert_sql(record: MigrationRecord) -> tuple[str, tuple]:
    """Build the INSERT SQL and parameters for a staging release record.

    Returns:
        (sql_string, params_tuple) ready for cursor.execute().
    """
    tickets_json = json.dumps(record.tickets)
    params = (
        record.repo,
        record.version,
        record.started_at.isoformat(),
        tickets_json,
    )
    return _STAGING_INSERT_SQL, params


def build_archived_insert_sql(record: MigrationRecord) -> tuple[str, tuple]:
    """Build the INSERT SQL and parameters for an archived release record.

    Returns:
        (sql_string, params_tuple) ready for cursor.execute(). The last
        parameter is None when record.released_at is None.
    """
    tickets_json = json.dumps(record.tickets)
    released_at = (
        record.released_at.isoformat()
        if record.released_at is not None
        else None
    )
    params = (
        record.repo,
        record.version,
        record.started_at.isoformat(),
        tickets_json,
        released_at,
    )
    return _ARCHIVED_INSERT_SQL, params


# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------


def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line arguments (argv=None means use sys.argv[1:])."""
    parser = argparse.ArgumentParser(
        description="Migrate JSON release files to PostgreSQL"
    )
    parser.add_argument(
        "--source",
        type=Path,
        required=True,
        help="Root directory containing release JSON files",
    )
    parser.add_argument(
        "--dsn",
        type=str,
        default="",
        help="PostgreSQL DSN (e.g. postgresql://user:pass@localhost/db)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print SQL statements without executing them",
    )
    return parser.parse_args(argv)


def _build_statements(
    files: list[Path],
) -> tuple[list[tuple[str, tuple]], list[str]]:
    """Read and parse each file into an INSERT statement.

    A file that cannot be read, decoded or parsed never aborts the run; it
    is reported through the returned error list and skipped.

    Returns:
        (statements, errors): statements is a list of (sql, params) pairs
        in the order of files; errors is a list of human-readable messages.
    """
    statements: list[tuple[str, tuple]] = []
    errors: list[str] = []

    for path in files:
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (ValueError, OSError) as exc:
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for files that are not valid UTF-8.
            errors.append(f"Failed to read {path}: {exc}")
            continue

        # Valid JSON is not necessarily an object; lists and scalars cannot
        # be release records and would otherwise fail with a TypeError.
        if not isinstance(data, dict):
            errors.append(
                f"Failed to parse {path}: expected a JSON object, "
                f"got {type(data).__name__}"
            )
            continue

        try:
            # A file is archived if its name says so OR its payload carries
            # a released_at key; everything else is treated as staging.
            if is_archived_filename(path.name) or "released_at" in data:
                record = parse_archived_json(data)
                statement = build_archived_insert_sql(record)
            else:
                record = parse_staging_json(data)
                statement = build_staging_insert_sql(record)
        except (KeyError, TypeError, ValueError) as exc:
            # TypeError: a date field holding a non-string such as null.
            errors.append(f"Failed to parse {path}: {exc}")
            continue

        statements.append(statement)

    return statements, errors


def main(argv: list[str] | None = None) -> int:
    """Entry point for the migration script.

    Args:
        argv: Argument list; None means use sys.argv[1:].

    Returns:
        0 on success, 1 on error (missing/invalid source, missing --dsn,
        or psycopg not installed). Files that fail to read or parse are
        reported as warnings on stderr and do not change the exit code.
        Database errors raised by psycopg are not caught here.
    """
    args = _parse_args(argv)

    if not args.source.exists():
        print(f"ERROR: source directory does not exist: {args.source}", file=sys.stderr)
        return 1
    if not args.source.is_dir():
        print(f"ERROR: source is not a directory: {args.source}", file=sys.stderr)
        return 1

    files = collect_json_files(args.source)
    print(f"Found {len(files)} JSON file(s) under {args.source}")

    statements, errors = _build_statements(files)
    for err in errors:
        print(f"WARNING: {err}", file=sys.stderr)

    if args.dry_run:
        print(f"\nDry run: {len(statements)} statement(s) would be executed:")
        for sql, params in statements:
            print(f"  SQL: {sql!r}")
            print(f"  Params: {params}")
        return 0

    if not args.dsn:
        print("ERROR: --dsn is required when not using --dry-run", file=sys.stderr)
        return 1

    # Imported lazily so the pure functions and --dry-run work without the
    # database driver installed.
    try:
        import psycopg  # noqa: PLC0415
    except ImportError:
        print("ERROR: psycopg not installed. Run: pip install psycopg[binary]", file=sys.stderr)
        return 1

    # NOTE(review): this counts executed statements; rows skipped by
    # ON CONFLICT DO NOTHING are still counted as "inserted".
    inserted = 0
    with psycopg.connect(args.dsn) as conn:
        with conn.cursor() as cur:
            for sql, params in statements:
                cur.execute(sql, params)
                inserted += 1
        conn.commit()

    print(f"Migration complete: {inserted} record(s) inserted.")
    return 0


if __name__ == "__main__":
    sys.exit(main())