"""Tests for services/pr_poller.py. Written FIRST (TDD RED phase). Tests verify: - _synthesize_webhook_payload produces a valid payload dict - run_pr_poll_loop calls list_active_prs, dedup, then schedules graph for each unprocessed PR - Fake sleep is injected to avoid real waits """ import asyncio from unittest.mock import AsyncMock, MagicMock, patch import pytest from release_agent.models.pr import PRInfo from release_agent.services.pr_poller import _synthesize_webhook_payload, run_pr_poll_loop # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_pr( pr_id: str = "10", repo_name: str = "my-repo", branch: str = "refs/heads/feature/ALLPOST-100-fix", title: str = "Test PR", status: str = "active", ) -> PRInfo: return PRInfo( pr_id=pr_id, pr_url=f"https://dev.azure.com/org/proj/_git/{repo_name}/pullrequest/{pr_id}", repo_name=repo_name, branch=branch, pr_title=title, pr_status=status, ) # --------------------------------------------------------------------------- # _synthesize_webhook_payload tests # --------------------------------------------------------------------------- class TestSynthesizeWebhookPayload: def test_returns_dict(self) -> None: pr = _make_pr() result = _synthesize_webhook_payload(pr) assert isinstance(result, dict) def test_has_resource_key(self) -> None: pr = _make_pr() result = _synthesize_webhook_payload(pr) assert "resource" in result def test_resource_contains_pull_request_id(self) -> None: pr = _make_pr(pr_id="42") result = _synthesize_webhook_payload(pr) assert result["resource"]["pull_request_id"] == 42 def test_resource_contains_repository_name(self) -> None: pr = _make_pr(repo_name="backend-api") result = _synthesize_webhook_payload(pr) assert result["resource"]["repository"]["name"] == "backend-api" def test_resource_contains_title(self) -> None: pr = _make_pr(title="My PR Title") result = _synthesize_webhook_payload(pr) assert result["resource"]["title"] == "My PR Title" def test_resource_contains_source_ref_name(self) -> None: pr = _make_pr(branch="refs/heads/feature/ALLPOST-200-test") result = _synthesize_webhook_payload(pr) assert result["resource"]["source_ref_name"] == "refs/heads/feature/ALLPOST-200-test" def test_resource_status_is_active(self) -> None: pr = _make_pr(status="active") result = _synthesize_webhook_payload(pr) assert result["resource"]["status"] == "active" def test_event_type_is_pr_updated(self) -> None: pr = _make_pr() result = _synthesize_webhook_payload(pr) assert "event_type" in result def test_subscription_id_present(self) -> None: pr = _make_pr() result = _synthesize_webhook_payload(pr) assert "subscription_id" in result def test_different_prs_produce_different_payloads(self) -> None: pr1 = _make_pr(pr_id="1") pr2 = _make_pr(pr_id="2") r1 = _synthesize_webhook_payload(pr1) r2 = _synthesize_webhook_payload(pr2) assert r1["resource"]["pull_request_id"] != r2["resource"]["pull_request_id"] # --------------------------------------------------------------------------- # run_pr_poll_loop tests # --------------------------------------------------------------------------- class TestRunPrPollLoop: async def test_calls_list_active_prs_for_each_repo(self) -> None: azdo = AsyncMock() azdo.list_active_prs = AsyncMock(return_value=[]) sleep_calls: list[float] = [] async def fake_sleep(seconds: float) -> None: sleep_calls.append(seconds) raise asyncio.CancelledError with patch("release_agent.services.pr_poller.find_unprocessed_prs", new=AsyncMock(return_value=[])): with pytest.raises(asyncio.CancelledError): await run_pr_poll_loop( azdo_client=azdo, db_pool=MagicMock(), watched_repos=["repo-a", "repo-b"], target_branch="refs/heads/develop", interval_seconds=30, schedule_fn=MagicMock(), sleep_fn=fake_sleep, ) assert azdo.list_active_prs.call_count == 2 async def test_calls_find_unprocessed_prs(self) -> None: pr = _make_pr(pr_id="10") azdo = AsyncMock() azdo.list_active_prs = AsyncMock(return_value=[pr]) find_mock = AsyncMock(return_value=[]) async def fake_sleep(seconds: float) -> None: raise asyncio.CancelledError with patch("release_agent.services.pr_poller.find_unprocessed_prs", new=find_mock): with pytest.raises(asyncio.CancelledError): await run_pr_poll_loop( azdo_client=azdo, db_pool=MagicMock(), watched_repos=["my-repo"], target_branch="refs/heads/develop", interval_seconds=30, schedule_fn=MagicMock(), sleep_fn=fake_sleep, ) find_mock.assert_called_once() async def test_schedules_graph_for_each_unprocessed_pr(self) -> None: pr1 = _make_pr(pr_id="10") pr2 = _make_pr(pr_id="20") azdo = AsyncMock() azdo.list_active_prs = AsyncMock(return_value=[pr1, pr2]) schedule_mock = MagicMock() async def fake_sleep(seconds: float) -> None: raise asyncio.CancelledError with patch( "release_agent.services.pr_poller.find_unprocessed_prs", new=AsyncMock(return_value=[pr1, pr2]), ): with pytest.raises(asyncio.CancelledError): await run_pr_poll_loop( azdo_client=azdo, db_pool=MagicMock(), watched_repos=["my-repo"], target_branch="refs/heads/develop", interval_seconds=30, schedule_fn=schedule_mock, sleep_fn=fake_sleep, ) assert schedule_mock.call_count == 2 async def test_does_not_schedule_already_processed_prs(self) -> None: pr = _make_pr(pr_id="10") azdo = AsyncMock() azdo.list_active_prs = AsyncMock(return_value=[pr]) schedule_mock = MagicMock() async def fake_sleep(seconds: float) -> None: raise asyncio.CancelledError # All PRs already processed with patch( "release_agent.services.pr_poller.find_unprocessed_prs", new=AsyncMock(return_value=[]), ): with pytest.raises(asyncio.CancelledError): await run_pr_poll_loop( azdo_client=azdo, db_pool=MagicMock(), watched_repos=["my-repo"], target_branch="refs/heads/develop", interval_seconds=30, schedule_fn=schedule_mock, sleep_fn=fake_sleep, ) schedule_mock.assert_not_called() async def test_sleeps_for_configured_interval(self) -> None: azdo = AsyncMock() azdo.list_active_prs = AsyncMock(return_value=[]) sleep_calls: list[float] = [] async def fake_sleep(seconds: float) -> None: sleep_calls.append(seconds) raise asyncio.CancelledError with patch("release_agent.services.pr_poller.find_unprocessed_prs", new=AsyncMock(return_value=[])): with pytest.raises(asyncio.CancelledError): await run_pr_poll_loop( azdo_client=azdo, db_pool=MagicMock(), watched_repos=["my-repo"], target_branch="refs/heads/develop", interval_seconds=123, schedule_fn=MagicMock(), sleep_fn=fake_sleep, ) assert sleep_calls[0] == 123 async def test_handles_empty_watched_repos(self) -> None: azdo = AsyncMock() async def fake_sleep(seconds: float) -> None: raise asyncio.CancelledError with patch("release_agent.services.pr_poller.find_unprocessed_prs", new=AsyncMock(return_value=[])): with pytest.raises(asyncio.CancelledError): await run_pr_poll_loop( azdo_client=azdo, db_pool=MagicMock(), watched_repos=[], target_branch="refs/heads/develop", interval_seconds=30, schedule_fn=MagicMock(), sleep_fn=fake_sleep, ) azdo.list_active_prs.assert_not_called() async def test_schedule_fn_receives_synthesized_payload(self) -> None: pr = _make_pr(pr_id="55", repo_name="test-repo") azdo = AsyncMock() azdo.list_active_prs = AsyncMock(return_value=[pr]) schedule_calls: list[dict] = [] def schedule_mock(**kwargs) -> None: schedule_calls.append(kwargs) async def fake_sleep(seconds: float) -> None: raise asyncio.CancelledError with patch( "release_agent.services.pr_poller.find_unprocessed_prs", new=AsyncMock(return_value=[pr]), ): with pytest.raises(asyncio.CancelledError): await run_pr_poll_loop( azdo_client=azdo, db_pool=MagicMock(), watched_repos=["test-repo"], target_branch="refs/heads/develop", interval_seconds=30, schedule_fn=schedule_mock, sleep_fn=fake_sleep, ) assert len(schedule_calls) == 1 initial_state = schedule_calls[0]["initial_state"] assert initial_state["webhook_payload"]["resource"]["pull_request_id"] == 55 assert initial_state["pr_id"] == "55" assert initial_state["repo_name"] == "test-repo" async def test_continues_after_list_active_prs_error(self) -> None: azdo = AsyncMock() # First repo raises, second succeeds azdo.list_active_prs = AsyncMock(side_effect=[Exception("API error"), []]) sleep_calls: list[float] = [] async def fake_sleep(seconds: float) -> None: sleep_calls.append(seconds) raise asyncio.CancelledError with patch("release_agent.services.pr_poller.find_unprocessed_prs", new=AsyncMock(return_value=[])): with pytest.raises(asyncio.CancelledError): await run_pr_poll_loop( azdo_client=azdo, db_pool=MagicMock(), watched_repos=["repo-a", "repo-b"], target_branch="refs/heads/develop", interval_seconds=30, schedule_fn=MagicMock(), sleep_fn=fake_sleep, ) # Should still sleep (loop iteration completed despite error) assert len(sleep_calls) == 1