"""Safety policy for destructive-action confirmation rules. This module makes the confirmation rules explicit and auditable. Every tool call passes through ``requires_confirmation`` before execution to decide whether human-in-the-loop approval is needed. Policy summary -------------- - ``read`` actions: execute immediately, no confirmation required. - ``write`` actions: require human approval via interrupt gate. - OpenAPI-imported endpoints: use ``needs_interrupt`` from classification. - If both the agent permission AND the endpoint classification agree the action is read-only, it executes without confirmation. Multi-intent semantics ---------------------- When a user message contains multiple intents (e.g. "cancel my order and apply a refund"), the supervisor routes them sequentially. Each action is evaluated independently: - If a write action is blocked by an interrupt, subsequent actions in the same message are paused until the interrupt is resolved. - Read actions that follow a blocked write are also paused (sequential, not best-effort) to preserve causal ordering. - If an interrupt is rejected, the remaining actions are skipped and the agent informs the user. MCP error taxonomy ------------------ Tool execution errors are classified into categories for retry decisions: - ``transient``: network timeouts, rate limits, 5xx -- retryable up to 3 times. - ``validation``: bad parameters, 4xx -- not retryable, report to user. - ``auth``: 401/403 -- not retryable, escalate. - ``unknown``: unclassified -- not retryable, log and escalate. """ from __future__ import annotations from dataclasses import dataclass from typing import Literal @dataclass(frozen=True) class ConfirmationPolicy: """Result of evaluating whether an action needs confirmation.""" requires_confirmation: bool reason: str def requires_confirmation( *, agent_permission: Literal["read", "write"], needs_interrupt: bool | None = None, ) -> ConfirmationPolicy: """Determine whether an action requires human confirmation. Parameters ---------- agent_permission: The permission level of the agent executing the action. needs_interrupt: Override from OpenAPI classification. When ``None``, the decision is based solely on ``agent_permission``. """ if needs_interrupt is not None: if needs_interrupt: return ConfirmationPolicy( requires_confirmation=True, reason="Endpoint classified as requiring human approval", ) return ConfirmationPolicy( requires_confirmation=False, reason="Endpoint classified as safe (no interrupt needed)", ) if agent_permission == "write": return ConfirmationPolicy( requires_confirmation=True, reason="Write-permission agent actions require confirmation", ) return ConfirmationPolicy( requires_confirmation=False, reason="Read-only agent actions execute immediately", ) # --- MCP Error Taxonomy --- MCP_ERROR_CATEGORY = Literal["transient", "validation", "auth", "unknown"] _TRANSIENT_STATUS_CODES = frozenset({408, 429, 500, 502, 503, 504}) _AUTH_STATUS_CODES = frozenset({401, 403}) _MAX_RETRIES = 3 def classify_mcp_error( *, status_code: int | None = None, error_message: str = "", ) -> MCP_ERROR_CATEGORY: """Classify an MCP tool error for retry decisions.""" if status_code is not None: if status_code in _TRANSIENT_STATUS_CODES: return "transient" if status_code in _AUTH_STATUS_CODES: return "auth" if 400 <= status_code < 500: return "validation" lower_msg = error_message.lower() if any(kw in lower_msg for kw in ("timeout", "timed out", "rate limit")): return "transient" if any(kw in lower_msg for kw in ("unauthorized", "forbidden")): return "auth" if any(kw in lower_msg for kw in ("invalid", "missing", "bad request")): return "validation" return "unknown" def is_retryable(category: MCP_ERROR_CATEGORY) -> bool: """Return whether a given error category is retryable.""" return category == "transient" def max_retries() -> int: """Maximum retry attempts for transient errors.""" return _MAX_RETRIES