Files
smart-support/backend/app/escalation.py
Yaojia Wang f0699436c5 refactor: engineering improvements -- API versioning, structured logging, Alembic, error standardization, test coverage
- API versioning: all REST endpoints prefixed with /api/v1/
- Structured logging: replaced stdlib logging with structlog (console/JSON modes)
- Alembic migrations: versioned DB schema with initial migration
- Error standardization: global exception handlers for consistent envelope format
- Interrupt cleanup: asyncio background task for expired interrupt removal
- Integration tests: +30 tests (analytics, replay, openapi, error, session APIs)
- Frontend tests: +57 tests (all components, pages, useWebSocket hook)
- Backend: 557 tests, 89.75% coverage | Frontend: 80 tests, 16 test files
2026-04-06 23:19:29 +02:00

141 lines
4.3 KiB
Python

"""Webhook escalation module -- HTTP POST with exponential backoff retry."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import Protocol
import httpx
import structlog
from pydantic import BaseModel
logger = structlog.get_logger()
class EscalationPayload(BaseModel, frozen=True):
    """Frozen request body that is POSTed to the escalation webhook.

    The model is declared with ``frozen=True``, so fields cannot be
    reassigned after construction.
    """

    # Thread the escalation refers to; also used in log messages.
    thread_id: str
    # Free-text explanation of why the escalation was raised.
    reason: str
    # Summary text of the conversation, sent along with the request.
    conversation_summary: str
    # Optional extra context; an empty dict when the caller supplies none.
    metadata: dict = {}
@dataclass(frozen=True)
class EscalationResult:
    """Read-only outcome record returned by every escalator.

    Instances are frozen: assigning to a field after construction raises
    ``dataclasses.FrozenInstanceError``.
    """

    # True only when the escalation was delivered successfully.
    success: bool
    # HTTP status code associated with the outcome, or None if there is none.
    status_code: int | None
    # Number of delivery attempts that were made (0 when nothing was sent).
    attempts: int
    # Human-readable failure description, or None on success.
    error: str | None
class EscalationService(Protocol):
    """Structural interface shared by all escalation back-ends.

    Any object exposing a matching async ``escalate`` method satisfies this
    protocol; no inheritance is required.
    """

    async def escalate(self, payload: EscalationPayload) -> EscalationResult:
        """Attempt to escalate *payload* and report the outcome."""
class WebhookEscalator:
    """Send escalation requests via HTTP POST with exponential backoff retry.

    Each call to :meth:`escalate` makes up to ``max_retries`` POST attempts
    in total. Any 2xx response counts as success. Non-2xx responses, timeouts
    and other ``httpx.RequestError`` failures are retried after sleeping
    ``2**attempt`` seconds (2s, 4s, ...); there is no sleep after the final
    attempt.
    """

    def __init__(
        self,
        url: str,
        timeout_seconds: float = 10,
        max_retries: int = 3,
    ) -> None:
        """Store the webhook configuration.

        Args:
            url: Webhook endpoint. An empty value makes :meth:`escalate`
                return a failure result without sending anything.
            timeout_seconds: Timeout handed to ``httpx.AsyncClient``.
            max_retries: Total number of POST attempts. Values below 1 are
                raised to 1 so that a configured escalator always tries at
                least once instead of silently sending nothing.
        """
        self._url = url
        self._timeout = timeout_seconds
        self._max_retries = max(1, max_retries)

    async def escalate(self, payload: EscalationPayload) -> EscalationResult:
        """POST the escalation payload to the configured webhook URL.

        Args:
            payload: The escalation details to deliver.

        Returns:
            A successful result carrying the 2xx status code and the attempt
            number on which delivery succeeded; otherwise a failed result
            with the total number of attempts, the last error message, and
            the HTTP status code of the final attempt (``None`` when that
            attempt ended in a transport error or timeout).
        """
        if not self._url:
            return EscalationResult(
                success=False,
                status_code=None,
                attempts=0,
                error="Webhook URL not configured",
            )

        # Serialise once, outside the retry loop. mode="json" converts values
        # such as datetimes or UUIDs inside ``metadata`` into JSON-native
        # types so the HTTP client's JSON encoder cannot fail on them.
        body = payload.model_dump(mode="json")

        last_error: str | None = None
        last_status: int | None = None
        async with httpx.AsyncClient(timeout=self._timeout) as client:
            for attempt in range(1, self._max_retries + 1):
                # A transport-level failure has no HTTP status, so forget the
                # status of any earlier attempt before trying again.
                last_status = None
                try:
                    response = await client.post(self._url, json=body)
                except httpx.TimeoutException:
                    last_error = "Request timed out"
                    logger.warning(
                        "Escalation attempt %d/%d timed out",
                        attempt,
                        self._max_retries,
                    )
                except httpx.RequestError as exc:
                    # Some transport errors stringify to "", so fall back to
                    # the exception class name to keep the error informative.
                    last_error = str(exc) or type(exc).__name__
                    logger.warning(
                        "Escalation attempt %d/%d error: %s",
                        attempt,
                        self._max_retries,
                        last_error,
                    )
                else:
                    if 200 <= response.status_code < 300:
                        logger.info(
                            "Escalation succeeded for thread %s (attempt %d)",
                            payload.thread_id,
                            attempt,
                        )
                        return EscalationResult(
                            success=True,
                            status_code=response.status_code,
                            attempts=attempt,
                            error=None,
                        )
                    last_status = response.status_code
                    last_error = f"HTTP {response.status_code}"
                    logger.warning(
                        "Escalation attempt %d/%d failed: %s",
                        attempt,
                        self._max_retries,
                        last_error,
                    )

                # Exponential backoff: skip delay after last attempt
                if attempt < self._max_retries:
                    delay = 2**attempt
                    await asyncio.sleep(delay)

        logger.error(
            "Escalation failed for thread %s after %d attempts: %s",
            payload.thread_id,
            self._max_retries,
            last_error,
        )
        return EscalationResult(
            success=False,
            status_code=last_status,
            attempts=self._max_retries,
            error=last_error,
        )
class NoOpEscalator:
    """Stand-in escalator for deployments without a webhook URL.

    It never performs any network I/O; it only logs and reports that
    escalation is disabled.
    """

    async def escalate(self, payload: EscalationPayload) -> EscalationResult:
        """Log the skipped escalation and return a zero-attempt failure."""
        thread = payload.thread_id
        logger.info("Escalation disabled (no webhook URL). Thread: %s", thread)
        disabled = EscalationResult(
            success=False,
            status_code=None,
            attempts=0,
            error="Escalation disabled",
        )
        return disabled