Files
smart-support/backend/tests/e2e/conftest.py
Yaojia Wang e0931daece feat: wire frontend pages to live APIs and standardize response contracts (P1)
- Backend: Add COUNT query and paginated response shape to conversations endpoint
  Returns { conversations: [...], total, page, per_page } instead of flat array
- Frontend: Replace mock data in DashboardPage with fetchAnalytics() API calls
- Frontend: Replace mock data in ReplayListPage with fetchConversations() API calls
- Frontend: Replace mock data in ReplayPage with fetchReplay() API calls
- Add proper loading, empty, and error states to all three pages
- Align ConversationSummary type with actual DB columns (created_at, status)
- Update unit and E2E tests for new paginated conversation response shape
- Add fetchone() to FakeCursor for COUNT query support in E2E tests
2026-04-05 23:06:00 +02:00

223 lines
6.3 KiB
Python

"""E2E test fixtures -- full FastAPI app with mocked LLM and database."""
from __future__ import annotations
import asyncio
from contextlib import asynccontextmanager
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import pytest
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from httpx import ASGITransport, AsyncClient
from app.analytics.api import router as analytics_router
from app.callbacks import TokenUsageCallbackHandler
from app.interrupt_manager import InterruptManager
from app.openapi.review_api import _job_store, router as openapi_router
from app.replay.api import router as replay_router
from app.session_manager import SessionManager
from app.ws_handler import dispatch_message
# ---------------------------------------------------------------------------
# Graph helpers -- simulate LangGraph streaming behaviour
# ---------------------------------------------------------------------------
class AsyncIterHelper:
"""Make a list behave as an async iterator."""
def __init__(self, items: list) -> None:
self._items = list(items)
def __aiter__(self):
return self
async def __anext__(self):
if not self._items:
raise StopAsyncIteration
return self._items.pop(0)
def make_chunk(content: str, node: str = "order_lookup") -> tuple:
c = MagicMock()
c.content = content
c.tool_calls = []
return (c, {"langgraph_node": node})
def make_tool_chunk(name: str, args: dict, node: str = "order_lookup") -> tuple:
c = MagicMock()
c.content = ""
c.tool_calls = [{"name": name, "args": args}]
return (c, {"langgraph_node": node})
def make_state(*, interrupt: bool = False, data: dict | None = None) -> Any:
s = MagicMock()
if interrupt:
obj = MagicMock()
obj.value = data or {"action": "cancel_order", "order_id": "1042"}
t = MagicMock()
t.interrupts = (obj,)
s.tasks = (t,)
else:
s.tasks = ()
return s
def make_graph(
chunks: list | None = None,
state: Any = None,
resume_chunks: list | None = None,
) -> MagicMock:
"""Build a mock LangGraph CompiledStateGraph."""
g = MagicMock()
g.intent_classifier = None
g.agent_registry = None
if state is None:
state = make_state()
streams = [chunks or [], resume_chunks or []]
idx = {"n": 0}
def astream_side_effect(*a, **kw):
i = min(idx["n"], len(streams) - 1)
idx["n"] += 1
return AsyncIterHelper(list(streams[i]))
g.astream = MagicMock(side_effect=astream_side_effect)
g.aget_state = AsyncMock(return_value=state)
return g
# ---------------------------------------------------------------------------
# Fake database pool
# ---------------------------------------------------------------------------
class FakeCursor:
"""Minimal async cursor returning pre-configured rows."""
def __init__(self, rows: list[dict]) -> None:
self._rows = rows
async def fetchall(self) -> list[dict]:
return self._rows
async def fetchone(self) -> tuple | dict | None:
return self._rows[0] if self._rows else None
class FakeConnection:
"""Fake async connection that returns a FakeCursor."""
def __init__(self, rows: list[dict]) -> None:
self._rows = rows
async def execute(self, query: str, params: dict | None = None) -> FakeCursor:
return FakeCursor(self._rows)
class FakePool:
"""Minimal pool that yields a fake connection."""
def __init__(self, rows: list[dict] | None = None) -> None:
self._rows = rows or []
@asynccontextmanager
async def connection(self):
yield FakeConnection(self._rows)
async def close(self) -> None:
pass
# ---------------------------------------------------------------------------
# App factory
# ---------------------------------------------------------------------------
def create_e2e_app(
graph: MagicMock | None = None,
pool: FakePool | None = None,
session_ttl: int = 3600,
interrupt_ttl: int = 1800,
) -> FastAPI:
"""Create a FastAPI app wired with mocked dependencies for E2E testing."""
g = graph or make_graph()
p = pool or FakePool()
sm = SessionManager(session_ttl_seconds=session_ttl)
im = InterruptManager(ttl_seconds=interrupt_ttl)
app = FastAPI(title="Smart Support E2E Test")
app.include_router(openapi_router)
app.include_router(replay_router)
app.include_router(analytics_router)
app.state.graph = g
app.state.session_manager = sm
app.state.interrupt_manager = im
app.state.pool = p
app.state.settings = MagicMock(llm_model="test-model")
app.state.analytics_recorder = AsyncMock()
app.state.conversation_tracker = AsyncMock()
@app.get("/api/health")
def health_check() -> dict:
return {"status": "ok", "version": "test"}
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket) -> None:
await ws.accept()
try:
while True:
raw_data = await ws.receive_text()
await dispatch_message(
ws,
app.state.graph,
app.state.session_manager,
TokenUsageCallbackHandler(model_name="test-model"),
raw_data,
interrupt_manager=app.state.interrupt_manager,
analytics_recorder=app.state.analytics_recorder,
conversation_tracker=app.state.conversation_tracker,
pool=app.state.pool,
)
except WebSocketDisconnect:
pass
return app
@pytest.fixture
def e2e_graph():
"""Default graph fixture -- returns tokens and message_complete."""
return make_graph(
chunks=[make_chunk("Order 1042 is "), make_chunk("shipped.")]
)
@pytest.fixture
def e2e_app(e2e_graph):
"""Default E2E app fixture."""
return create_e2e_app(graph=e2e_graph)
@pytest.fixture
async def e2e_client(e2e_app):
"""Async HTTP client for E2E tests."""
transport = ASGITransport(app=e2e_app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
@pytest.fixture(autouse=True)
def clear_openapi_job_store():
"""Clear the in-memory job store between tests."""
_job_store.clear()
yield
_job_store.clear()