invoice-master-poc-v2/packages/shared/shared/matcher/field_matcher.py
"""
Field Matching Module - Refactored
Matches normalized field values to tokens extracted from documents.
"""
from .models import TokenLike, Match
from .token_index import TokenIndex
from .utils import bbox_overlap
from .strategies import (
    ExactMatcher,
    ConcatenatedMatcher,
    SubstringMatcher,
    FuzzyMatcher,
    FlexibleDateMatcher,
)
class FieldMatcher:
    """Matches field values to document tokens."""

    def __init__(
        self,
        context_radius: float = 200.0,  # pixels - increased to handle label-value spacing in scanned PDFs
        min_score_threshold: float = 0.5
    ):
        """
        Initialize the matcher.

        Args:
            context_radius: Distance to search for context keywords (default 200px to handle
                typical label-value spacing in scanned invoices at 150 DPI)
            min_score_threshold: Minimum score to consider a match valid
        """
        self.context_radius = context_radius
        self.min_score_threshold = min_score_threshold
        self._token_index: TokenIndex | None = None

        # Initialize matching strategies
        self.exact_matcher = ExactMatcher(context_radius)
        self.concatenated_matcher = ConcatenatedMatcher(context_radius)
        self.substring_matcher = SubstringMatcher(context_radius)
        self.fuzzy_matcher = FuzzyMatcher(context_radius)
        self.flexible_date_matcher = FlexibleDateMatcher(context_radius)
    def find_matches(
        self,
        tokens: list[TokenLike],
        field_name: str,
        normalized_values: list[str],
        page_no: int = 0
    ) -> list[Match]:
        """
        Find all matches for a field in the token list.

        Args:
            tokens: List of tokens from the document
            field_name: Name of the field to match
            normalized_values: List of normalized value variants to search for
            page_no: Page number to filter tokens

        Returns:
            List of Match objects sorted by score (descending)
        """
        matches = []

        # Filter tokens by page and exclude hidden metadata tokens
        # Hidden tokens often have bbox with y < 0 or y > page_height
        # These are typically PDF metadata stored as invisible text
        page_tokens = [
            t for t in tokens
            if t.page_no == page_no and t.bbox[1] >= 0 and t.bbox[3] > t.bbox[1]
        ]
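        # For example, a token with bbox (0, -12, 80, -2) (negative y, i.e. rendered
        # off-page) would be dropped here as invisible metadata text; the values are
        # illustrative, not taken from a real document.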
        # Build a spatial index so nearby-token lookups cost ~O(1) per query instead of an O(n) scan
        self._token_index = TokenIndex(page_tokens, grid_size=self.context_radius)

        for value in normalized_values:
            # Strategy 1: Exact token match
            exact_matches = self.exact_matcher.find_matches(
                page_tokens, value, field_name, self._token_index
            )
            matches.extend(exact_matches)

            # Strategy 2: Multi-token concatenation
            concat_matches = self.concatenated_matcher.find_matches(
                page_tokens, value, field_name, self._token_index
            )
            matches.extend(concat_matches)

            # Strategy 3: Fuzzy match (for amounts and dates only)
            if field_name in ('Amount', 'InvoiceDate', 'InvoiceDueDate'):
                fuzzy_matches = self.fuzzy_matcher.find_matches(
                    page_tokens, value, field_name, self._token_index
                )
                matches.extend(fuzzy_matches)

            # Strategy 4: Substring match (for values embedded in longer text)
            # e.g., "Fakturanummer: 2465027205" should match OCR value "2465027205"
            # Note: Amount is excluded because short numbers like "451" can incorrectly match
            # in OCR payment lines or other unrelated text
            if field_name in (
                'InvoiceDate', 'InvoiceDueDate', 'InvoiceNumber', 'OCR',
                'Bankgiro', 'Plusgiro', 'supplier_organisation_number',
                'supplier_accounts', 'customer_number'
            ):
                substring_matches = self.substring_matcher.find_matches(
                    page_tokens, value, field_name, self._token_index
                )
                matches.extend(substring_matches)

        # Strategy 5: Flexible date matching (year-month match, nearby dates, heuristic selection)
        # Only applied if no matches at all were found for date fields
        if field_name in ('InvoiceDate', 'InvoiceDueDate') and not matches:
            for value in normalized_values:
                flexible_matches = self.flexible_date_matcher.find_matches(
                    page_tokens, value, field_name, self._token_index
                )
                matches.extend(flexible_matches)

        # Deduplicate and sort by score
        matches = self._deduplicate_matches(matches)
        matches.sort(key=lambda m: m.score, reverse=True)

        # Clear token index to free memory
        self._token_index = None

        return [m for m in matches if m.score >= self.min_score_threshold]
    def _deduplicate_matches(self, matches: list[Match]) -> list[Match]:
        """
        Remove duplicate matches based on bbox overlap.

        Uses grid-based spatial hashing to reduce O(n²) to O(n) average case.
        """
        if not matches:
            return []

        # Sort by: 1) score descending, 2) prefer matches with context keywords,
        # 3) prefer upper positions (smaller y) for same-score matches
        # This helps select the "main" occurrence in the invoice body rather than the footer
        matches.sort(key=lambda m: (
            -m.score,
            -len(m.context_keywords),  # More keywords = better
            m.bbox[1]  # Smaller y (upper position) = better
        ))

        # Use spatial grid for efficient overlap checking
        # Grid cell size based on typical bbox size
        grid_size = 50.0  # pixels
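        # For example, a match bbox of (120.0, 60.0, 180.0, 95.0) with grid_size 50.0
        # spans grid cells gx = 2..3 and gy = 1, so overlap only needs to be checked
        # against matches registered in that neighbourhood (values are illustrative).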
        grid: dict[tuple[int, int], list[Match]] = {}
        unique = []

        for match in matches:
            bbox = match.bbox

            # Calculate grid cells this bbox touches
            min_gx = int(bbox[0] / grid_size)
            min_gy = int(bbox[1] / grid_size)
            max_gx = int(bbox[2] / grid_size)
            max_gy = int(bbox[3] / grid_size)

            # Check for overlap only with matches in nearby grid cells
            is_duplicate = False
            cells_to_check = set()
            for gx in range(min_gx - 1, max_gx + 2):
                for gy in range(min_gy - 1, max_gy + 2):
                    cells_to_check.add((gx, gy))

            for cell in cells_to_check:
                if cell in grid:
                    for existing in grid[cell]:
                        if bbox_overlap(bbox, existing.bbox) > 0.7:
                            is_duplicate = True
                            break
                if is_duplicate:
                    break

            if not is_duplicate:
                unique.append(match)
                # Add to all grid cells this bbox touches
                for gx in range(min_gx, max_gx + 1):
                    for gy in range(min_gy, max_gy + 1):
                        key = (gx, gy)
                        if key not in grid:
                            grid[key] = []
                        grid[key].append(match)

        return unique
def find_field_matches(
    tokens: list[TokenLike],
    field_values: dict[str, str],
    page_no: int = 0
) -> dict[str, list[Match]]:
    """
    Convenience function to find matches for multiple fields.

    Args:
        tokens: List of tokens from the document
        field_values: Dict of field_name -> value to search for
        page_no: Page number

    Returns:
        Dict of field_name -> list of matches
    """
    from ..normalize import normalize_field

    matcher = FieldMatcher()
    results = {}
    for field_name, value in field_values.items():
        if value is None or str(value).strip() == '':
            continue
        normalized_values = normalize_field(field_name, str(value))
        matches = matcher.find_matches(tokens, field_name, normalized_values, page_no)
        results[field_name] = matches
    return results
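if __name__ == "__main__":
    # Usage sketch (illustrative only, not part of the module's public surface).
    # It assumes TokenLike-compatible objects expose `text`, `bbox` (x0, y0, x1, y1)
    # and `page_no`; only `bbox` and `page_no` are referenced in this file, so the
    # `text` attribute and the sample values below are assumptions.
    from types import SimpleNamespace

    tokens = [
        SimpleNamespace(text="Fakturanummer:", bbox=(50.0, 100.0, 160.0, 115.0), page_no=0),
        SimpleNamespace(text="2465027205", bbox=(170.0, 100.0, 250.0, 115.0), page_no=0),
    ]

    # find_field_matches normalizes each raw value and returns the scored matches per field.
    results = find_field_matches(tokens, {"InvoiceNumber": "2465027205"})
    for field, field_matches in results.items():
        for m in field_matches:
            print(field, m.score, m.bbox)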