Files
invoice-master-poc-v2/packages/training/training/cli/analyze_report.py
Yaojia Wang f1a7bfe6b7 WIP
2026-02-07 13:56:00 +01:00

442 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Analyze Auto-Label Report
Generates statistics and insights from database or autolabel_report.jsonl
"""
import argparse
import json
import logging
import sys
from collections import defaultdict
from pathlib import Path
from shared.config import get_db_connection_string
from shared.logging_config import setup_cli_logging
logger = logging.getLogger(__name__)
def load_reports_from_db() -> dict:
"""Load statistics directly from database using SQL aggregation."""
from shared.data.db import DocumentDB
db = DocumentDB()
db.connect()
stats = {
'total': 0,
'successful': 0,
'failed': 0,
'by_pdf_type': defaultdict(lambda: {'total': 0, 'successful': 0}),
'by_field': defaultdict(lambda: {
'total': 0,
'matched': 0,
'exact_match': 0,
'flexible_match': 0,
'scores': [],
'by_pdf_type': defaultdict(lambda: {'total': 0, 'matched': 0})
}),
'errors': defaultdict(int),
'processing_times': [],
}
conn = db.connect()
with conn.cursor() as cursor:
# Overall stats
cursor.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN success THEN 1 ELSE 0 END) as successful,
SUM(CASE WHEN NOT success THEN 1 ELSE 0 END) as failed
FROM documents
""")
row = cursor.fetchone()
stats['total'] = row[0] or 0
stats['successful'] = row[1] or 0
stats['failed'] = row[2] or 0
# By PDF type
cursor.execute("""
SELECT
pdf_type,
COUNT(*) as total,
SUM(CASE WHEN success THEN 1 ELSE 0 END) as successful
FROM documents
GROUP BY pdf_type
""")
for row in cursor.fetchall():
pdf_type = row[0] or 'unknown'
stats['by_pdf_type'][pdf_type] = {
'total': row[1] or 0,
'successful': row[2] or 0
}
# Processing times
cursor.execute("""
SELECT AVG(processing_time_ms), MIN(processing_time_ms), MAX(processing_time_ms)
FROM documents
WHERE processing_time_ms > 0
""")
row = cursor.fetchone()
if row[0]:
stats['processing_time_stats'] = {
'avg_ms': float(row[0]),
'min_ms': float(row[1]),
'max_ms': float(row[2])
}
# Field stats
cursor.execute("""
SELECT
field_name,
COUNT(*) as total,
SUM(CASE WHEN matched THEN 1 ELSE 0 END) as matched,
SUM(CASE WHEN matched AND score >= 0.99 THEN 1 ELSE 0 END) as exact_match,
SUM(CASE WHEN matched AND score < 0.99 THEN 1 ELSE 0 END) as flexible_match,
AVG(CASE WHEN matched THEN score END) as avg_score
FROM field_results
GROUP BY field_name
ORDER BY field_name
""")
for row in cursor.fetchall():
field_name = row[0]
stats['by_field'][field_name] = {
'total': row[1] or 0,
'matched': row[2] or 0,
'exact_match': row[3] or 0,
'flexible_match': row[4] or 0,
'avg_score': float(row[5]) if row[5] else 0,
'scores': [], # Not loading individual scores for efficiency
'by_pdf_type': defaultdict(lambda: {'total': 0, 'matched': 0})
}
# Field stats by PDF type
cursor.execute("""
SELECT
fr.field_name,
d.pdf_type,
COUNT(*) as total,
SUM(CASE WHEN fr.matched THEN 1 ELSE 0 END) as matched
FROM field_results fr
JOIN documents d ON fr.document_id = d.document_id
GROUP BY fr.field_name, d.pdf_type
""")
for row in cursor.fetchall():
field_name = row[0]
pdf_type = row[1] or 'unknown'
if field_name in stats['by_field']:
stats['by_field'][field_name]['by_pdf_type'][pdf_type] = {
'total': row[2] or 0,
'matched': row[3] or 0
}
db.close()
return stats
def load_reports_from_file(report_path: str) -> list[dict]:
"""Load all reports from JSONL file(s). Supports glob patterns."""
path = Path(report_path)
# Handle glob pattern
if '*' in str(path) or '?' in str(path):
parent = path.parent
pattern = path.name
report_files = sorted(parent.glob(pattern))
else:
report_files = [path]
if not report_files:
return []
logger.info("Reading %d report file(s):", len(report_files))
for f in report_files:
logger.info(" - %s", f.name)
reports = []
for report_file in report_files:
if not report_file.exists():
continue
with open(report_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
reports.append(json.loads(line))
return reports
def analyze_reports(reports: list[dict]) -> dict:
"""Analyze reports and generate statistics."""
stats = {
'total': len(reports),
'successful': 0,
'failed': 0,
'by_pdf_type': defaultdict(lambda: {'total': 0, 'successful': 0}),
'by_field': defaultdict(lambda: {
'total': 0,
'matched': 0,
'exact_match': 0, # score == 1.0
'flexible_match': 0, # score < 1.0
'scores': [],
'by_pdf_type': defaultdict(lambda: {'total': 0, 'matched': 0})
}),
'errors': defaultdict(int),
'processing_times': [],
}
for report in reports:
pdf_type = report.get('pdf_type') or 'unknown'
success = report.get('success', False)
# Overall stats
if success:
stats['successful'] += 1
else:
stats['failed'] += 1
# By PDF type
stats['by_pdf_type'][pdf_type]['total'] += 1
if success:
stats['by_pdf_type'][pdf_type]['successful'] += 1
# Processing time
proc_time = report.get('processing_time_ms', 0)
if proc_time > 0:
stats['processing_times'].append(proc_time)
# Errors
for error in report.get('errors', []):
stats['errors'][error] += 1
# Field results
for field_result in report.get('field_results', []):
field_name = field_result['field_name']
matched = field_result.get('matched', False)
score = field_result.get('score', 0.0)
stats['by_field'][field_name]['total'] += 1
stats['by_field'][field_name]['by_pdf_type'][pdf_type]['total'] += 1
if matched:
stats['by_field'][field_name]['matched'] += 1
stats['by_field'][field_name]['scores'].append(score)
stats['by_field'][field_name]['by_pdf_type'][pdf_type]['matched'] += 1
if score >= 0.99:
stats['by_field'][field_name]['exact_match'] += 1
else:
stats['by_field'][field_name]['flexible_match'] += 1
return stats
def print_report(stats: dict, verbose: bool = False):
"""Print analysis report."""
logger.info("=" * 60)
logger.info("AUTO-LABEL REPORT ANALYSIS")
logger.info("=" * 60)
# Overall stats
logger.info("%s", "OVERALL STATISTICS".center(60))
logger.info("-" * 60)
total = stats['total']
successful = stats['successful']
failed = stats['failed']
success_rate = successful / total * 100 if total > 0 else 0
logger.info("Total documents: %8d", total)
logger.info("Successful: %8d (%.1f%%)", successful, success_rate)
logger.info("Failed: %8d (%.1f%%)", failed, 100-success_rate)
# Processing time
if 'processing_time_stats' in stats:
pts = stats['processing_time_stats']
logger.info("Processing time (ms):")
logger.info(" Average: %8.1f", pts['avg_ms'])
logger.info(" Min: %8.1f", pts['min_ms'])
logger.info(" Max: %8.1f", pts['max_ms'])
elif stats.get('processing_times'):
times = stats['processing_times']
avg_time = sum(times) / len(times)
min_time = min(times)
max_time = max(times)
logger.info("Processing time (ms):")
logger.info(" Average: %8.1f", avg_time)
logger.info(" Min: %8.1f", min_time)
logger.info(" Max: %8.1f", max_time)
# By PDF type
logger.info("%s", "BY PDF TYPE".center(60))
logger.info("-" * 60)
logger.info("%-15s %10s %10s %10s", 'Type', 'Total', 'Success', 'Rate')
logger.info("-" * 60)
for pdf_type, type_stats in sorted(stats['by_pdf_type'].items()):
type_total = type_stats['total']
type_success = type_stats['successful']
type_rate = type_success / type_total * 100 if type_total > 0 else 0
logger.info("%-15s %10d %10d %9.1f%%", pdf_type, type_total, type_success, type_rate)
# By field
logger.info("%s", "FIELD MATCH STATISTICS".center(60))
logger.info("-" * 60)
logger.info("%-18s %7s %7s %7s %7s %7s %8s", 'Field', 'Total', 'Match', 'Rate', 'Exact', 'Flex', 'AvgScore')
logger.info("-" * 60)
for field_name in ['InvoiceNumber', 'InvoiceDate', 'InvoiceDueDate', 'OCR', 'Bankgiro', 'Plusgiro', 'Amount']:
if field_name not in stats['by_field']:
continue
field_stats = stats['by_field'][field_name]
total = field_stats['total']
matched = field_stats['matched']
exact = field_stats['exact_match']
flex = field_stats['flexible_match']
rate = matched / total * 100 if total > 0 else 0
# Handle avg_score from either DB or file analysis
if 'avg_score' in field_stats:
avg_score = field_stats['avg_score']
elif field_stats['scores']:
avg_score = sum(field_stats['scores']) / len(field_stats['scores'])
else:
avg_score = 0
logger.info("%-18s %7d %7d %6.1f%% %7d %7d %8.3f", field_name, total, matched, rate, exact, flex, avg_score)
# Field match by PDF type
logger.info("%s", "FIELD MATCH BY PDF TYPE".center(60))
logger.info("-" * 60)
for pdf_type in sorted(stats['by_pdf_type'].keys()):
logger.info("[%s]", pdf_type.upper())
logger.info("%-18s %10s %10s %10s", 'Field', 'Total', 'Matched', 'Rate')
logger.info("-" * 50)
for field_name in ['InvoiceNumber', 'InvoiceDate', 'InvoiceDueDate', 'OCR', 'Bankgiro', 'Plusgiro', 'Amount']:
if field_name not in stats['by_field']:
continue
type_stats = stats['by_field'][field_name]['by_pdf_type'].get(pdf_type, {'total': 0, 'matched': 0})
total = type_stats['total']
matched = type_stats['matched']
rate = matched / total * 100 if total > 0 else 0
logger.info("%-18s %10d %10d %9.1f%%", field_name, total, matched, rate)
# Errors
if stats.get('errors') and verbose:
logger.info("%s", "ERRORS".center(60))
logger.info("-" * 60)
for error, count in sorted(stats['errors'].items(), key=lambda x: -x[1])[:20]:
logger.info("%5dx %s", count, error[:50])
logger.info("=" * 60)
def export_json(stats: dict, output_path: str):
"""Export statistics to JSON file."""
# Convert defaultdicts to regular dicts for JSON serialization
export_data = {
'total': stats['total'],
'successful': stats['successful'],
'failed': stats['failed'],
'by_pdf_type': dict(stats['by_pdf_type']),
'by_field': {},
'errors': dict(stats.get('errors', {})),
}
# Processing time stats
if 'processing_time_stats' in stats:
export_data['processing_time_stats'] = stats['processing_time_stats']
elif stats.get('processing_times'):
times = stats['processing_times']
export_data['processing_time_stats'] = {
'avg_ms': sum(times) / len(times),
'min_ms': min(times),
'max_ms': max(times),
'count': len(times)
}
# Field stats
for field_name, field_stats in stats['by_field'].items():
avg_score = field_stats.get('avg_score', 0)
if not avg_score and field_stats.get('scores'):
avg_score = sum(field_stats['scores']) / len(field_stats['scores'])
export_data['by_field'][field_name] = {
'total': field_stats['total'],
'matched': field_stats['matched'],
'exact_match': field_stats['exact_match'],
'flexible_match': field_stats['flexible_match'],
'match_rate': field_stats['matched'] / field_stats['total'] if field_stats['total'] > 0 else 0,
'avg_score': avg_score,
'by_pdf_type': dict(field_stats['by_pdf_type'])
}
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(export_data, f, indent=2, ensure_ascii=False)
logger.info("Statistics exported to: %s", output_path)
def main():
parser = argparse.ArgumentParser(
description='Analyze auto-label report'
)
parser.add_argument(
'--report', '-r',
default=None,
help='Path to autolabel report JSONL file (uses database if not specified)'
)
parser.add_argument(
'--output', '-o',
help='Export statistics to JSON file'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Show detailed error messages'
)
parser.add_argument(
'--from-file',
action='store_true',
help='Force reading from JSONL file instead of database'
)
args = parser.parse_args()
# Configure logging for CLI
setup_cli_logging()
# Decide source
use_db = not args.from_file and args.report is None
if use_db:
logger.info("Loading statistics from database...")
stats = load_reports_from_db()
logger.info("Loaded stats for %d documents", stats['total'])
else:
report_path = args.report or 'reports/autolabel_report.jsonl'
path = Path(report_path)
# Check if file exists (handle glob patterns)
if '*' not in str(path) and '?' not in str(path) and not path.exists():
logger.error("Report file not found: %s", path)
return 1
logger.info("Loading reports from: %s", report_path)
reports = load_reports_from_file(report_path)
logger.info("Loaded %d reports", len(reports))
stats = analyze_reports(reports)
print_report(stats, verbose=args.verbose)
if args.output:
export_json(stats, args.output)
return 0
if __name__ == '__main__':
exit(main())