LangGraph-based release automation agent with: - PR discovery (webhook + polling) - AI code review via Claude Code CLI (subscription-based) - Auto-create Jira tickets for PRs without ticket ID - Jira ticket lifecycle management (code review -> staging -> done) - CI/CD pipeline trigger, polling, and approval gates - Slack interactive messages with approval buttons - Per-repo semantic versioning - PostgreSQL persistence (threads, staging, releases) - FastAPI API (webhooks, approvals, status, manual triggers) - Docker Compose deployment 1069 tests, 95%+ coverage.
148 lines
5.2 KiB
Python
148 lines
5.2 KiB
Python
"""Tests for main.py Phase 5 changes.
|
|
|
|
Phase 5 - Step 4: _ensure_db_schema creates staging/archived tables,
|
|
and lifespan uses PostgresStagingStore.
|
|
Written FIRST (TDD RED phase).
|
|
"""
|
|
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _ensure_db_schema includes staging DDL
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestEnsureDbSchemaPhase5:
|
|
async def test_schema_creates_staging_releases_table(self) -> None:
|
|
from release_agent.main import _ensure_db_schema
|
|
|
|
executed_sqls: list[str] = []
|
|
|
|
mock_cursor = AsyncMock()
|
|
mock_cursor.__aenter__ = AsyncMock(return_value=mock_cursor)
|
|
mock_cursor.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
async def capture_execute(sql: str, *args) -> None:
|
|
executed_sqls.append(sql)
|
|
|
|
mock_cursor.execute = capture_execute
|
|
|
|
mock_conn = AsyncMock()
|
|
mock_conn.cursor = MagicMock(return_value=mock_cursor)
|
|
mock_conn.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
mock_conn.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
mock_pool = MagicMock()
|
|
mock_pool.connection = MagicMock(return_value=mock_conn)
|
|
|
|
await _ensure_db_schema(mock_pool)
|
|
|
|
all_sql = " ".join(executed_sqls)
|
|
assert "staging_releases" in all_sql
|
|
|
|
async def test_schema_creates_archived_releases_table(self) -> None:
|
|
from release_agent.main import _ensure_db_schema
|
|
|
|
executed_sqls: list[str] = []
|
|
|
|
mock_cursor = AsyncMock()
|
|
mock_cursor.__aenter__ = AsyncMock(return_value=mock_cursor)
|
|
mock_cursor.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
async def capture_execute(sql: str, *args) -> None:
|
|
executed_sqls.append(sql)
|
|
|
|
mock_cursor.execute = capture_execute
|
|
|
|
mock_conn = AsyncMock()
|
|
mock_conn.cursor = MagicMock(return_value=mock_cursor)
|
|
mock_conn.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
mock_conn.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
mock_pool = MagicMock()
|
|
mock_pool.connection = MagicMock(return_value=mock_conn)
|
|
|
|
await _ensure_db_schema(mock_pool)
|
|
|
|
all_sql = " ".join(executed_sqls)
|
|
assert "archived_releases" in all_sql
|
|
|
|
async def test_schema_still_creates_agent_threads_table(self) -> None:
|
|
from release_agent.main import _ensure_db_schema
|
|
|
|
executed_sqls: list[str] = []
|
|
|
|
mock_cursor = AsyncMock()
|
|
mock_cursor.__aenter__ = AsyncMock(return_value=mock_cursor)
|
|
mock_cursor.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
async def capture_execute(sql: str, *args) -> None:
|
|
executed_sqls.append(sql)
|
|
|
|
mock_cursor.execute = capture_execute
|
|
|
|
mock_conn = AsyncMock()
|
|
mock_conn.cursor = MagicMock(return_value=mock_cursor)
|
|
mock_conn.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
mock_conn.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
mock_pool = MagicMock()
|
|
mock_pool.connection = MagicMock(return_value=mock_conn)
|
|
|
|
await _ensure_db_schema(mock_pool)
|
|
|
|
all_sql = " ".join(executed_sqls)
|
|
assert "agent_threads" in all_sql
|
|
|
|
async def test_schema_uses_if_not_exists(self) -> None:
|
|
from release_agent.main import _ensure_db_schema
|
|
|
|
executed_sqls: list[str] = []
|
|
|
|
mock_cursor = AsyncMock()
|
|
mock_cursor.__aenter__ = AsyncMock(return_value=mock_cursor)
|
|
mock_cursor.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
async def capture_execute(sql: str, *args) -> None:
|
|
executed_sqls.append(sql)
|
|
|
|
mock_cursor.execute = capture_execute
|
|
|
|
mock_conn = AsyncMock()
|
|
mock_conn.cursor = MagicMock(return_value=mock_cursor)
|
|
mock_conn.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
mock_conn.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
mock_pool = MagicMock()
|
|
mock_pool.connection = MagicMock(return_value=mock_conn)
|
|
|
|
await _ensure_db_schema(mock_pool)
|
|
|
|
all_sql = " ".join(executed_sqls)
|
|
assert "IF NOT EXISTS" in all_sql.upper()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lifespan: PostgresStagingStore wired in
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestLifespanUsesPostgresStagingStore:
|
|
def test_lifespan_creates_postgres_staging_store(self) -> None:
|
|
"""When PostgresStagingStore is imported in main, it is used in lifespan."""
|
|
from release_agent.main import _create_staging_store
|
|
from release_agent.graph.postgres_staging_store import PostgresStagingStore
|
|
|
|
mock_pool = MagicMock()
|
|
result = _create_staging_store(pool=mock_pool)
|
|
assert isinstance(result, PostgresStagingStore)
|
|
|
|
def test_create_staging_store_without_pool_falls_back_to_json(self) -> None:
|
|
"""Without a pool, falls back to JsonFileStagingStore for local dev."""
|
|
from release_agent.main import _create_staging_store
|
|
from release_agent.graph.dependencies import JsonFileStagingStore
|
|
|
|
result = _create_staging_store(pool=None)
|
|
assert isinstance(result, JsonFileStagingStore)
|