LangGraph-based release automation agent with: - PR discovery (webhook + polling) - AI code review via Claude Code CLI (subscription-based) - Auto-create Jira tickets for PRs without ticket ID - Jira ticket lifecycle management (code review -> staging -> done) - CI/CD pipeline trigger, polling, and approval gates - Slack interactive messages with approval buttons - Per-repo semantic versioning - PostgreSQL persistence (threads, staging, releases) - FastAPI API (webhooks, approvals, status, manual triggers) - Docker Compose deployment 1069 tests, 95%+ coverage.
820 lines
30 KiB
Python
820 lines
30 KiB
Python
"""Tests for AzDoClient. Written FIRST (TDD RED phase)."""
|
|
|
|
import json
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
import pytest
|
|
|
|
from release_agent.exceptions import AuthenticationError, NotFoundError, ServiceError
|
|
from release_agent.models.build import ApprovalRecord, BuildStatus
|
|
from release_agent.models.pipeline import PipelineInfo
|
|
from release_agent.models.pr import PRInfo
|
|
from release_agent.tools.azdo import AzDoClient
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixture helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
FIXTURES = Path(__file__).parent / "fixtures"
|
|
|
|
|
|
def _load_json(name: str) -> dict:
    """Parse the fixture file *name* under ``FIXTURES`` as JSON.

    The file is decoded explicitly as UTF-8 so the parsed result does not
    depend on the locale encoding of the machine running the tests.
    """
    return json.loads((FIXTURES / name).read_text(encoding="utf-8"))
|
|
|
|
|
|
def _load_text(name: str) -> str:
    """Return the contents of the fixture file *name* under ``FIXTURES``.

    The file is decoded explicitly as UTF-8 rather than with the platform's
    locale encoding, so the same fixture yields the same string everywhere.
    """
    # NOTE(review): assumes text fixtures are stored as UTF-8 - confirm.
    return (FIXTURES / name).read_text(encoding="utf-8")
|
|
|
|
|
|
def _make_transport(routes: dict[tuple[str, str], tuple[int, bytes | str]]) -> httpx.MockTransport:
    """Return a MockTransport that answers requests from a routing table.

    Each key of *routes* is a ``(method, url_fragment)`` pair and each value
    a ``(status, body)`` pair.  The first entry, in the dict's iteration
    order, whose method equals the request method and whose fragment is a
    substring of the request URL supplies the response.  A request that
    matches no entry is answered with a 404.
    """

    def _respond(request: httpx.Request) -> httpx.Response:
        target = str(request.url)
        verb = request.method
        # First matching route wins; ``None`` means nothing matched.
        reply = next(
            (
                answer
                for (route_verb, fragment), answer in routes.items()
                if route_verb == verb and fragment in target
            ),
            None,
        )
        if reply is None:
            return httpx.Response(status_code=404, content=b'{"message": "Not found"}')

        status, body = reply
        # Bodies given as str are encoded; bytes are passed through untouched.
        if isinstance(body, bytes):
            payload = body
        else:
            payload = body.encode()
        return httpx.Response(status_code=status, content=payload)

    return httpx.MockTransport(_respond)
|
|
|
|
|
|
def _make_client(routes: dict) -> AzDoClient:
    """Return an AzDoClient whose HTTP traffic is answered from *routes*.

    One mock transport is built with ``_make_transport`` and shared by the
    two ``httpx.AsyncClient`` instances injected into the client.
    """
    mock = _make_transport(routes)
    return AzDoClient(
        base_url="https://dev.azure.com/my-org/my-project/_apis",
        vsrm_base_url="https://vsrm.dev.azure.com/my-org/my-project/_apis",
        pat="test-pat",
        http_client=httpx.AsyncClient(transport=mock),
        vsrm_http_client=httpx.AsyncClient(transport=mock),
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AzDoClient construction tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestAzDoClientConstruction:
    """Tests for AzDoClient initialization."""

    @staticmethod
    def _stub_clients() -> tuple[httpx.AsyncClient, httpx.AsyncClient]:
        """Return two AsyncClients sharing a transport that always answers 200 ``{}``."""

        def always_ok(request: httpx.Request) -> httpx.Response:
            return httpx.Response(200, content=b"{}")

        mock = httpx.MockTransport(always_ok)
        api_client = httpx.AsyncClient(transport=mock)
        release_client = httpx.AsyncClient(transport=mock)
        return api_client, release_client

    def test_can_be_instantiated_with_injected_clients(self) -> None:
        api_client, release_client = self._stub_clients()

        subject = AzDoClient(
            base_url="https://dev.azure.com/org/proj/_apis",
            vsrm_base_url="https://vsrm.dev.azure.com/org/proj/_apis",
            pat="my-pat",
            http_client=api_client,
            vsrm_http_client=release_client,
        )

        assert subject is not None

    async def test_context_manager_closes_clients(self) -> None:
        api_client, release_client = self._stub_clients()
        subject = AzDoClient(
            base_url="https://dev.azure.com/org/proj/_apis",
            vsrm_base_url="https://vsrm.dev.azure.com/org/proj/_apis",
            pat="my-pat",
            http_client=api_client,
            vsrm_http_client=release_client,
        )

        async with subject as entered:
            assert entered is not None

        # Leaving the ``async with`` block must have closed both injected clients.
        assert api_client.is_closed
        assert release_client.is_closed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_pr tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetPr:
    """Tests for AzDoClient.get_pr."""

    @staticmethod
    async def _fetch_fixture_pr() -> PRInfo:
        """Serve ``azdo_pr.json`` on the PR 42 route and return ``get_pr(42)``."""
        body = json.dumps(_load_json("azdo_pr.json"))
        azdo = _make_client({("GET", "pullRequests/42"): (200, body)})
        return await azdo.get_pr(42)

    @staticmethod
    def _erroring_client(pr_id: int, status: int, body: bytes) -> AzDoClient:
        """Return a client whose GET route for PR *pr_id* answers *status* with *body*."""
        return _make_client({("GET", f"pullRequests/{pr_id}"): (status, body)})

    async def test_returns_pr_info(self) -> None:
        pr = await self._fetch_fixture_pr()

        assert isinstance(pr, PRInfo)
        assert pr.pr_id == "42"

    async def test_pr_title_extracted(self) -> None:
        pr = await self._fetch_fixture_pr()
        assert pr.pr_title == "Fix the auth bug"

    async def test_pr_branch_extracted(self) -> None:
        pr = await self._fetch_fixture_pr()
        # Either marker in the branch name is accepted.
        assert "ALLPOST-999" in pr.branch or "bug" in pr.branch

    async def test_pr_status_extracted(self) -> None:
        pr = await self._fetch_fixture_pr()
        assert pr.pr_status == "active"

    async def test_404_raises_not_found(self) -> None:
        azdo = self._erroring_client(999, 404, b'{"message": "PR not found"}')

        with pytest.raises(NotFoundError):
            await azdo.get_pr(999)

    async def test_401_raises_authentication_error(self) -> None:
        azdo = self._erroring_client(42, 401, b'{"message": "Unauthorized"}')

        with pytest.raises(AuthenticationError):
            await azdo.get_pr(42)

    async def test_500_raises_service_error(self) -> None:
        azdo = self._erroring_client(42, 500, b'{"message": "Internal error"}')

        with pytest.raises(ServiceError):
            await azdo.get_pr(42)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_pr_diff tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetPrDiff:
    """Tests for AzDoClient.get_pr_diff."""

    @staticmethod
    def _client_with_handler(pr_payload: dict, diff_payload: dict) -> AzDoClient:
        """Return a client backed by a hand-written request handler.

        The handler looks for ``"diffs"`` in the URL before
        ``"pullRequests/42"`` and does not inspect the HTTP method; any other
        URL gets a 404 with an empty JSON object.
        """

        def respond(request: httpx.Request) -> httpx.Response:
            target = str(request.url)
            if "diffs" in target:
                return httpx.Response(200, content=json.dumps(diff_payload).encode())
            if "pullRequests/42" in target:
                return httpx.Response(200, content=json.dumps(pr_payload).encode())
            return httpx.Response(404, content=b"{}")

        mock = httpx.MockTransport(respond)
        return AzDoClient(
            base_url="https://dev.azure.com/my-org/my-project/_apis",
            vsrm_base_url="https://vsrm.dev.azure.com/my-org/my-project/_apis",
            pat="test-pat",
            http_client=httpx.AsyncClient(transport=mock),
            vsrm_http_client=httpx.AsyncClient(transport=mock),
        )

    async def test_returns_diff_string(self) -> None:
        pr_payload = _load_json("azdo_pr.json")
        changes = {
            "changes": [
                {
                    "item": {"path": "/src/auth.py"},
                    "changeType": "edit",
                }
            ]
        }
        azdo = _make_client(
            {
                ("GET", "pullRequests/42"): (200, json.dumps(pr_payload)),
                ("GET", "diffs"): (200, json.dumps(changes)),
            }
        )

        diff = await azdo.get_pr_diff(42)
        assert isinstance(diff, str)

    async def test_diff_includes_file_paths(self) -> None:
        pr_payload = _load_json("azdo_pr.json")
        changes = {
            "changes": [
                {"item": {"path": "/src/auth.py"}, "changeType": "edit"},
                {"item": {"path": "/src/util.py"}, "changeType": "add"},
            ]
        }
        azdo = self._client_with_handler(pr_payload, changes)

        diff = await azdo.get_pr_diff(42)
        assert "/src/auth.py" in diff
        assert "/src/util.py" in diff

    async def test_empty_changes_returns_empty_string(self) -> None:
        pr_payload = _load_json("azdo_pr.json")
        no_changes: dict = {"changes": []}
        azdo = self._client_with_handler(pr_payload, no_changes)

        diff = await azdo.get_pr_diff(42)
        assert diff == ""

    async def test_404_raises_not_found(self) -> None:
        azdo = _make_client(
            {("GET", "pullRequests/999"): (404, b'{"message": "PR not found"}')}
        )

        with pytest.raises(NotFoundError):
            await azdo.get_pr_diff(999)

    async def test_pr_without_url_field_uses_remote_url(self) -> None:
        """When the API response omits the 'url' field, fallback URL is built."""
        # NOTE(review): this test calls get_pr, not get_pr_diff, so the "diffs"
        # route below is presumably never hit - confirm and consider moving it.
        repository = {
            "id": "repo-uuid",
            "name": "my-repo",
            "remoteUrl": "https://dev.azure.com/org/proj/_git/my-repo",
        }
        pr_payload = {
            "pullRequestId": 42,
            "title": "Fix bug",
            "status": "active",
            "sourceRefName": "refs/heads/fix/ALLPOST-1_fix",
            "repository": repository,
            # NOTE: 'url' field is intentionally omitted
        }
        azdo = _make_client(
            {
                ("GET", "pullRequests/42"): (200, json.dumps(pr_payload)),
                ("GET", "diffs"): (200, json.dumps({"changes": []})),
            }
        )

        pr = await azdo.get_pr(42)
        assert "42" in str(pr.pr_url)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# merge_pr tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMergePr:
    """Tests for AzDoClient.merge_pr."""

    @staticmethod
    def _client_answering(pr_id: int, status: int, body: bytes | str) -> AzDoClient:
        """Return a client whose PATCH route for PR *pr_id* answers *status* with *body*."""
        return _make_client({("PATCH", f"pullRequests/{pr_id}"): (status, body)})

    async def test_returns_true_on_success(self) -> None:
        merged_body = json.dumps(_load_json("azdo_merge_pr.json"))
        azdo = self._client_answering(42, 200, merged_body)

        outcome = await azdo.merge_pr(pr_id=42, last_merge_source_commit="abc123def456")
        assert outcome is True

    async def test_404_raises_not_found(self) -> None:
        azdo = self._client_answering(999, 404, b'{"message": "Not found"}')

        with pytest.raises(NotFoundError):
            await azdo.merge_pr(pr_id=999, last_merge_source_commit="abc123")

    async def test_409_raises_service_error(self) -> None:
        azdo = self._client_answering(42, 409, b'{"message": "Conflict"}')

        with pytest.raises(ServiceError):
            await azdo.merge_pr(pr_id=42, last_merge_source_commit="abc123")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# create_pr tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCreatePr:
    """Tests for AzDoClient.create_pr."""

    async def test_returns_dict_with_pr_id(self) -> None:
        created_body = json.dumps(_load_json("azdo_create_pr.json"))
        azdo = _make_client({("POST", "pullRequests"): (201, created_body)})
        request_args = {
            "repo": "my-repo",
            "source": "refs/heads/release/v1.2.0",
            "target": "refs/heads/main",
            "title": "Release v1.2.0",
            "description": "Release notes",
        }

        created = await azdo.create_pr(**request_args)

        assert isinstance(created, dict)
        assert created["pullRequestId"] == 99

    async def test_400_raises_service_error(self) -> None:
        azdo = _make_client(
            {("POST", "pullRequests"): (400, b'{"message": "Bad request"}')}
        )
        request_args = {
            "repo": "my-repo",
            "source": "refs/heads/release/v1.2.0",
            "target": "refs/heads/main",
            "title": "Release",
            "description": "",
        }

        with pytest.raises(ServiceError):
            await azdo.create_pr(**request_args)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# list_build_pipelines tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestListBuildPipelines:
    """Tests for AzDoClient.list_build_pipelines."""

    @staticmethod
    def _client_listing(payload: dict) -> AzDoClient:
        """Return a client whose GET ``pipelines`` route answers 200 with *payload* as JSON."""
        return _make_client({("GET", "pipelines"): (200, json.dumps(payload))})

    async def test_returns_list_of_pipeline_info(self) -> None:
        azdo = self._client_listing(_load_json("azdo_pipelines.json"))

        pipelines = await azdo.list_build_pipelines(repo="my-repo")

        assert isinstance(pipelines, list)
        assert len(pipelines) == 2
        assert all(isinstance(entry, PipelineInfo) for entry in pipelines)

    async def test_pipeline_ids_extracted(self) -> None:
        azdo = self._client_listing(_load_json("azdo_pipelines.json"))

        pipelines = await azdo.list_build_pipelines(repo="my-repo")

        found_ids = [entry.id for entry in pipelines]
        assert 10 in found_ids
        assert 20 in found_ids

    async def test_empty_list_on_no_pipelines(self) -> None:
        azdo = self._client_listing({"value": [], "count": 0})

        pipelines = await azdo.list_build_pipelines(repo="my-repo")
        assert pipelines == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# trigger_pipeline tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTriggerPipeline:
    """Tests for AzDoClient.trigger_pipeline."""

    async def test_returns_dict_with_build_id(self) -> None:
        run_body = json.dumps(_load_json("azdo_trigger_pipeline.json"))
        azdo = _make_client({("POST", "pipelines/10/runs"): (200, run_body)})

        run = await azdo.trigger_pipeline(pipeline_id=10, branch="refs/heads/main")

        assert isinstance(run, dict)
        assert run["id"] == 1001

    async def test_404_raises_not_found(self) -> None:
        azdo = _make_client(
            {("POST", "pipelines/999/runs"): (404, b'{"message": "Pipeline not found"}')}
        )

        with pytest.raises(NotFoundError):
            await azdo.trigger_pipeline(pipeline_id=999, branch="refs/heads/main")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_build_status tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetBuildStatus:
    """Tests for AzDoClient.get_build_status."""

    @staticmethod
    async def _fixture_status() -> BuildStatus:
        """Serve ``azdo_build_status.json`` for build 1001 and return its status."""
        body = json.dumps(_load_json("azdo_build_status.json"))
        azdo = _make_client({("GET", "build/builds/1001"): (200, body)})
        return await azdo.get_build_status(build_id=1001)

    async def test_returns_build_status_object(self) -> None:
        build = await self._fixture_status()
        assert isinstance(build, BuildStatus)

    async def test_status_field_populated(self) -> None:
        build = await self._fixture_status()
        assert build.status == "completed"

    async def test_result_field_populated(self) -> None:
        build = await self._fixture_status()
        assert build.result == "succeeded"

    async def test_build_url_present(self) -> None:
        build = await self._fixture_status()
        # build_url may be None if not in fixture, but field must exist
        assert hasattr(build, "build_url")

    async def test_result_none_when_not_completed(self) -> None:
        running = {"id": 99, "status": "inProgress", "buildNumber": "20240101.1"}
        azdo = _make_client({("GET", "build/builds/99"): (200, json.dumps(running))})

        build = await azdo.get_build_status(build_id=99)

        assert build.status == "inProgress"
        assert build.result is None

    async def test_404_raises_not_found(self) -> None:
        azdo = _make_client(
            {("GET", "build/builds/9999"): (404, b'{"message": "Build not found"}')}
        )

        with pytest.raises(NotFoundError):
            await azdo.get_build_status(build_id=9999)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_release_approvals tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetReleaseApprovals:
    """Tests for AzDoClient.get_release_approvals."""

    @staticmethod
    def _approval(approval_id: int, status: str, stage: str, release_id: int) -> dict:
        """Return one approval entry in the shape these tests feed to the client."""
        return {
            "id": approval_id,
            "status": status,
            "releaseEnvironment": {"name": stage, "release": {"id": release_id}},
        }

    @staticmethod
    def _client_listing(*approvals: dict) -> AzDoClient:
        """Return a client whose GET ``release/approvals`` route lists *approvals*."""
        body = json.dumps({"value": list(approvals), "count": len(approvals)})
        return _make_client({("GET", "release/approvals"): (200, body)})

    async def test_returns_list_of_approval_records(self) -> None:
        azdo = self._client_listing(
            self._approval(101, "pending", "Sandbox", 55),
            self._approval(102, "approved", "Production", 55),
        )

        records = await azdo.get_release_approvals(release_id=55)

        assert isinstance(records, list)
        assert len(records) == 2
        assert all(isinstance(record, ApprovalRecord) for record in records)

    async def test_approval_id_populated(self) -> None:
        azdo = self._client_listing(self._approval(201, "pending", "Sandbox", 10))

        records = await azdo.get_release_approvals(release_id=10)
        assert records[0].approval_id == "201"

    async def test_stage_name_populated(self) -> None:
        azdo = self._client_listing(self._approval(300, "pending", "Production", 20))

        records = await azdo.get_release_approvals(release_id=20)
        assert records[0].stage_name == "Production"

    async def test_release_id_populated(self) -> None:
        azdo = self._client_listing(self._approval(400, "pending", "Stage", 99))

        records = await azdo.get_release_approvals(release_id=99)
        assert records[0].release_id == 99

    async def test_empty_list_when_no_approvals(self) -> None:
        azdo = self._client_listing()

        records = await azdo.get_release_approvals(release_id=77)
        assert records == []

    async def test_filters_by_release_id_in_query(self) -> None:
        seen_urls: list[str] = []

        def record_and_answer(request: httpx.Request) -> httpx.Response:
            seen_urls.append(str(request.url))
            return httpx.Response(200, content=b'{"value": [], "count": 0}')

        mock = httpx.MockTransport(record_and_answer)
        azdo = AzDoClient(
            base_url="https://dev.azure.com/my-org/my-project/_apis",
            vsrm_base_url="https://vsrm.dev.azure.com/my-org/my-project/_apis",
            pat="test-pat",
            http_client=httpx.AsyncClient(transport=mock),
            vsrm_http_client=httpx.AsyncClient(transport=mock),
        )

        await azdo.get_release_approvals(release_id=42)

        # Only the endpoint is checked here; the release id in the URL is not.
        assert any("approvals" in seen for seen in seen_urls)

    async def test_404_raises_not_found(self) -> None:
        azdo = _make_client(
            {("GET", "release/approvals"): (404, b'{"message": "Not found"}')}
        )

        with pytest.raises(NotFoundError):
            await azdo.get_release_approvals(release_id=999)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_latest_release tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetLatestRelease:
    """Tests for AzDoClient.get_latest_release."""

    @staticmethod
    def _client_listing(*releases: dict) -> AzDoClient:
        """Return a client whose GET ``release/releases`` route lists *releases*."""
        body = json.dumps({"value": list(releases), "count": len(releases)})
        return _make_client({("GET", "release/releases"): (200, body)})

    async def test_returns_dict(self) -> None:
        azdo = self._client_listing({"id": 55, "name": "Release-55", "status": "active"})

        latest = await azdo.get_latest_release(definition_id=7)

        assert isinstance(latest, dict)
        assert latest["id"] == 55

    async def test_returns_empty_dict_when_no_releases(self) -> None:
        azdo = self._client_listing()

        latest = await azdo.get_latest_release(definition_id=99)
        assert latest == {}

    async def test_404_raises_not_found(self) -> None:
        azdo = _make_client(
            {("GET", "release/releases"): (404, b'{"message": "Not found"}')}
        )

        with pytest.raises(NotFoundError):
            await azdo.get_latest_release(definition_id=999)

    async def test_passes_definition_id_as_filter(self) -> None:
        seen_urls: list[str] = []

        def record_and_answer(request: httpx.Request) -> httpx.Response:
            seen_urls.append(str(request.url))
            return httpx.Response(200, content=b'{"value": [], "count": 0}')

        mock = httpx.MockTransport(record_and_answer)
        azdo = AzDoClient(
            base_url="https://dev.azure.com/my-org/my-project/_apis",
            vsrm_base_url="https://vsrm.dev.azure.com/my-org/my-project/_apis",
            pat="test-pat",
            http_client=httpx.AsyncClient(transport=mock),
            vsrm_http_client=httpx.AsyncClient(transport=mock),
        )

        await azdo.get_latest_release(definition_id=13)

        # Only the endpoint is checked here; the definition id in the URL is not.
        assert any("releases" in seen for seen in seen_urls)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# approve_release tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestApproveRelease:
    """Tests for AzDoClient.approve_release."""

    async def test_returns_dict_with_status(self) -> None:
        approved_body = json.dumps(_load_json("azdo_approve_release.json"))
        azdo = _make_client({("PATCH", "release/approvals"): (200, approved_body)})

        outcome = await azdo.approve_release(
            approval_id="approval-uuid-123",
            comment="Approved",
        )

        assert isinstance(outcome, dict)
        assert outcome["status"] == "approved"

    async def test_404_raises_not_found(self) -> None:
        azdo = _make_client(
            {("PATCH", "release/approvals"): (404, b'{"message": "Approval not found"}')}
        )

        with pytest.raises(NotFoundError):
            await azdo.approve_release(approval_id="bad-id", comment="Approve")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# close() lifecycle tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestAzDoClientLifecycle:
    """Tests for AzDoClient close() method."""

    async def test_close_closes_both_clients(self) -> None:
        def always_ok(request: httpx.Request) -> httpx.Response:
            return httpx.Response(200, content=b"{}")

        mock = httpx.MockTransport(always_ok)
        api_client = httpx.AsyncClient(transport=mock)
        release_client = httpx.AsyncClient(transport=mock)
        subject = AzDoClient(
            base_url="https://dev.azure.com/org/proj/_apis",
            vsrm_base_url="https://vsrm.dev.azure.com/org/proj/_apis",
            pat="my-pat",
            http_client=api_client,
            vsrm_http_client=release_client,
        )

        await subject.close()

        # close() is expected to close both injected clients, not just one.
        assert api_client.is_closed
        assert release_client.is_closed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# list_active_prs tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_pr_list_response(prs: list[dict]) -> str:
|
|
return json.dumps({"value": prs, "count": len(prs)})
|
|
|
|
|
|
def _make_active_pr_item(
|
|
pr_id: int = 10,
|
|
title: str = "Test PR",
|
|
branch: str = "refs/heads/feature/ALLPOST-100_fix",
|
|
status: str = "active",
|
|
repo_name: str = "my-repo",
|
|
) -> dict:
|
|
return {
|
|
"pullRequestId": pr_id,
|
|
"title": title,
|
|
"status": status,
|
|
"sourceRefName": branch,
|
|
"targetRefName": "refs/heads/develop",
|
|
"url": f"https://dev.azure.com/org/proj/_apis/git/repositories/{repo_name}/pullRequests/{pr_id}",
|
|
"repository": {
|
|
"id": "repo-uuid",
|
|
"name": repo_name,
|
|
"remoteUrl": f"https://dev.azure.com/org/proj/_git/{repo_name}",
|
|
},
|
|
}
|
|
|
|
|
|
class TestListActivePrs:
    """Tests for AzDoClient.list_active_prs."""

    @staticmethod
    def _client_for(status: int, body: bytes | str, repo: str = "my-repo") -> AzDoClient:
        """Return a client whose GET pull-request list route for *repo* answers *status*/*body*."""
        route = ("GET", f"git/repositories/{repo}/pullRequests")
        return _make_client({route: (status, body)})

    async def test_returns_list_of_pr_info(self) -> None:
        listing = _make_pr_list_response([_make_active_pr_item()])
        azdo = self._client_for(200, listing)

        found = await azdo.list_active_prs("my-repo", "refs/heads/develop")

        assert isinstance(found, list)
        assert len(found) == 1
        assert isinstance(found[0], PRInfo)

    async def test_pr_id_extracted(self) -> None:
        listing = _make_pr_list_response([_make_active_pr_item(pr_id=55)])
        azdo = self._client_for(200, listing)

        found = await azdo.list_active_prs("my-repo", "refs/heads/develop")
        assert found[0].pr_id == "55"

    async def test_pr_title_extracted(self) -> None:
        listing = _make_pr_list_response([_make_active_pr_item(title="My Feature")])
        azdo = self._client_for(200, listing)

        found = await azdo.list_active_prs("my-repo", "refs/heads/develop")
        assert found[0].pr_title == "My Feature"

    async def test_empty_list_when_no_prs(self) -> None:
        azdo = self._client_for(200, _make_pr_list_response([]))

        found = await azdo.list_active_prs("my-repo", "refs/heads/develop")
        assert found == []

    async def test_multiple_prs_returned(self) -> None:
        items = [
            _make_active_pr_item(pr_id=10, title="PR 10"),
            _make_active_pr_item(pr_id=20, title="PR 20"),
        ]
        azdo = self._client_for(200, _make_pr_list_response(items))

        found = await azdo.list_active_prs("my-repo", "refs/heads/develop")

        assert len(found) == 2
        assert {pr.pr_id for pr in found} == {"10", "20"}

    async def test_404_raises_not_found(self) -> None:
        azdo = self._client_for(404, b'{"message": "Repo not found"}', repo="missing-repo")

        with pytest.raises(NotFoundError):
            await azdo.list_active_prs("missing-repo", "refs/heads/develop")

    async def test_401_raises_authentication_error(self) -> None:
        azdo = self._client_for(401, b'{"message": "Unauthorized"}')

        with pytest.raises(AuthenticationError):
            await azdo.list_active_prs("my-repo", "refs/heads/develop")

    async def test_500_raises_service_error(self) -> None:
        azdo = self._client_for(500, b'{"message": "Internal error"}')

        with pytest.raises(ServiceError):
            await azdo.list_active_prs("my-repo", "refs/heads/develop")
|