- Intent classification with LLM structured output (single/multi/ambiguous)
- Discount agent with apply_discount and generate_coupon tools
- Interrupt manager with 30-min TTL auto-expiration and retry prompts
- Webhook escalation module with exponential backoff retry (max 3)
- Three vertical industry templates (e-commerce, SaaS, fintech)
- Template loading in AgentRegistry
- Enhanced supervisor prompt with dynamic agent descriptions
- 153 tests passing, 90.18% coverage
133 lines
4.7 KiB
Python
133 lines
4.7 KiB
Python
"""Tests for app.interrupt_manager module."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from app.interrupt_manager import InterruptManager
|
|
|
|
|
|
@pytest.mark.unit
class TestInterruptManagerRegister:
    """Checks on what ``InterruptManager.register`` returns and stores."""

    def test_register_creates_record(self) -> None:
        """The returned record echoes thread id, action and TTL, and has an id."""
        manager = InterruptManager(ttl_seconds=1800)
        created = manager.register("t1", "cancel_order", {"order_id": "1042"})

        assert created.thread_id == "t1"
        assert created.action == "cancel_order"
        assert created.ttl_seconds == 1800
        # Only truthiness is checked; the id's format is not pinned here.
        assert created.interrupt_id

    def test_register_overwrites_previous(self) -> None:
        """A second register on the same thread replaces the first one."""
        manager = InterruptManager()
        first = manager.register("t1", "cancel_order", {})
        second = manager.register("t1", "apply_discount", {})
        assert first.interrupt_id != second.interrupt_id

        # The status lookup must now report the most recent action.
        current = manager.check_status("t1")
        assert current is not None
        assert current.record.action == "apply_discount"
|
|
|
|
|
|
@pytest.mark.unit
class TestInterruptManagerCheckStatus:
    """Checks on ``InterruptManager.check_status`` for absent, live and stale entries."""

    def test_no_interrupt_returns_none(self) -> None:
        """An unknown thread id yields ``None``."""
        manager = InterruptManager()
        assert manager.check_status("t1") is None

    def test_fresh_interrupt_not_expired(self) -> None:
        """A just-registered interrupt is reported as live with time left."""
        manager = InterruptManager(ttl_seconds=1800)
        manager.register("t1", "cancel_order", {})

        result = manager.check_status("t1")
        assert result is not None
        assert not result.is_expired
        assert result.remaining_seconds > 0

    def test_expired_interrupt(self) -> None:
        """One second past a 10-second TTL the interrupt is reported expired."""
        manager = InterruptManager(ttl_seconds=10)
        manager.register("t1", "cancel_order", {})

        # Swap the module's clock for a mock instead of sleeping.
        with patch("app.interrupt_manager.time") as fake_clock:
            registered_at = manager._interrupts["t1"].created_at
            fake_clock.time.return_value = registered_at + 11

            result = manager.check_status("t1")
            assert result is not None
            assert result.is_expired
            assert result.remaining_seconds == 0.0

    def test_boundary_not_expired(self) -> None:
        """One second before a 10-second TTL the interrupt is still live."""
        manager = InterruptManager(ttl_seconds=10)
        manager.register("t1", "cancel_order", {})

        with patch("app.interrupt_manager.time") as fake_clock:
            registered_at = manager._interrupts["t1"].created_at
            fake_clock.time.return_value = registered_at + 9

            result = manager.check_status("t1")
            assert result is not None
            assert not result.is_expired
|
|
|
|
|
|
@pytest.mark.unit
class TestInterruptManagerResolve:
    """Checks on ``InterruptManager.resolve``."""

    def test_resolve_removes_record(self) -> None:
        """After resolving a thread, its status lookup returns ``None``."""
        manager = InterruptManager()
        manager.register("t1", "cancel_order", {})

        manager.resolve("t1")

        assert manager.check_status("t1") is None

    def test_resolve_nonexistent_is_safe(self) -> None:
        """Resolving a thread that was never registered must not raise."""
        manager = InterruptManager()
        # The test passes simply by this call returning without an exception.
        manager.resolve("nonexistent")
|
|
|
|
|
|
@pytest.mark.unit
class TestInterruptManagerCleanup:
    """Checks on ``InterruptManager.cleanup_expired``."""

    def test_cleanup_removes_expired(self) -> None:
        """Both interrupts are past a 5-second TTL, so both are reported and dropped."""
        manager = InterruptManager(ttl_seconds=5)
        manager.register("t1", "cancel_order", {})
        manager.register("t2", "apply_discount", {})

        with patch("app.interrupt_manager.time") as fake_clock:
            # The mocked "now" is anchored on t1's creation time.
            registered_at = manager._interrupts["t1"].created_at
            fake_clock.time.return_value = registered_at + 6

            removed = manager.cleanup_expired()
            assert len(removed) == 2
            assert manager.check_status("t1") is None
            assert manager.check_status("t2") is None

    def test_cleanup_keeps_active(self) -> None:
        """An interrupt still within its TTL survives cleanup."""
        manager = InterruptManager(ttl_seconds=100)
        manager.register("t1", "cancel_order", {})

        removed = manager.cleanup_expired()

        assert len(removed) == 0
        assert manager.check_status("t1") is not None
|
|
|
|
|
|
@pytest.mark.unit
class TestInterruptManagerRetryPrompt:
    """Checks on the payload built by ``InterruptManager.generate_retry_prompt``."""

    def test_generates_correct_prompt(self) -> None:
        """The prompt carries type, thread id and action, and names both in its message."""
        manager = InterruptManager(ttl_seconds=1800)
        pending = manager.register("t1", "cancel_order", {"order_id": "1042"})

        payload = manager.generate_retry_prompt(pending)

        assert payload["type"] == "interrupt_expired"
        assert payload["thread_id"] == "t1"
        assert payload["action"] == "cancel_order"

        # With ttl_seconds=1800 the message is expected to say "30 minutes".
        message = payload["message"]
        assert "30 minutes" in message
        assert "cancel_order" in message
|
|
|
|
|
|
@pytest.mark.unit
class TestInterruptManagerHasPending:
    """Checks on ``InterruptManager.has_pending``."""

    def test_no_interrupt(self) -> None:
        """A thread with nothing registered has no pending interrupt."""
        manager = InterruptManager()
        assert not manager.has_pending("t1")

    def test_has_active_interrupt(self) -> None:
        """A freshly registered interrupt counts as pending."""
        manager = InterruptManager(ttl_seconds=1800)
        manager.register("t1", "cancel_order", {})
        assert manager.has_pending("t1")

    def test_expired_interrupt_not_pending(self) -> None:
        """An interrupt past its 5-second TTL no longer counts as pending."""
        manager = InterruptManager(ttl_seconds=5)
        manager.register("t1", "cancel_order", {})

        with patch("app.interrupt_manager.time") as fake_clock:
            registered_at = manager._interrupts["t1"].created_at
            fake_clock.time.return_value = registered_at + 6
            # Must run while the clock is mocked, or the entry would look fresh.
            assert not manager.has_pending("t1")
|