- Intent classification with LLM structured output (single/multi/ambiguous)
- Discount agent with apply_discount and generate_coupon tools
- Interrupt manager with 30-min TTL auto-expiration and retry prompts
- Webhook escalation module with exponential backoff retry (max 3)
- Three vertical industry templates (e-commerce, SaaS, fintech)
- Template loading in AgentRegistry
- Enhanced supervisor prompt with dynamic agent descriptions
- 153 tests passing, 90.18% coverage
141 lines
4.8 KiB
Python
141 lines
4.8 KiB
Python
"""YAML Agent registry loader with validation."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Literal
|
|
|
|
import yaml
|
|
from pydantic import BaseModel, field_validator
|
|
|
|
|
|
class PersonalityConfig(BaseModel, frozen=True):
    """Conversational style settings attached to an agent.

    Every field has a default, so an agent definition may omit the
    personality section entirely. Instances are frozen (read-only after
    construction).
    """

    # Free-text description of the voice the agent should use.
    tone: str = "professional and helpful"
    # Default opening message.
    greeting: str = "Hello! How can I help you today?"
    # Default message for handing the conversation to a human.
    escalation_message: str = "Let me connect you with a human agent."
|
|
|
|
|
|
class AgentConfig(BaseModel, frozen=True):
    """Validated, read-only definition of a single agent.

    ``name`` is stored with surrounding whitespace removed and must not be
    blank; ``tools`` must contain at least one entry; ``permission`` is
    restricted to ``"read"`` or ``"write"``.
    """

    name: str
    description: str
    permission: Literal["read", "write"]
    # Falls back to the default personality when the config omits one.
    personality: PersonalityConfig = PersonalityConfig()
    tools: list[str]

    @field_validator("name")
    @classmethod
    def name_not_empty(cls, v: str) -> str:
        """Reject blank names and return the name with whitespace trimmed."""
        trimmed = v.strip()
        if trimmed:
            return trimmed
        raise ValueError("Agent name must not be empty")

    @field_validator("tools")
    @classmethod
    def tools_not_empty(cls, v: list[str]) -> list[str]:
        """Require at least one tool; the list itself is returned unchanged."""
        if v:
            return v
        raise ValueError("Agent must have at least one tool")
|
|
|
|
|
|
class AgentRegistry:
    """Immutable registry of agent configurations loaded from YAML.

    Build one with :meth:`load` (explicit file path) or
    :meth:`load_template` (named template inside a templates directory).
    Agents are indexed by name and exposed through read-only accessors.
    """

    def __init__(self, agents: tuple[AgentConfig, ...]) -> None:
        """Index *agents* by their ``name``.

        If two entries share a name the later one replaces the earlier one;
        :meth:`load` rejects duplicates before calling this constructor.
        """
        self._agents = {agent.name: agent for agent in agents}

    @staticmethod
    def _resolve_templates_dir(templates_dir: str | Path | None) -> Path:
        """Return *templates_dir* as a ``Path``, applying the default when None.

        The default is the ``templates`` directory located beside the
        directory that contains this module file.
        """
        if templates_dir is None:
            return Path(__file__).parent.parent / "templates"
        return Path(templates_dir)

    @classmethod
    def load(cls, yaml_path: str | Path) -> AgentRegistry:
        """Load and validate agent configurations from a YAML file.

        The file must contain a mapping with a top-level ``agents`` key whose
        value is a non-empty list of mappings, each accepted by
        ``AgentConfig``. Agent names must be unique within the file.

        Raises:
            FileNotFoundError: *yaml_path* does not point to an existing file.
            ValueError: the file is empty, is not valid YAML, has the wrong
                top-level shape, or contains an invalid or duplicate agent.
        """
        path = Path(yaml_path)
        # is_file() rather than exists(): a directory at this path must be
        # reported as a missing config file instead of failing inside
        # read_text() with a platform-dependent OSError.
        if not path.is_file():
            raise FileNotFoundError(f"Agent config file not found: {path}")

        raw_text = path.read_text(encoding="utf-8")
        if not raw_text.strip():
            raise ValueError(f"Agent config file is empty: {path}")

        try:
            data = yaml.safe_load(raw_text)
        except yaml.YAMLError as exc:
            msg = f"Invalid YAML in {path}"
            # Only some YAML errors carry a position; report it (1-based)
            # when it is available.
            mark = getattr(exc, "problem_mark", None)
            if mark is not None:
                msg += f" at line {mark.line + 1}, column {mark.column + 1}"
            raise ValueError(msg) from exc

        if not isinstance(data, dict) or "agents" not in data:
            raise ValueError(f"Agent config must have a top-level 'agents' key in {path}")

        raw_agents = data["agents"]
        if not isinstance(raw_agents, list) or not raw_agents:
            raise ValueError(f"'agents' must be a non-empty list in {path}")

        agents: list[AgentConfig] = []
        seen_names: set[str] = set()

        for i, raw in enumerate(raw_agents):
            if not isinstance(raw, dict):
                raise ValueError(f"Agent at index {i} must be a mapping in {path}")
            try:
                agent = AgentConfig(**raw)
            except (TypeError, ValueError) as exc:
                # ValueError covers pydantic's validation error; TypeError
                # covers mappings whose keys are not strings and therefore
                # cannot be passed as keyword arguments.
                raise ValueError(f"Invalid agent config at index {i} in {path}: {exc}") from exc

            # Compare on the validated name so that names differing only by
            # surrounding whitespace count as duplicates.
            if agent.name in seen_names:
                raise ValueError(f"Duplicate agent name '{agent.name}' in {path}")
            seen_names.add(agent.name)
            agents.append(agent)

        return cls(agents=tuple(agents))

    def get_agent(self, name: str) -> AgentConfig:
        """Return the agent registered under *name*.

        Raises:
            KeyError: no such agent; the message lists the available names.
        """
        try:
            return self._agents[name]
        except KeyError:
            available = ", ".join(sorted(self._agents))
            raise KeyError(f"Agent '{name}' not found. Available: {available}") from None

    def list_agents(self) -> tuple[AgentConfig, ...]:
        """Return all registered agents in registration order."""
        return tuple(self._agents.values())

    def get_agents_by_permission(self, permission: str) -> tuple[AgentConfig, ...]:
        """Return the agents whose ``permission`` equals *permission*."""
        return tuple(a for a in self._agents.values() if a.permission == permission)

    @classmethod
    def load_template(
        cls,
        template_name: str,
        templates_dir: str | Path | None = None,
    ) -> AgentRegistry:
        """Load agent configurations from a named template.

        The template is the file ``<template_name>.yaml`` located directly
        inside *templates_dir* (or the default templates directory when
        *templates_dir* is None).

        Raises:
            FileNotFoundError: *template_name* does not resolve to a file
                directly inside the templates directory. This includes names
                containing path separators, ``..`` segments or absolute
                paths, which would otherwise escape the directory.
            ValueError: the template file fails validation in :meth:`load`.
        """
        templates_dir = cls._resolve_templates_dir(templates_dir)
        yaml_path = templates_dir / f"{template_name}.yaml"

        # pathlib does not collapse ".." and lets an absolute right-hand
        # operand replace the left one, so a name that tries to leave the
        # templates directory always produces a different parent here.
        stays_inside = yaml_path.parent == templates_dir
        if not stays_inside or not yaml_path.is_file():
            available = cls.list_templates(templates_dir)
            raise FileNotFoundError(
                f"Template '{template_name}' not found. "
                f"Available: {', '.join(available) if available else 'none'}"
            )
        return cls.load(yaml_path)

    @classmethod
    def list_templates(
        cls,
        templates_dir: str | Path | None = None,
    ) -> tuple[str, ...]:
        """List available template names from the templates directory.

        Returns the sorted stems of the ``*.yaml`` files found directly in
        the directory (no recursion), or an empty tuple when the directory
        does not exist.
        """
        templates_dir = cls._resolve_templates_dir(templates_dir)
        if not templates_dir.is_dir():
            return ()
        return tuple(
            sorted(p.stem for p in templates_dir.glob("*.yaml") if p.is_file())
        )

    def __len__(self) -> int:
        """Return the number of registered agents."""
        return len(self._agents)
|