Files
smart-support/backend/tests/e2e/test_replay_analytics.py
Yaojia Wang 19fc9f3289 test: close coverage gaps and add frontend test infrastructure
Backend (516 tests, 94% coverage):
- Add azure_openai endpoint/deployment validation tests (config.py -> 100%)
- Add _total_conversations and _avg_turns direct tests (queries.py -> 100%)
- Add transformer edge cases: list content, string checkpoint, invalid JSON,
  malformed message graceful skip (transformer.py -> 93%)
- Add safety combined status_code+error_message interaction tests
- Fix ambiguous 200/422 assertion to strict 422
- Add E2E pagination shape assertions (total, page, per_page, row count)
- Fix ReplayPool mock to respect LIMIT/OFFSET params

Frontend (23 tests, vitest + happy-dom + @testing-library/react):
- Add vitest infrastructure with happy-dom environment
- Add api.ts tests: success, HTTP error, success=false, URL encoding
- Add DashboardPage tests: loading, data, error, empty states
- Add ReplayListPage tests: loading, empty, data, error, status badge classes
- Add ReplayPage tests: loading, steps, empty, error states
2026-04-06 13:32:10 +02:00

231 lines
7.9 KiB
Python

"""E2E tests for replay and analytics flows (flow 6).
Flow 6: list conversations -> select one -> step-by-step replay.
Also tests the analytics dashboard endpoint.
"""
from __future__ import annotations
from datetime import datetime, timezone
import pytest
from starlette.testclient import TestClient
from tests.e2e.conftest import FakePool, create_e2e_app
pytestmark = pytest.mark.e2e
# ---------------------------------------------------------------------------
# Custom pool that returns specific data per query
# ---------------------------------------------------------------------------
class ReplayPool(FakePool):
"""Pool that returns different data depending on the SQL query."""
def __init__(
self,
conversations: list[dict] | None = None,
checkpoints: list[dict] | None = None,
analytics_rows: list[dict] | None = None,
) -> None:
super().__init__()
self._conversations = conversations or []
self._checkpoints = checkpoints or []
self._analytics = analytics_rows or []
class _Conn:
def __init__(self, convos, checkpoints, analytics):
self._convos = convos
self._checkpoints = checkpoints
self._analytics = analytics
async def execute(self, query: str, params=None):
from tests.e2e.conftest import FakeCursor
if "COUNT" in query and "conversations" in query:
return FakeCursor([(len(self._convos),)])
if "conversations" in query and "SELECT" in query:
# Respect LIMIT/OFFSET from params if provided
rows = self._convos
if params:
offset = params.get("offset", 0)
limit = params.get("limit", len(rows))
rows = rows[offset : offset + limit]
return FakeCursor(rows)
if "checkpoints" in query:
return FakeCursor(self._checkpoints)
# Analytics queries
return FakeCursor(self._analytics)
def connection(self):
from contextlib import asynccontextmanager
conn = self._Conn(self._conversations, self._checkpoints, self._analytics)
@asynccontextmanager
async def _ctx():
yield conn
return _ctx()
class TestFlow6ReplayConversation:
"""Flow 6: list conversations -> select one -> step replay."""
def test_list_conversations(self) -> None:
now = datetime.now(tz=timezone.utc).isoformat()
conversations = [
{
"thread_id": "conv-001",
"created_at": now,
"last_activity": now,
"status": "active",
"total_tokens": 150,
"total_cost_usd": 0.003,
},
{
"thread_id": "conv-002",
"created_at": now,
"last_activity": now,
"status": "completed",
"total_tokens": 300,
"total_cost_usd": 0.006,
},
]
pool = ReplayPool(conversations=conversations)
app = create_e2e_app(pool=pool)
with TestClient(app) as client:
resp = client.get("/api/conversations")
assert resp.status_code == 200
body = resp.json()
assert body["success"] is True
data = body["data"]
assert len(data["conversations"]) == 2
assert data["conversations"][0]["thread_id"] == "conv-001"
assert data["conversations"][1]["thread_id"] == "conv-002"
assert data["total"] == 2
def test_list_conversations_pagination(self) -> None:
conversations = [
{
"thread_id": f"conv-{i:03d}",
"created_at": "2026-04-01T00:00:00Z",
"last_activity": "2026-04-01T00:00:00Z",
"status": "active",
"total_tokens": 100,
"total_cost_usd": 0.001,
}
for i in range(5)
]
pool = ReplayPool(conversations=conversations)
app = create_e2e_app(pool=pool)
with TestClient(app) as client:
resp = client.get("/api/conversations", params={"page": 1, "per_page": 2})
assert resp.status_code == 200
body = resp.json()
assert body["success"] is True
data = body["data"]
assert data["total"] == 5
assert data["page"] == 1
assert data["per_page"] == 2
assert len(data["conversations"]) == 2
def test_replay_thread_not_found(self) -> None:
pool = ReplayPool(checkpoints=[])
app = create_e2e_app(pool=pool)
with TestClient(app) as client:
resp = client.get("/api/replay/nonexistent-thread")
assert resp.status_code == 404
def test_replay_invalid_thread_id_format(self) -> None:
app = create_e2e_app()
with TestClient(app) as client:
# Thread ID with special chars fails regex validation
resp = client.get("/api/replay/invalid%20thread%21%40")
assert resp.status_code == 400
class TestAnalyticsDashboard:
"""Analytics endpoint tests."""
def test_analytics_invalid_range_format(self) -> None:
app = create_e2e_app()
with TestClient(app) as client:
resp = client.get("/api/analytics", params={"range": "invalid"})
assert resp.status_code == 400
def test_analytics_range_too_large(self) -> None:
app = create_e2e_app()
with TestClient(app) as client:
resp = client.get("/api/analytics", params={"range": "999d"})
assert resp.status_code == 400
def test_analytics_range_zero_rejected(self) -> None:
app = create_e2e_app()
with TestClient(app) as client:
resp = client.get("/api/analytics", params={"range": "0d"})
assert resp.status_code == 400
class TestFullUserJourney:
"""End-to-end journey: chat -> then check replay list shows the conversation."""
def test_chat_then_check_conversations_endpoint(self) -> None:
"""After chatting via WebSocket, the conversations endpoint is reachable."""
from tests.e2e.conftest import make_chunk, make_graph
graph = make_graph(chunks=[make_chunk("Your order is shipped.")])
now = datetime.now(tz=timezone.utc).isoformat()
pool = ReplayPool(
conversations=[
{
"thread_id": "e2e-journey-1",
"created_at": now,
"last_activity": now,
"status": "active",
"total_tokens": 50,
"total_cost_usd": 0.001,
},
],
)
app = create_e2e_app(graph=graph, pool=pool)
with TestClient(app) as client:
# Step 1: Chat via WebSocket
with client.websocket_connect("/ws") as ws:
ws.send_json({
"type": "message",
"thread_id": "e2e-journey-1",
"content": "Where is my order?",
})
messages = []
for _ in range(20):
msg = ws.receive_json()
messages.append(msg)
if msg["type"] in ("message_complete", "error"):
break
assert any(m["type"] == "message_complete" for m in messages)
# Step 2: Check conversations endpoint
resp = client.get("/api/conversations")
assert resp.status_code == 200
body = resp.json()
assert body["success"] is True
assert any(
c["thread_id"] == "e2e-journey-1"
for c in body["data"]["conversations"]
)
# Step 3: Health check still works
resp = client.get("/api/health")
assert resp.status_code == 200