LangGraph-based release automation agent with: PR discovery (webhook + polling); AI code review via Claude Code CLI (subscription-based); automatic creation of Jira tickets for PRs without a ticket ID; Jira ticket lifecycle management (code review -> staging -> done); CI/CD pipeline trigger, polling, and approval gates; Slack interactive messages with approval buttons; per-repo semantic versioning; PostgreSQL persistence (threads, staging, releases); FastAPI API (webhooks, approvals, status, manual triggers); Docker Compose deployment. 1069 tests, 95%+ coverage.
573 lines
22 KiB
Python
573 lines
22 KiB
Python
"""Tests for JiraClient. Written FIRST (TDD RED phase)."""
|
|
|
|
import json
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
import pytest
|
|
|
|
from release_agent.exceptions import AuthenticationError, NotFoundError, ServiceError
|
|
from release_agent.models.jira import JiraIssue, JiraTransition
|
|
from release_agent.tools.jira import JiraClient
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixture helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Directory named "fixtures" that sits next to this test module; the JSON
# payloads loaded by the tests below are read from here.
FIXTURES = Path(__file__).with_name("fixtures")
|
|
|
|
|
|
def _load_json(name: str) -> dict:
    """Load and decode a JSON fixture file.

    Args:
        name: File name relative to the ``FIXTURES`` directory
            (for example ``"jira_issue.json"``).

    Returns:
        The parsed JSON document.

    Raises:
        FileNotFoundError: If the fixture file does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # JSON is UTF-8 by specification. Decode explicitly so the result does not
    # depend on the platform's locale encoding (e.g. cp1252 on Windows).
    return json.loads((FIXTURES / name).read_text(encoding="utf-8"))
|
|
|
|
|
|
def _make_transport(routes: dict[tuple[str, str], tuple[int, bytes | str]]) -> httpx.MockTransport:
    """Create a MockTransport answering from a ``(method, url_substring)`` table.

    Routes are examined in insertion order. The first entry whose method equals
    the request method and whose fragment occurs anywhere in the request URL
    supplies the status code and body; ``str`` bodies are encoded to bytes.
    Requests that match no route receive a 404 with a Jira-style error payload.
    """

    def dispatch(request: httpx.Request) -> httpx.Response:
        target = str(request.url)
        verb = request.method
        candidates = (
            reply
            for (route_method, fragment), reply in routes.items()
            if route_method == verb and fragment in target
        )
        matched = next(candidates, None)
        if matched is None:
            return httpx.Response(status_code=404, content=b'{"errorMessages": ["Not found"]}')
        status, payload = matched
        raw = payload if isinstance(payload, bytes) else payload.encode()
        return httpx.Response(status_code=status, content=raw)

    return httpx.MockTransport(dispatch)
|
|
|
|
|
|
def _make_client(routes: dict) -> JiraClient:
    """Return a JiraClient whose HTTP traffic is served from *routes*.

    *routes* uses the same ``(method, url_substring) -> (status, body)`` shape
    accepted by ``_make_transport``.
    """
    mock_http = httpx.AsyncClient(transport=_make_transport(routes))
    return JiraClient(
        base_url="https://billolife.atlassian.net",
        email="user@example.com",
        api_token="test-token",
        http_client=mock_http,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Construction tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestJiraClientConstruction:
    """Tests for JiraClient initialization."""

    def test_can_be_instantiated(self) -> None:
        """A client built around an injected AsyncClient is a real object."""
        stub_transport = httpx.MockTransport(lambda r: httpx.Response(200, content=b"{}"))
        jira = JiraClient(
            base_url="https://billolife.atlassian.net",
            email="u@example.com",
            api_token="token",
            http_client=httpx.AsyncClient(transport=stub_transport),
        )
        assert jira is not None

    async def test_context_manager_closes_client(self) -> None:
        """Leaving the ``async with`` block closes the injected HTTP client."""
        stub_transport = httpx.MockTransport(lambda r: httpx.Response(200, content=b"{}"))
        injected = httpx.AsyncClient(transport=stub_transport)
        jira = JiraClient(
            base_url="https://billolife.atlassian.net",
            email="u@example.com",
            api_token="token",
            http_client=injected,
        )
        async with jira as client:
            assert client is not None
        # Checked only after the context has exited.
        assert injected.is_closed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_issue tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetIssue:
    """Tests for JiraClient.get_issue."""

    @staticmethod
    async def _fetch_from_fixture() -> JiraIssue:
        """Call ``get_issue("ALLPOST-100")`` against a 200 reply built from jira_issue.json."""
        payload = json.dumps(_load_json("jira_issue.json"))
        jira = _make_client({("GET", "ALLPOST-100"): (200, payload)})
        return await jira.get_issue("ALLPOST-100")

    async def test_returns_jira_issue(self) -> None:
        issue = await self._fetch_from_fixture()
        assert isinstance(issue, JiraIssue)

    async def test_key_extracted(self) -> None:
        issue = await self._fetch_from_fixture()
        assert issue.key == "ALLPOST-100"

    async def test_summary_extracted(self) -> None:
        issue = await self._fetch_from_fixture()
        assert issue.summary == "Fix the authentication bug"

    async def test_status_extracted(self) -> None:
        issue = await self._fetch_from_fixture()
        assert issue.status == "In Progress"

    async def test_404_raises_not_found(self) -> None:
        error_body = b'{"errorMessages": ["Issue not found"]}'
        jira = _make_client({("GET", "ALLPOST-999"): (404, error_body)})

        with pytest.raises(NotFoundError):
            await jira.get_issue("ALLPOST-999")

    async def test_401_raises_authentication_error(self) -> None:
        error_body = b'{"errorMessages": ["Unauthorized"]}'
        jira = _make_client({("GET", "ALLPOST-100"): (401, error_body)})

        with pytest.raises(AuthenticationError):
            await jira.get_issue("ALLPOST-100")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_transitions tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetTransitions:
    """Tests for JiraClient.get_transitions."""

    @staticmethod
    async def _list_transitions(document: dict) -> list:
        """Serve *document* as the 200 transitions reply and query ALLPOST-100."""
        jira = _make_client({("GET", "transitions"): (200, json.dumps(document))})
        return await jira.get_transitions("ALLPOST-100")

    async def test_returns_list_of_transitions(self) -> None:
        found = await self._list_transitions(_load_json("jira_transitions.json"))
        assert isinstance(found, list)
        assert all(isinstance(item, JiraTransition) for item in found)

    async def test_transition_names_extracted(self) -> None:
        found = await self._list_transitions(_load_json("jira_transitions.json"))
        seen_names = [item.name for item in found]
        assert "Released" in seen_names
        assert "In Progress" in seen_names

    async def test_transition_ids_extracted(self) -> None:
        found = await self._list_transitions(_load_json("jira_transitions.json"))
        seen_ids = [item.id for item in found]
        assert "11" in seen_ids

    async def test_empty_transitions_returned(self) -> None:
        found = await self._list_transitions({"transitions": []})
        assert found == []

    async def test_404_raises_not_found(self) -> None:
        error_body = b'{"errorMessages": ["Not found"]}'
        jira = _make_client({("GET", "transitions"): (404, error_body)})

        with pytest.raises(NotFoundError):
            await jira.get_transitions("ALLPOST-999")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# transition_issue tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTransitionIssue:
    """Tests for JiraClient.transition_issue."""

    @staticmethod
    def _fixture_routes(post_reply: tuple[int, bytes]) -> dict:
        """Routes serving jira_transitions.json on GET and *post_reply* on POST."""
        listing = json.dumps(_load_json("jira_transitions.json"))
        return {
            ("GET", "transitions"): (200, listing),
            ("POST", "transitions"): post_reply,
        }

    @staticmethod
    def _client_with_changing_listing(initial: dict, subsequent: dict) -> JiraClient:
        """Build a client whose transitions listing changes after the first GET.

        The first GET on a ``transitions`` URL returns *initial*; every later
        GET returns *subsequent*. POSTs to ``transitions`` answer 204 and any
        other request gets a 404.
        """
        listings_served = 0

        def respond(request: httpx.Request) -> httpx.Response:
            nonlocal listings_served
            target = str(request.url)
            verb = request.method
            if verb == "GET" and "transitions" in target:
                listings_served += 1
                document = initial if listings_served <= 1 else subsequent
                return httpx.Response(200, content=json.dumps(document).encode())
            if verb == "POST" and "transitions" in target:
                return httpx.Response(204, content=b"")
            return httpx.Response(404, content=b'{"errorMessages": ["Not found"]}')

        mock_http = httpx.AsyncClient(transport=httpx.MockTransport(respond))
        return JiraClient(
            base_url="https://billolife.atlassian.net",
            email="user@example.com",
            api_token="test-token",
            http_client=mock_http,
        )

    async def test_returns_true_on_success(self) -> None:
        jira = _make_client(self._fixture_routes((204, b"")))

        outcome = await jira.transition_issue("ALLPOST-100", "Released")
        assert outcome is True

    async def test_returns_false_when_transition_not_found(self) -> None:
        jira = _make_client(self._fixture_routes((204, b"")))

        # "QA Review" is not in the fixture transitions
        outcome = await jira.transition_issue("ALLPOST-100", "QA Review")
        assert outcome is False

    async def test_fallback_to_dev_in_progress_then_retries(self) -> None:
        """Two-step fallback: if target unavailable, try 'Dev in Progress' first."""
        # First GET: only "Dev in Progress" available (no "Released" yet)
        before_fallback = {"transitions": [{"id": "21", "name": "Dev in Progress"}]}
        # Second GET (after Dev in Progress transition): "Released" now available
        after_fallback = {
            "transitions": [
                {"id": "21", "name": "Dev in Progress"},
                {"id": "41", "name": "Released"},
            ]
        }
        jira = self._client_with_changing_listing(before_fallback, after_fallback)

        outcome = await jira.transition_issue("ALLPOST-100", "Released")
        assert outcome is True

    async def test_returns_false_when_still_unavailable_after_fallback(self) -> None:
        """Return False when target transition is unavailable even after fallback."""
        # First GET: only "Dev in Progress" available
        before_fallback = {"transitions": [{"id": "21", "name": "Dev in Progress"}]}
        # Second GET (after fallback): target STILL not available
        after_fallback = {"transitions": [{"id": "21", "name": "Dev in Progress"}]}
        jira = self._client_with_changing_listing(before_fallback, after_fallback)

        outcome = await jira.transition_issue("ALLPOST-100", "Released")
        assert outcome is False

    async def test_404_on_transition_raises_not_found(self) -> None:
        jira = _make_client(self._fixture_routes((404, b'{"errorMessages": ["Not found"]}')))

        with pytest.raises(NotFoundError):
            await jira.transition_issue("ALLPOST-100", "Released")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# add_remote_link tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestAddRemoteLink:
    """Tests for JiraClient.add_remote_link."""

    async def test_returns_true_on_success(self) -> None:
        created = json.dumps({"id": 1, "self": "..."})
        jira = _make_client({("POST", "remotelink"): (200, created)})

        outcome = await jira.add_remote_link(
            ticket_id="ALLPOST-100",
            url="https://dev.azure.com/org/proj/_git/repo/pullrequest/42",
            title="PR #42: Fix auth",
        )
        assert outcome is True

    async def test_404_raises_not_found(self) -> None:
        error_body = b'{"errorMessages": ["Not found"]}'
        jira = _make_client({("POST", "remotelink"): (404, error_body)})
        link_args = {
            "ticket_id": "ALLPOST-999",
            "url": "https://example.com",
            "title": "Some PR",
        }

        with pytest.raises(NotFoundError):
            await jira.add_remote_link(**link_args)

    async def test_400_raises_service_error(self) -> None:
        error_body = b'{"errorMessages": ["Bad request"]}'
        jira = _make_client({("POST", "remotelink"): (400, error_body)})
        link_args = {
            "ticket_id": "ALLPOST-100",
            "url": "not-a-url",
            "title": "Bad link",
        }

        with pytest.raises(ServiceError):
            await jira.add_remote_link(**link_args)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lifecycle tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestJiraClientLifecycle:
    """Tests for JiraClient close() method."""

    async def test_close_closes_http_client(self) -> None:
        """An explicit ``close()`` call closes the injected AsyncClient."""
        injected = httpx.AsyncClient(
            transport=httpx.MockTransport(lambda r: httpx.Response(200, content=b"{}"))
        )
        jira = JiraClient(
            base_url="https://billolife.atlassian.net",
            email="u@example.com",
            api_token="token",
            http_client=injected,
        )

        await jira.close()

        assert injected.is_closed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _text_to_adf tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTextToAdf:
    """Tests for the _text_to_adf helper."""

    @staticmethod
    def _convert(text: str):
        """Run ``_text_to_adf`` on *text*.

        The import happens at call time, so a missing helper fails the
        individual test instead of breaking collection of the whole module.
        """
        from release_agent.tools.jira import _text_to_adf

        return _text_to_adf(text)

    def test_returns_dict(self) -> None:
        assert isinstance(self._convert("Hello world"), dict)

    def test_version_is_1(self) -> None:
        document = self._convert("Hello world")
        assert document["version"] == 1

    def test_type_is_doc(self) -> None:
        document = self._convert("Hello world")
        assert document["type"] == "doc"

    def test_content_is_list(self) -> None:
        document = self._convert("Hello world")
        assert isinstance(document["content"], list)

    def test_single_line_produces_one_paragraph(self) -> None:
        document = self._convert("Hello world")
        assert len(document["content"]) == 1
        assert document["content"][0]["type"] == "paragraph"

    def test_multiline_produces_multiple_paragraphs(self) -> None:
        document = self._convert("Line one\n\nLine two")
        # Each non-empty line becomes a paragraph
        paragraph_count = sum(1 for node in document["content"] if node["type"] == "paragraph")
        assert paragraph_count == 2

    def test_paragraph_contains_text_node(self) -> None:
        document = self._convert("Hello")
        first_paragraph = document["content"][0]
        assert "content" in first_paragraph
        leaf = first_paragraph["content"][0]
        assert leaf["type"] == "text"
        assert leaf["text"] == "Hello"

    def test_empty_string_produces_empty_doc(self) -> None:
        document = self._convert("")
        assert document["content"] == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# JiraClient.create_issue tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCreateIssue:
    """Tests for JiraClient.create_issue."""

    @staticmethod
    def _recording_client(reply_key: str) -> tuple[JiraClient, list[dict]]:
        """Build a client that records request bodies and answers 201.

        Every request's JSON body is appended to the returned list, and the
        reply is always ``{"key": reply_key}`` with status 201.
        """
        seen_bodies: list[dict] = []

        def record(request: httpx.Request) -> httpx.Response:
            seen_bodies.append(json.loads(request.content))
            return httpx.Response(
                201,
                content=json.dumps({"key": reply_key}).encode(),
            )

        jira = JiraClient(
            base_url="https://billolife.atlassian.net",
            email="u@example.com",
            api_token="token",
            http_client=httpx.AsyncClient(transport=httpx.MockTransport(record)),
        )
        return jira, seen_bodies

    async def test_returns_ticket_key(self) -> None:
        created = json.dumps({"id": "10001", "key": "ALLPOST-42", "self": "https://..."})
        jira = _make_client({("POST", "/rest/api/3/issue"): (201, created)})

        new_key = await jira.create_issue(
            project="ALLPOST",
            summary="New feature",
            description="Some description",
        )
        assert new_key == "ALLPOST-42"

    async def test_default_issue_type_is_story(self) -> None:
        jira, seen_bodies = self._recording_client("ALLPOST-1")

        await jira.create_issue(project="ALLPOST", summary="S", description="D")
        assert seen_bodies[0]["fields"]["issuetype"]["name"] == "Story"

    async def test_custom_issue_type_sent(self) -> None:
        jira, seen_bodies = self._recording_client("ALLPOST-2")

        await jira.create_issue(
            project="ALLPOST", summary="S", description="D", issue_type="Bug"
        )
        assert seen_bodies[0]["fields"]["issuetype"]["name"] == "Bug"

    async def test_project_key_sent_in_body(self) -> None:
        jira, seen_bodies = self._recording_client("MYPROJ-3")

        await jira.create_issue(project="MYPROJ", summary="S", description="D")
        assert seen_bodies[0]["fields"]["project"]["key"] == "MYPROJ"

    async def test_summary_sent_in_body(self) -> None:
        jira, seen_bodies = self._recording_client("ALLPOST-5")

        await jira.create_issue(project="ALLPOST", summary="My Summary", description="D")
        assert seen_bodies[0]["fields"]["summary"] == "My Summary"

    async def test_description_is_adf_format(self) -> None:
        jira, seen_bodies = self._recording_client("ALLPOST-6")

        await jira.create_issue(project="ALLPOST", summary="S", description="My desc")
        sent_description = seen_bodies[0]["fields"]["description"]
        assert sent_description["type"] == "doc"
        assert sent_description["version"] == 1

    async def test_401_raises_authentication_error(self) -> None:
        error_body = b'{"errorMessages": ["Unauthorized"]}'
        jira = _make_client({("POST", "/rest/api/3/issue"): (401, error_body)})

        with pytest.raises(AuthenticationError):
            await jira.create_issue(project="ALLPOST", summary="S", description="D")

    async def test_400_raises_service_error(self) -> None:
        error_body = json.dumps({"errorMessages": ["Bad request"], "errors": {}}).encode()
        jira = _make_client({("POST", "/rest/api/3/issue"): (400, error_body)})

        with pytest.raises(ServiceError):
            await jira.create_issue(project="ALLPOST", summary="S", description="D")
|