222 lines
7.5 KiB
Python
222 lines
7.5 KiB
Python
"""
|
|
Tests for Batch Upload Service
|
|
"""
|
|
|
|
import io
|
|
import zipfile
|
|
from pathlib import Path
|
|
from uuid import uuid4
|
|
|
|
import pytest
|
|
|
|
from src.data.admin_db import AdminDB
|
|
from src.web.services.batch_upload import BatchUploadService
|
|
|
|
|
|
@pytest.fixture
def admin_db():
    """Provide an in-memory stand-in for the admin database.

    The returned object keeps batches and their file records in plain
    dictionaries, so no real database is touched.
    """

    class MockAdminDB:
        """Dictionary-backed fake offering batch-upload persistence methods.

        NOTE(review): presumably mirrors the subset of ``AdminDB`` that
        ``BatchUploadService`` calls -- confirm against the real class.
        """

        def __init__(self):
            # batch_id -> batch record
            self.batches = {}
            # batch_id -> list of file records, in creation order
            self.batch_files = {}

        def create_batch_upload(self, admin_token, filename, file_size, upload_source):
            """Create, store and return a new batch record.

            The record starts in the 'processing' state with zeroed counters
            and empty CSV/error/timestamp fields.
            """
            new_id = uuid4()
            attributes = {
                'batch_id': new_id,
                'admin_token': admin_token,
                'filename': filename,
                'file_size': file_size,
                'upload_source': upload_source,
                'status': 'processing',
                'total_files': 0,
                'processed_files': 0,
                'successful_files': 0,
                'failed_files': 0,
                'csv_filename': None,
                'csv_row_count': None,
                'error_message': None,
                'created_at': None,
                'completed_at': None,
            }
            # A throwaway class gives attribute-style access to the fields.
            record = type('BatchUpload', (), attributes)()
            self.batches[new_id] = record
            return record

        def update_batch_upload(self, batch_id, **kwargs):
            """Set the given attributes on a stored batch.

            Unknown batch ids are silently ignored.
            """
            record = self.batches.get(batch_id)
            if record is None:
                return
            for attr_name, attr_value in kwargs.items():
                setattr(record, attr_name, attr_value)

        def create_batch_upload_file(self, batch_id, filename, **kwargs):
            """Create a file record for a batch, store it and return it.

            Extra keyword arguments override the default field values.
            """
            attributes = {
                'file_id': uuid4(),
                'batch_id': batch_id,
                'filename': filename,
                'status': 'pending',
                'error_message': None,
                'annotation_count': 0,
                'csv_row_data': None,
                # Caller-supplied values win over the defaults above.
                **kwargs,
            }
            record = type('BatchUploadFile', (), attributes)()
            self.batch_files.setdefault(batch_id, []).append(record)
            return record

        def update_batch_upload_file(self, file_id, **kwargs):
            """Set the given attributes on the first file record with ``file_id``.

            Does nothing when no stored record has that id.
            """
            target = next(
                (
                    record
                    for records in self.batch_files.values()
                    for record in records
                    if record.file_id == file_id
                ),
                None,
            )
            if target is None:
                return
            for attr_name, attr_value in kwargs.items():
                setattr(target, attr_name, attr_value)

        def get_batch_upload(self, batch_id):
            """Return the stored batch record, or ``None`` if the id is unknown."""
            return self.batches.get(batch_id)

        def get_batch_upload_files(self, batch_id):
            """Return the file records of a batch, or an empty list if none exist."""
            return self.batch_files.get(batch_id, [])

    return MockAdminDB()
|
|
|
|
|
|
@pytest.fixture
def batch_service(admin_db):
    """Build a ``BatchUploadService`` wired to the ``admin_db`` fixture."""
    service = BatchUploadService(admin_db)
    return service
|
|
|
|
|
|
def create_test_zip(files):
    """Build an in-memory, deflate-compressed ZIP archive.

    Args:
        files: Dictionary mapping archive member names to their content bytes

    Returns:
        The complete ZIP archive as bytes
    """
    buffer = io.BytesIO()
    archive = zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED)
    with archive:
        for member_name, payload in files.items():
            archive.writestr(member_name, payload)
    # Read the buffer only after the archive is closed, so the ZIP's
    # trailing central directory has been written.
    return buffer.getvalue()
|
|
|
|
|
|
class TestBatchUploadService:
    """Exercise ZIP processing, CSV parsing and status lookup of BatchUploadService."""

    def test_process_empty_zip(self, batch_service):
        """An archive without entries fails with a 'No PDF files' error."""
        outcome = batch_service.process_zip_upload(
            admin_token="test-token",
            zip_filename="empty.zip",
            zip_content=create_test_zip({}),
        )

        assert outcome["status"] == "failed"
        assert "No PDF files" in outcome.get("error", "")

    def test_process_zip_with_pdfs_only(self, batch_service):
        """Two PDFs and no CSV complete with both files counted as successful."""
        archive_bytes = create_test_zip(
            {
                "INV001.pdf": b"%PDF-1.4 test content",
                "INV002.pdf": b"%PDF-1.4 test content 2",
            }
        )

        outcome = batch_service.process_zip_upload(
            admin_token="test-token",
            zip_filename="invoices.zip",
            zip_content=archive_bytes,
        )

        assert outcome["status"] == "completed"
        assert outcome["total_files"] == 2
        assert outcome["successful_files"] == 2
        assert outcome["failed_files"] == 0

    def test_process_zip_with_csv(self, batch_service):
        """PDFs plus a metadata CSV report the CSV name and its row count."""
        # Adjacent literals keep the CSV rows free of leading whitespace
        # regardless of how this method is indented.
        csv_text = (
            "DocumentId,InvoiceNumber,Amount,OCR\n"
            "INV001,F2024-001,1500.00,7350012345678\n"
            "INV002,F2024-002,2500.00,7350087654321\n"
        )
        archive_bytes = create_test_zip(
            {
                "INV001.pdf": b"%PDF-1.4 test content",
                "INV002.pdf": b"%PDF-1.4 test content 2",
                "metadata.csv": csv_text.encode('utf-8'),
            }
        )

        outcome = batch_service.process_zip_upload(
            admin_token="test-token",
            zip_filename="invoices.zip",
            zip_content=archive_bytes,
        )

        assert outcome["status"] == "completed"
        assert outcome["total_files"] == 2
        assert outcome["csv_filename"] == "metadata.csv"
        assert outcome["csv_row_count"] == 2

    def test_process_invalid_zip(self, batch_service):
        """Bytes that are not a ZIP archive fail with an 'Invalid ZIP file' error."""
        outcome = batch_service.process_zip_upload(
            admin_token="test-token",
            zip_filename="invalid.zip",
            zip_content=b"not a zip file",
        )

        assert outcome["status"] == "failed"
        assert "Invalid ZIP file" in outcome.get("error", "")

    def test_csv_parsing(self, batch_service):
        """Parsed CSV rows are keyed by DocumentId and keep their column values."""
        csv_text = (
            "DocumentId,InvoiceNumber,InvoiceDate,Amount,OCR,Bankgiro,customer_number\n"
            "INV001,F2024-001,2024-01-15,1500.00,7350012345678,123-4567,C123\n"
            "INV002,F2024-002,2024-01-16,2500.00,7350087654321,123-4567,C124\n"
        )
        archive_bytes = create_test_zip({"metadata.csv": csv_text.encode('utf-8')})

        with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
            csv_entries = [
                entry for entry in archive.filelist
                if entry.filename.endswith('.csv')
            ]
            # The archive holds exactly one CSV member; parse it while the
            # archive is still open.
            parsed = batch_service._parse_csv_file(archive, csv_entries[0])

        assert len(parsed) == 2
        assert "INV001" in parsed
        first_row = parsed["INV001"]
        assert first_row["InvoiceNumber"] == "F2024-001"
        assert first_row["Amount"] == "1500.00"
        assert first_row["customer_number"] == "C123"

    def test_get_batch_status(self, batch_service, admin_db):
        """Status lookup of a processed batch reflects the upload result."""
        # Process a single-PDF archive so there is a batch to query.
        upload = batch_service.process_zip_upload(
            admin_token="test-token",
            zip_filename="test.zip",
            zip_content=create_test_zip({"INV001.pdf": b"%PDF-1.4 test"}),
        )
        batch_id = upload["batch_id"]

        # Query the status of that batch.
        status = batch_service.get_batch_status(batch_id)

        assert status["batch_id"] == batch_id
        assert status["filename"] == "test.zip"
        assert status["status"] == "completed"
        assert status["total_files"] == 1
        assert len(status["files"]) == 1

    def test_get_batch_status_not_found(self, batch_service):
        """Looking up a random, never-created batch id yields an error entry."""
        unknown_id = str(uuid4())
        status = batch_service.get_batch_status(unknown_id)
        assert "error" in status
|