# Files
# invoice-master-poc-v2/src/data/csv_loader.py
# Yaojia Wang 425b8fdedf WIP
# 2026-01-16 23:10:01 +01:00

# 360 lines
# 12 KiB
# Python

"""
CSV Data Loader
Loads and parses structured invoice data from CSV files.
Follows the CSV specification for invoice data.
"""
import csv
from dataclasses import dataclass, field
from datetime import datetime, date
from decimal import Decimal, InvalidOperation
from pathlib import Path
from typing import Any, Iterator
@dataclass
class InvoiceRow:
"""Parsed invoice data row."""
DocumentId: str
InvoiceDate: date | None = None
InvoiceNumber: str | None = None
InvoiceDueDate: date | None = None
OCR: str | None = None
Message: str | None = None
Bankgiro: str | None = None
Plusgiro: str | None = None
Amount: Decimal | None = None
# New fields
split: str | None = None # train/test split indicator
customer_number: str | None = None # Customer number (no matching needed)
supplier_name: str | None = None # Supplier name (no matching)
supplier_organisation_number: str | None = None # Swedish org number (needs matching)
supplier_accounts: str | None = None # Supplier accounts (needs matching)
# Raw values for reference
raw_data: dict = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for matching."""
return {
'DocumentId': self.DocumentId,
'InvoiceDate': self.InvoiceDate.isoformat() if self.InvoiceDate else None,
'InvoiceNumber': self.InvoiceNumber,
'InvoiceDueDate': self.InvoiceDueDate.isoformat() if self.InvoiceDueDate else None,
'OCR': self.OCR,
'Bankgiro': self.Bankgiro,
'Plusgiro': self.Plusgiro,
'Amount': str(self.Amount) if self.Amount else None,
'supplier_organisation_number': self.supplier_organisation_number,
'supplier_accounts': self.supplier_accounts,
}
def get_field_value(self, field_name: str) -> str | None:
"""Get field value as string for matching."""
value = getattr(self, field_name, None)
if value is None:
return None
if isinstance(value, date):
return value.isoformat()
if isinstance(value, Decimal):
return str(value)
return str(value) if value else None
class CSVLoader:
"""Loads invoice data from CSV files."""
# Expected field mappings (CSV header -> InvoiceRow attribute)
FIELD_MAPPINGS = {
'DocumentId': 'DocumentId',
'InvoiceDate': 'InvoiceDate',
'InvoiceNumber': 'InvoiceNumber',
'InvoiceDueDate': 'InvoiceDueDate',
'OCR': 'OCR',
'Message': 'Message',
'Bankgiro': 'Bankgiro',
'Plusgiro': 'Plusgiro',
'Amount': 'Amount',
# New fields
'split': 'split',
'customer_number': 'customer_number',
'supplier_name': 'supplier_name',
'supplier_organisation_number': 'supplier_organisation_number',
'supplier_accounts': 'supplier_accounts',
}
def __init__(
self,
csv_path: str | Path | list[str | Path],
pdf_dir: str | Path | None = None,
doc_map_path: str | Path | None = None,
encoding: str = 'utf-8'
):
"""
Initialize CSV loader.
Args:
csv_path: Path to CSV file(s). Can be:
- Single path: 'data/file.csv'
- List of paths: ['data/file1.csv', 'data/file2.csv']
- Glob pattern: 'data/*.csv' or 'data/export_*.csv'
pdf_dir: Directory containing PDF files (default: data/raw_pdfs)
doc_map_path: Optional path to document mapping CSV
encoding: CSV file encoding (default: utf-8)
"""
# Handle multiple CSV files
if isinstance(csv_path, list):
self.csv_paths = [Path(p) for p in csv_path]
else:
csv_path = Path(csv_path)
# Check if it's a glob pattern (contains * or ?)
if '*' in str(csv_path) or '?' in str(csv_path):
parent = csv_path.parent
pattern = csv_path.name
self.csv_paths = sorted(parent.glob(pattern))
else:
self.csv_paths = [csv_path]
# For backward compatibility
self.csv_path = self.csv_paths[0] if self.csv_paths else None
self.pdf_dir = Path(pdf_dir) if pdf_dir else (self.csv_path.parent.parent / 'raw_pdfs' if self.csv_path else Path('data/raw_pdfs'))
self.doc_map_path = Path(doc_map_path) if doc_map_path else None
self.encoding = encoding
# Load document mapping if provided
self.doc_map = self._load_doc_map() if self.doc_map_path else {}
def _load_doc_map(self) -> dict[str, str]:
"""Load document ID to filename mapping."""
mapping = {}
if self.doc_map_path and self.doc_map_path.exists():
with open(self.doc_map_path, 'r', encoding=self.encoding) as f:
reader = csv.DictReader(f)
for row in reader:
doc_id = row.get('DocumentId', '').strip()
filename = row.get('FileName', '').strip()
if doc_id and filename:
mapping[doc_id] = filename
return mapping
def _parse_date(self, value: str | None) -> date | None:
"""Parse date from various formats."""
if not value or not value.strip():
return None
value = value.strip()
# Try different date formats
formats = [
'%Y-%m-%d',
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d %H:%M:%S.%f',
'%d/%m/%Y',
'%d.%m.%Y',
'%d-%m-%Y',
'%Y%m%d',
]
for fmt in formats:
try:
return datetime.strptime(value, fmt).date()
except ValueError:
continue
return None
def _parse_amount(self, value: str | None) -> Decimal | None:
"""Parse monetary amount from various formats."""
if not value or not value.strip():
return None
value = value.strip()
# Remove currency symbols and common suffixes
value = value.replace('SEK', '').replace('kr', '').replace(':-', '')
value = value.strip()
# Remove spaces (thousand separators)
value = value.replace(' ', '').replace('\xa0', '')
# Handle comma as decimal separator (European format)
if ',' in value and '.' not in value:
value = value.replace(',', '.')
elif ',' in value and '.' in value:
# Assume comma is thousands separator, dot is decimal
value = value.replace(',', '')
try:
return Decimal(value)
except InvalidOperation:
return None
def _parse_string(self, value: str | None) -> str | None:
"""Parse string field with cleanup."""
if value is None:
return None
value = value.strip()
return value if value else None
def _parse_row(self, row: dict) -> InvoiceRow | None:
"""Parse a single CSV row into InvoiceRow."""
doc_id = self._parse_string(row.get('DocumentId'))
if not doc_id:
return None
return InvoiceRow(
DocumentId=doc_id,
InvoiceDate=self._parse_date(row.get('InvoiceDate')),
InvoiceNumber=self._parse_string(row.get('InvoiceNumber')),
InvoiceDueDate=self._parse_date(row.get('InvoiceDueDate')),
OCR=self._parse_string(row.get('OCR')),
Message=self._parse_string(row.get('Message')),
Bankgiro=self._parse_string(row.get('Bankgiro')),
Plusgiro=self._parse_string(row.get('Plusgiro')),
Amount=self._parse_amount(row.get('Amount')),
# New fields
split=self._parse_string(row.get('split')),
customer_number=self._parse_string(row.get('customer_number')),
supplier_name=self._parse_string(row.get('supplier_name')),
supplier_organisation_number=self._parse_string(row.get('supplier_organisation_number')),
supplier_accounts=self._parse_string(row.get('supplier_accounts')),
raw_data=dict(row)
)
def _iter_single_csv(self, csv_path: Path) -> Iterator[InvoiceRow]:
"""Iterate over rows from a single CSV file."""
# Handle BOM - try utf-8-sig first to handle BOM correctly
encodings = ['utf-8-sig', self.encoding, 'latin-1']
for enc in encodings:
try:
with open(csv_path, 'r', encoding=enc) as f:
reader = csv.DictReader(f)
for row in reader:
parsed = self._parse_row(row)
if parsed:
yield parsed
return
except UnicodeDecodeError:
continue
raise ValueError(f"Could not read CSV file {csv_path} with any supported encoding")
def load_all(self) -> list[InvoiceRow]:
"""Load all rows from CSV(s)."""
rows = []
for row in self.iter_rows():
rows.append(row)
return rows
def iter_rows(self) -> Iterator[InvoiceRow]:
"""Iterate over CSV rows from all CSV files."""
seen_doc_ids = set()
for csv_path in self.csv_paths:
if not csv_path.exists():
continue
for row in self._iter_single_csv(csv_path):
# Deduplicate by DocumentId
if row.DocumentId not in seen_doc_ids:
seen_doc_ids.add(row.DocumentId)
yield row
def get_pdf_path(self, invoice_row: InvoiceRow) -> Path | None:
"""
Get PDF path for an invoice row.
Uses document mapping if available, otherwise assumes
DocumentId.pdf naming convention.
"""
doc_id = invoice_row.DocumentId
# Check document mapping first
if doc_id in self.doc_map:
filename = self.doc_map[doc_id]
pdf_path = self.pdf_dir / filename
if pdf_path.exists():
return pdf_path
# Try default naming patterns
patterns = [
f"{doc_id}.pdf",
f"{doc_id.lower()}.pdf",
f"{doc_id.upper()}.pdf",
]
for pattern in patterns:
pdf_path = self.pdf_dir / pattern
if pdf_path.exists():
return pdf_path
# Try glob patterns for partial matches
for pdf_file in self.pdf_dir.glob(f"*{doc_id}*.pdf"):
return pdf_file
return None
def get_row_by_id(self, doc_id: str) -> InvoiceRow | None:
"""Get a specific row by DocumentId."""
for row in self.iter_rows():
if row.DocumentId == doc_id:
return row
return None
def validate(self) -> list[dict]:
"""
Validate CSV data and return issues.
Returns:
List of validation issues
"""
issues = []
for i, row in enumerate(self.iter_rows(), start=2): # Start at 2 (header is row 1)
# Check required DocumentId
if not row.DocumentId:
issues.append({
'row': i,
'field': 'DocumentId',
'issue': 'Missing required DocumentId'
})
continue
# Check if PDF exists
pdf_path = self.get_pdf_path(row)
if not pdf_path:
issues.append({
'row': i,
'doc_id': row.DocumentId,
'field': 'PDF',
'issue': 'PDF file not found'
})
# Check for at least one matchable field
matchable_fields = [
row.InvoiceNumber,
row.OCR,
row.Bankgiro,
row.Plusgiro,
row.Amount,
row.supplier_organisation_number,
row.supplier_accounts,
]
if not any(matchable_fields):
issues.append({
'row': i,
'doc_id': row.DocumentId,
'field': 'All',
'issue': 'No matchable fields (InvoiceNumber/OCR/Bankgiro/Plusgiro/Amount/supplier_organisation_number/supplier_accounts)'
})
return issues
def load_invoice_csv(
    csv_path: str | Path | list[str | Path],
    pdf_dir: str | Path | None = None,
    encoding: str = 'utf-8',
) -> list[InvoiceRow]:
    """Convenience function to load invoice CSV(s).

    Args:
        csv_path: Single path, list of paths, or glob pattern, as accepted
            by ``CSVLoader``.
        pdf_dir: Directory containing PDF files, passed through to
            ``CSVLoader``.
        encoding: CSV file encoding passed through to ``CSVLoader``
            (default: utf-8).

    Returns:
        All rows returned by ``CSVLoader.load_all()``.
    """
    loader = CSVLoader(csv_path, pdf_dir, encoding=encoding)
    return loader.load_all()