From b67cbcfd937e2ba8537750c69538f8aeca8559d5 Mon Sep 17 00:00:00 2001 From: Yaojia Wang Date: Tue, 24 Mar 2026 23:05:04 +0100 Subject: [PATCH] fix: runtime fixes for WSL deployment and integration testing - Fix RunnableConfig type annotations (dict -> RunnableConfig) for LangGraph compat - Fix AzDo PR URL parsing (_links.web.href fallback + remoteUrl construction) - Fix AzDo diff endpoint (use iterations/changes instead of non-existent diffs API) - Fix _format_diff to read changeEntries field (not changes) - Fix URL encoding for project names with spaces (Billo App Platform) - Fix subprocess.run for Windows (replace asyncio.create_subprocess_exec with thread pool) - Fix SlackClient to handle empty webhook URL gracefully - Fix notify_request_changes to catch all exceptions (not just ReleaseAgentError) - Fix JSON parsing to strip whitespace before json.loads - Add CLAUDE_CMD config field for cross-platform CLI path - Add run.py for Windows SelectorEventLoop workaround - Add db port mapping in docker-compose for local dev - Add comprehensive README sections: WSL setup, known issues, TODO list --- README.md | 99 ++++++++++++++++++++++++ docker-compose.yml | 2 + run.py | 22 ++++++ src/release_agent/api/status.py | 39 +++++++++- src/release_agent/config.py | 1 + src/release_agent/graph/ci_nodes.py | 12 +-- src/release_agent/graph/pr_completed.py | 34 ++++---- src/release_agent/graph/release.py | 52 ++++++------- src/release_agent/main.py | 2 +- src/release_agent/services/pr_poller.py | 2 +- src/release_agent/tools/azdo.py | 44 +++++++---- src/release_agent/tools/claude_review.py | 49 ++++++------ src/release_agent/tools/slack.py | 2 + 13 files changed, 272 insertions(+), 88 deletions(-) create mode 100644 run.py diff --git a/README.md b/README.md index a77bb7b..758ee53 100644 --- a/README.md +++ b/README.md @@ -330,6 +330,62 @@ docker compose up -d The agent service includes a health check at `/status`. PostgreSQL uses `pg_isready` with `service_healthy` dependency. +## Running Locally (WSL Recommended) + +The app runs best on **WSL (Ubuntu)** because: +- `psycopg` async requires `SelectorEventLoop` (incompatible with Windows `ProactorEventLoop`) +- `subprocess.run` captures Claude CLI stdout correctly on Linux but not reliably on Windows +- PostgreSQL runs in Docker (accessible from both Windows and WSL via `localhost`) + +### Setup + +```bash +# 1. Start PostgreSQL (from Windows or WSL) +cd /mnt/c/Users/yaoji/git/Billo/billo-release-agent +docker compose up -d db + +# 2. Install uv in WSL (if needed) +curl -LsSf https://astral.sh/uv/install.sh | sh +export PATH="$HOME/.local/bin:$PATH" + +# 3. Install dependencies +uv sync --all-extras + +# 4. Configure .env +# Key settings: +# CLAUDE_CMD=claude (not claude.cmd — WSL finds it via PATH) +# REPOS_BASE_DIR=/mnt/c/Users/yaoji/git/Billo +# PR_POLL_ENABLED=False (disable during dev to avoid noise) +# SLACK_WEBHOOK_URL= (leave empty during dev) + +# 5. Start the server +uv run uvicorn release_agent.main:app --host 0.0.0.0 --port 8080 + +# 6. Test +curl http://localhost:8080/status +curl -X POST http://localhost:8080/manual/pr/10443 +``` + +### Windows-only (Fallback) + +If you must run on Windows directly, use the provided `run.py` script which sets +`WindowsSelectorEventLoopPolicy` before starting uvicorn: + +```bash +uv run python run.py +``` + +Note: Claude CLI subprocess may return empty stdout on Windows due to event loop +incompatibility. WSL is the recommended approach. + +### Performance Note: WSL + /mnt/c + +Claude Code CLI with `--allowedTools Read,Glob,Grep` on `/mnt/c` (Windows filesystem +mounted in WSL) is very slow. For faster code reviews, either: + +1. **Clone repos to WSL native filesystem** (`~/git/Billo/`) and set `REPOS_BASE_DIR=~/git/Billo` +2. **Remove `--allowedTools`** so Claude only reviews the diff text (faster but less thorough) + ## Slack App Setup To use interactive buttons (optional — REST API approvals still work without it): @@ -339,3 +395,46 @@ To use interactive buttons (optional — REST API approvals still work without i 3. Add Bot Token Scopes: `chat:write`, `chat:update` 4. Install to workspace, get Bot Token (`xoxb-...`) 5. Set `SLACK_BOT_TOKEN`, `SLACK_SIGNING_SECRET`, `SLACK_CHANNEL_ID` in `.env` + +## Current Status + +### Working + +- App startup, health check, API endpoints +- Azure DevOps API integration (get PR, list active PRs, get iterations/changes) +- PR info parsing (repo_name, ticket_id, branch extraction) +- Graph execution (full pr_completed flow: parse -> fetch -> route -> review -> notify) +- Database read/write (agent_threads table) +- Slack error handling (empty webhook URL gracefully skipped) +- Claude CLI ticket generation (tested: returns structured JSON) +- Claude CLI code review (tested: returns structured JSON with verdict + issues) +- PR review comments posted to Azure DevOps (inline + summary) +- Node type annotations fixed (`RunnableConfig` instead of `dict`) + +### Known Issues + +| Issue | Severity | Workaround | +|-------|----------|------------| +| Windows: Claude CLI subprocess returns empty stdout | HIGH | Run in WSL | +| WSL + /mnt/c: Claude CLI Read/Glob very slow (10+ min) | MEDIUM | Clone repos to WSL native fs | +| Graph has no LangGraph checkpointer (interrupt not persistent) | MEDIUM | Graphs run to completion or fail; no resume | +| `_upsert_thread` only writes final state (no intermediate updates) | LOW | Query DB only after graph completes | +| CI poll may run indefinitely (no build to poll in dev) | LOW | Leave `PR_POLL_ENABLED=False` | +| Config test failures (env var leakage from .env) | LOW | Run with `-k "not test_config"` | + +### TODO (Not Yet Implemented) + +- [ ] Wire LangGraph checkpointer (PostgreSQL) for interrupt persistence +- [ ] Interrupt decision validation (currently any resume value proceeds) +- [ ] Slack interactive buttons end-to-end (Slack App not yet created) +- [ ] CI/CD pipeline trigger end-to-end testing +- [ ] Release approval gate detection (check_release_approvals is a stub) +- [ ] `last_merge_source_commit` from AzDo API for safe merge +- [ ] Operator token auth testing in production +- [ ] Multi-stage Dockerfile for smaller images +- [ ] Centralize `_upsert_thread` into shared `api/db.py` module +- [ ] Remove dead `has_ticket` routing function +- [ ] PR poller dedup query correctness (unnest pair-wise matching untested against real DB) +- [ ] `archive_release` date injection (replace `date.today()` with config) +- [ ] Approval loop max iteration guard (prevent infinite loops) +- [ ] Migrate existing release JSON data to PostgreSQL diff --git a/docker-compose.yml b/docker-compose.yml index f217f52..c419749 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,6 +35,8 @@ services: POSTGRES_USER: agent POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?POSTGRES_PASSWORD must be set} POSTGRES_DB: agent + ports: + - "5432:5432" volumes: - postgres_data:/var/lib/postgresql/data healthcheck: diff --git a/run.py b/run.py new file mode 100644 index 0000000..e9c0b04 --- /dev/null +++ b/run.py @@ -0,0 +1,22 @@ +"""Windows-compatible startup script. + +psycopg async requires SelectorEventLoop, not the default ProactorEventLoop on Windows. +This script sets the correct event loop policy before starting uvicorn. +""" + +import asyncio +import sys + +if sys.platform == "win32": + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + +import uvicorn + +if __name__ == "__main__": + uvicorn.run( + "release_agent.main:app", + host="0.0.0.0", + port=8080, + reload=False, + loop="none", # don't let uvicorn override the event loop + ) diff --git a/src/release_agent/api/status.py b/src/release_agent/api/status.py index 7cf7958..969cd46 100644 --- a/src/release_agent/api/status.py +++ b/src/release_agent/api/status.py @@ -92,11 +92,43 @@ async def manual_pr_trigger( db_pool=Depends(get_db_pool), _auth: None = Depends(require_operator_token), ) -> ManualTriggerResponse: - """Manually trigger PR processing for the given PR ID.""" + """Manually trigger PR processing for the given PR ID. + + Fetches PR details from AzDo first to build a proper initial state + including the synthesized webhook payload. + """ from release_agent.api.webhooks import _run_graph + settings = request.app.state.settings thread_id = str(uuid.uuid4()) - initial_state = {"pr_id": pr_id} + + # Fetch PR info to build webhook-compatible initial state + try: + pr_info = await tool_clients.azdo.get_pr(int(pr_id)) + initial_state = { + "webhook_payload": { + "event_type": "git.pullrequest.updated", + "subscription_id": f"manual-{pr_id}", + "resource": { + "pull_request_id": int(pr_id), + "title": pr_info.pr_title, + "status": pr_info.pr_status, + "source_ref_name": pr_info.branch, + "target_ref_name": "refs/heads/develop", + "closed_date": None, + "repository": { + "id": f"{pr_info.repo_name}-id", + "name": pr_info.repo_name, + "web_url": f"https://dev.azure.com/billodev/Billo%20App%20Platform/_git/{pr_info.repo_name}", + }, + }, + }, + "pr_id": pr_id, + "repo_name": pr_info.repo_name, + } + except Exception: + # Fallback: minimal state, let parse_webhook handle errors + initial_state = {"pr_id": pr_id} task = asyncio.create_task( _run_graph( @@ -105,6 +137,9 @@ async def manual_pr_trigger( thread_id=thread_id, tool_clients=tool_clients, db_pool=db_pool, + repos_base_dir=settings.repos_base_dir, + graph_name="pr_completed", + default_jira_project=settings.default_jira_project, ) ) request.app.state.background_tasks.add(task) diff --git a/src/release_agent/config.py b/src/release_agent/config.py index 356e9f6..ec18398 100644 --- a/src/release_agent/config.py +++ b/src/release_agent/config.py @@ -63,6 +63,7 @@ class Settings(BaseSettings): # Claude settings claude_review_model: str = "claude-sonnet-4-20250514" + claude_cmd: str = "claude" # Path to Claude Code CLI (e.g., claude.cmd on Windows) # Local repo settings repos_base_dir: str = "" # Base directory containing Billo repos (e.g., /c/Users/yaoji/git/Billo) diff --git a/src/release_agent/graph/ci_nodes.py b/src/release_agent/graph/ci_nodes.py index cc81d12..fb26ed2 100644 --- a/src/release_agent/graph/ci_nodes.py +++ b/src/release_agent/graph/ci_nodes.py @@ -8,6 +8,8 @@ External clients are accessed via config["configurable"]["clients"]. import logging from typing import Any +from langgraph.types import RunnableConfig + from release_agent.exceptions import ReleaseAgentError from release_agent.graph.dependencies import ToolClients from release_agent.graph.polling import poll_until @@ -21,11 +23,11 @@ _CI_POLL_INTERVAL = 30 _CI_POLL_MAX_WAIT = 1800 -def _get_clients(config: dict) -> ToolClients: +def _get_clients(config: RunnableConfig) -> ToolClients: return config["configurable"]["clients"] -def _get_settings(config: dict): +def _get_settings(config: RunnableConfig): return config["configurable"].get("settings") @@ -33,7 +35,7 @@ def _get_settings(config: dict): # Node: trigger_ci_build # --------------------------------------------------------------------------- -async def trigger_ci_build(state: dict[str, Any], config: dict) -> dict: +async def trigger_ci_build(state: dict[str, Any], config: RunnableConfig) -> dict: """Trigger the CI build pipeline for the repository. Finds the first available pipeline for the repo and triggers it on @@ -85,7 +87,7 @@ async def trigger_ci_build(state: dict[str, Any], config: dict) -> dict: # Node: poll_ci_build # --------------------------------------------------------------------------- -async def poll_ci_build(state: dict[str, Any], config: dict) -> dict: +async def poll_ci_build(state: dict[str, Any], config: RunnableConfig) -> dict: """Poll the CI build until completion or timeout. Uses the polling utility with configurable interval and max wait. @@ -144,7 +146,7 @@ async def poll_ci_build(state: dict[str, Any], config: dict) -> dict: # Node: notify_ci_result # --------------------------------------------------------------------------- -async def notify_ci_result(state: dict[str, Any], config: dict) -> dict: +async def notify_ci_result(state: dict[str, Any], config: RunnableConfig) -> dict: """Send a Slack notification with the CI build result. Non-critical: errors are appended rather than re-raised. diff --git a/src/release_agent/graph/pr_completed.py b/src/release_agent/graph/pr_completed.py index af2e88a..0d82646 100644 --- a/src/release_agent/graph/pr_completed.py +++ b/src/release_agent/graph/pr_completed.py @@ -9,7 +9,7 @@ from datetime import date from typing import Any from langgraph.graph import END, START, StateGraph -from langgraph.types import interrupt +from langgraph.types import RunnableConfig, interrupt from release_agent.branch_parser import parse_branch from release_agent.exceptions import ReleaseAgentError @@ -28,11 +28,11 @@ from release_agent.versioning import calculate_next_version # Helpers # --------------------------------------------------------------------------- -def _get_clients(config: dict) -> ToolClients: +def _get_clients(config: RunnableConfig) -> ToolClients: return config["configurable"]["clients"] -def _get_staging_store(config: dict): +def _get_staging_store(config: RunnableConfig): return config["configurable"].get("staging_store") @@ -40,7 +40,7 @@ def _get_staging_store(config: dict): # Node: parse_webhook # --------------------------------------------------------------------------- -async def parse_webhook(state: dict[str, Any], config: dict) -> dict: +async def parse_webhook(state: dict[str, Any], config: RunnableConfig) -> dict: """Parse the webhook_payload field and extract PR info and ticket ID. Returns a dict with pr_info, ticket_id, has_ticket, repo_name, pr_id. @@ -80,7 +80,7 @@ async def parse_webhook(state: dict[str, Any], config: dict) -> dict: # Node: fetch_pr_details # --------------------------------------------------------------------------- -async def fetch_pr_details(state: dict[str, Any], config: dict) -> dict: +async def fetch_pr_details(state: dict[str, Any], config: RunnableConfig) -> dict: """Fetch full PR details from AzDo and check if already merged. Sets pr_already_merged, pr_diff, and last_merge_source_commit. @@ -110,7 +110,7 @@ async def fetch_pr_details(state: dict[str, Any], config: dict) -> dict: # Node: auto_create_ticket # --------------------------------------------------------------------------- -async def auto_create_ticket(state: dict[str, Any], config: dict) -> dict: +async def auto_create_ticket(state: dict[str, Any], config: RunnableConfig) -> dict: """Auto-create a Jira ticket for a PR that has no existing ticket. Uses ClaudeReviewer to generate ticket content from the PR diff, @@ -163,7 +163,7 @@ async def auto_create_ticket(state: dict[str, Any], config: dict) -> dict: # Node: move_jira_code_review # --------------------------------------------------------------------------- -async def move_jira_code_review(state: dict[str, Any], config: dict) -> dict: +async def move_jira_code_review(state: dict[str, Any], config: RunnableConfig) -> dict: """Transition the Jira ticket to Code Review status. Skipped if has_ticket is False. Non-critical: errors are appended. @@ -240,7 +240,7 @@ async def _post_review_to_pr( # Node: run_code_review # --------------------------------------------------------------------------- -async def run_code_review(state: dict[str, Any], config: dict) -> dict: +async def run_code_review(state: dict[str, Any], config: RunnableConfig) -> dict: """Run Claude code review on the PR diff. Returns review_result as a serialisable dict. @@ -278,7 +278,7 @@ async def run_code_review(state: dict[str, Any], config: dict) -> dict: # Node: evaluate_review # --------------------------------------------------------------------------- -async def evaluate_review(state: dict[str, Any], config: dict) -> dict: +async def evaluate_review(state: dict[str, Any], config: RunnableConfig) -> dict: """Evaluate the review result and set review_approved. Approved only when verdict=="approve" and no blockers present. @@ -298,7 +298,7 @@ async def evaluate_review(state: dict[str, Any], config: dict) -> dict: # Node: interrupt_confirm_merge # --------------------------------------------------------------------------- -async def interrupt_confirm_merge(state: dict[str, Any], config: dict) -> dict: +async def interrupt_confirm_merge(state: dict[str, Any], config: RunnableConfig) -> dict: """Interrupt to ask the operator to confirm the PR merge. Passes a human-readable summary string to interrupt(). @@ -319,7 +319,7 @@ async def interrupt_confirm_merge(state: dict[str, Any], config: dict) -> dict: # Node: merge_pr_node # --------------------------------------------------------------------------- -async def merge_pr_node(state: dict[str, Any], config: dict) -> dict: +async def merge_pr_node(state: dict[str, Any], config: RunnableConfig) -> dict: """Merge the PR via AzDo API. Critical node — re-raises ReleaseAgentError. @@ -336,7 +336,7 @@ async def merge_pr_node(state: dict[str, Any], config: dict) -> dict: # Node: move_jira_ready_for_stage # --------------------------------------------------------------------------- -async def move_jira_ready_for_stage(state: dict[str, Any], config: dict) -> dict: +async def move_jira_ready_for_stage(state: dict[str, Any], config: RunnableConfig) -> dict: """Transition the Jira ticket to Ready for stage (2). Skipped if has_ticket is False. Non-critical: errors appended. @@ -356,7 +356,7 @@ async def move_jira_ready_for_stage(state: dict[str, Any], config: dict) -> dict # Node: add_jira_pr_link # --------------------------------------------------------------------------- -async def add_jira_pr_link(state: dict[str, Any], config: dict) -> dict: +async def add_jira_pr_link(state: dict[str, Any], config: RunnableConfig) -> dict: """Add the PR URL as a remote link on the Jira ticket. Skipped if has_ticket is False. Non-critical: errors appended. @@ -384,7 +384,7 @@ async def add_jira_pr_link(state: dict[str, Any], config: dict) -> dict: # Node: calculate_version # --------------------------------------------------------------------------- -async def calculate_version(state: dict[str, Any], config: dict) -> dict: +async def calculate_version(state: dict[str, Any], config: RunnableConfig) -> dict: """Calculate the next version for the repository using the staging store. Uses calculate_next_version from versioning module. @@ -402,7 +402,7 @@ async def calculate_version(state: dict[str, Any], config: dict) -> dict: # Node: update_staging # --------------------------------------------------------------------------- -async def update_staging(state: dict[str, Any], config: dict) -> dict: +async def update_staging(state: dict[str, Any], config: RunnableConfig) -> dict: """Add the PR's ticket to the staging release. If no staging exists, creates a new one. If has_ticket is False, skips @@ -461,7 +461,7 @@ async def update_staging(state: dict[str, Any], config: dict) -> dict: # Node: notify_request_changes # --------------------------------------------------------------------------- -async def notify_request_changes(state: dict[str, Any], config: dict) -> dict: +async def notify_request_changes(state: dict[str, Any], config: RunnableConfig) -> dict: """Send a Slack notification when the review requests changes. Non-critical: errors appended. @@ -485,7 +485,7 @@ async def notify_request_changes(state: dict[str, Any], config: dict) -> dict: approval_url="", ) return {"messages": [f"Slack notified: request changes for PR #{pr_id}"]} - except ReleaseAgentError as exc: + except Exception as exc: return {"errors": [f"notify_request_changes failed: {exc}"]} diff --git a/src/release_agent/graph/release.py b/src/release_agent/graph/release.py index 65ff858..345add6 100644 --- a/src/release_agent/graph/release.py +++ b/src/release_agent/graph/release.py @@ -29,7 +29,7 @@ from datetime import date from typing import Any from langgraph.graph import END, START, StateGraph -from langgraph.types import interrupt +from langgraph.types import RunnableConfig, interrupt from release_agent.exceptions import ReleaseAgentError from release_agent.graph.ci_nodes import poll_ci_build, trigger_ci_build @@ -45,11 +45,11 @@ from release_agent.tools.slack import _build_ci_status_blocks # Helpers # --------------------------------------------------------------------------- -def _get_clients(config: dict) -> ToolClients: +def _get_clients(config: RunnableConfig) -> ToolClients: return config["configurable"]["clients"] -def _get_staging_store(config: dict): +def _get_staging_store(config: RunnableConfig): return config["configurable"].get("staging_store") @@ -57,7 +57,7 @@ def _get_staging_store(config: dict): # Node: load_staging # --------------------------------------------------------------------------- -async def load_staging(state: dict[str, Any], config: dict) -> dict: +async def load_staging(state: dict[str, Any], config: RunnableConfig) -> dict: """Load the current staging release from the store.""" repo_name = state.get("repo_name", "") staging_store = _get_staging_store(config) @@ -73,7 +73,7 @@ async def load_staging(state: dict[str, Any], config: dict) -> dict: # Node: interrupt_confirm_release # --------------------------------------------------------------------------- -async def interrupt_confirm_release(state: dict[str, Any], config: dict) -> dict: +async def interrupt_confirm_release(state: dict[str, Any], config: RunnableConfig) -> dict: """Interrupt to ask the operator to confirm starting the release.""" repo_name = state.get("repo_name", "") staging_dict = state.get("staging") or {} @@ -93,7 +93,7 @@ async def interrupt_confirm_release(state: dict[str, Any], config: dict) -> dict # Node: create_release_pr # --------------------------------------------------------------------------- -async def create_release_pr(state: dict[str, Any], config: dict) -> dict: +async def create_release_pr(state: dict[str, Any], config: RunnableConfig) -> dict: """Create a release PR in AzDo from a release branch to main.""" clients = _get_clients(config) repo_name = state.get("repo_name", "") @@ -129,7 +129,7 @@ async def create_release_pr(state: dict[str, Any], config: dict) -> dict: # Node: interrupt_confirm_merge_release # --------------------------------------------------------------------------- -async def interrupt_confirm_merge_release(state: dict[str, Any], config: dict) -> dict: +async def interrupt_confirm_merge_release(state: dict[str, Any], config: RunnableConfig) -> dict: """Interrupt to ask the operator to confirm merging the release PR.""" release_pr_id = state.get("release_pr_id", "?") version = state.get("version", "") @@ -146,7 +146,7 @@ async def interrupt_confirm_merge_release(state: dict[str, Any], config: dict) - # Node: merge_release_pr # --------------------------------------------------------------------------- -async def merge_release_pr(state: dict[str, Any], config: dict) -> dict: +async def merge_release_pr(state: dict[str, Any], config: RunnableConfig) -> dict: """Merge the release PR via AzDo API.""" clients = _get_clients(config) pr_id = int(state.get("release_pr_id", 0)) @@ -159,7 +159,7 @@ async def merge_release_pr(state: dict[str, Any], config: dict) -> dict: # Node: trigger_ci_build_main (delegates to ci_nodes.trigger_ci_build) # --------------------------------------------------------------------------- -async def trigger_ci_build_main(state: dict[str, Any], config: dict) -> dict: +async def trigger_ci_build_main(state: dict[str, Any], config: RunnableConfig) -> dict: """Trigger CI build on main after the release PR is merged.""" return await trigger_ci_build(state, config) @@ -168,7 +168,7 @@ async def trigger_ci_build_main(state: dict[str, Any], config: dict) -> dict: # Node: poll_ci_build_main (delegates to ci_nodes.poll_ci_build) # --------------------------------------------------------------------------- -async def poll_ci_build_main(state: dict[str, Any], config: dict) -> dict: +async def poll_ci_build_main(state: dict[str, Any], config: RunnableConfig) -> dict: """Poll the main branch CI build until completion.""" return await poll_ci_build(state, config) @@ -177,7 +177,7 @@ async def poll_ci_build_main(state: dict[str, Any], config: dict) -> dict: # Node: notify_ci_failure # --------------------------------------------------------------------------- -async def notify_ci_failure(state: dict[str, Any], config: dict) -> dict: +async def notify_ci_failure(state: dict[str, Any], config: RunnableConfig) -> dict: """Send a Slack notification that the CI build failed on main. Non-critical: errors appended. @@ -207,7 +207,7 @@ async def notify_ci_failure(state: dict[str, Any], config: dict) -> dict: # Node: wait_for_cd_release # --------------------------------------------------------------------------- -async def wait_for_cd_release(state: dict[str, Any], config: dict) -> dict: +async def wait_for_cd_release(state: dict[str, Any], config: RunnableConfig) -> dict: """Wait for the CD pipeline to create a release after CI passes. Fetches the latest release for the configured release definition. @@ -235,7 +235,7 @@ async def wait_for_cd_release(state: dict[str, Any], config: dict) -> dict: # Node: poll_release_approvals # --------------------------------------------------------------------------- -async def poll_release_approvals(state: dict[str, Any], config: dict) -> dict: +async def poll_release_approvals(state: dict[str, Any], config: RunnableConfig) -> dict: """Poll AzDo for pending release environment approvals. Non-critical: errors appended on failure. @@ -267,7 +267,7 @@ async def poll_release_approvals(state: dict[str, Any], config: dict) -> dict: # Node: interrupt_sandbox_approval # --------------------------------------------------------------------------- -async def interrupt_sandbox_approval(state: dict[str, Any], config: dict) -> dict: +async def interrupt_sandbox_approval(state: dict[str, Any], config: RunnableConfig) -> dict: """Interrupt to ask the operator to approve the sandbox deployment.""" approvals = state.get("pending_approvals") or [] version = state.get("version", "") @@ -285,7 +285,7 @@ async def interrupt_sandbox_approval(state: dict[str, Any], config: dict) -> dic # Node: execute_sandbox_approval # --------------------------------------------------------------------------- -async def execute_sandbox_approval(state: dict[str, Any], config: dict) -> dict: +async def execute_sandbox_approval(state: dict[str, Any], config: RunnableConfig) -> dict: """Approve all pending sandbox stage approvals via AzDo VSRM. Non-critical per approval: errors appended on individual failures. @@ -313,7 +313,7 @@ async def execute_sandbox_approval(state: dict[str, Any], config: dict) -> dict: # Node: interrupt_prod_approval # --------------------------------------------------------------------------- -async def interrupt_prod_approval(state: dict[str, Any], config: dict) -> dict: +async def interrupt_prod_approval(state: dict[str, Any], config: RunnableConfig) -> dict: """Interrupt to ask the operator to approve the production deployment.""" approvals = state.get("pending_approvals") or [] version = state.get("version", "") @@ -331,7 +331,7 @@ async def interrupt_prod_approval(state: dict[str, Any], config: dict) -> dict: # Node: execute_prod_approval # --------------------------------------------------------------------------- -async def execute_prod_approval(state: dict[str, Any], config: dict) -> dict: +async def execute_prod_approval(state: dict[str, Any], config: RunnableConfig) -> dict: """Approve all pending production stage approvals via AzDo VSRM. Non-critical per approval: errors appended on individual failures. @@ -359,7 +359,7 @@ async def execute_prod_approval(state: dict[str, Any], config: dict) -> dict: # Node: move_tickets_to_done # --------------------------------------------------------------------------- -async def move_tickets_to_done(state: dict[str, Any], config: dict) -> dict: +async def move_tickets_to_done(state: dict[str, Any], config: RunnableConfig) -> dict: """Transition all staging tickets to Done/Released in Jira.""" clients = _get_clients(config) staging_dict = state.get("staging") or {} @@ -382,7 +382,7 @@ async def move_tickets_to_done(state: dict[str, Any], config: dict) -> dict: # Node: send_slack_notification (send_release_notification) # --------------------------------------------------------------------------- -async def send_slack_notification(state: dict[str, Any], config: dict) -> dict: +async def send_slack_notification(state: dict[str, Any], config: RunnableConfig) -> dict: """Send a release notification to Slack.""" clients = _get_clients(config) repo_name = state.get("repo_name", "") @@ -407,7 +407,7 @@ async def send_slack_notification(state: dict[str, Any], config: dict) -> dict: # Node: archive_release # --------------------------------------------------------------------------- -async def archive_release(state: dict[str, Any], config: dict) -> dict: +async def archive_release(state: dict[str, Any], config: RunnableConfig) -> dict: """Archive the staging release in the store.""" staging_store = _get_staging_store(config) staging_dict = state.get("staging") or {} @@ -422,7 +422,7 @@ async def archive_release(state: dict[str, Any], config: dict) -> dict: # Legacy nodes kept for backward compatibility # --------------------------------------------------------------------------- -async def list_pipelines(state: dict[str, Any], config: dict) -> dict: +async def list_pipelines(state: dict[str, Any], config: RunnableConfig) -> dict: """List build pipelines for the repository via AzDo.""" clients = _get_clients(config) repo_name = state.get("repo_name", "") @@ -433,7 +433,7 @@ async def list_pipelines(state: dict[str, Any], config: dict) -> dict: return {"errors": [f"list_pipelines failed: {exc}"], "pipelines": []} -async def interrupt_confirm_trigger(state: dict[str, Any], config: dict) -> dict: +async def interrupt_confirm_trigger(state: dict[str, Any], config: RunnableConfig) -> dict: """Interrupt to ask the operator to confirm triggering pipelines.""" repo_name = state.get("repo_name", "") version = state.get("version", "") @@ -448,7 +448,7 @@ async def interrupt_confirm_trigger(state: dict[str, Any], config: dict) -> dict return {} -async def trigger_pipelines(state: dict[str, Any], config: dict) -> dict: +async def trigger_pipelines(state: dict[str, Any], config: RunnableConfig) -> dict: """Trigger all listed pipelines for the release branch.""" clients = _get_clients(config) repo_name = state.get("repo_name", "") @@ -475,7 +475,7 @@ async def trigger_pipelines(state: dict[str, Any], config: dict) -> dict: return result_dict -async def check_release_approvals(state: dict[str, Any], config: dict) -> dict: +async def check_release_approvals(state: dict[str, Any], config: RunnableConfig) -> dict: """Check triggered builds for pending stage approvals.""" clients = _get_clients(config) triggered_builds = state.get("triggered_builds") or [] @@ -495,7 +495,7 @@ async def check_release_approvals(state: dict[str, Any], config: dict) -> dict: return result_dict -async def interrupt_confirm_approve(state: dict[str, Any], config: dict) -> dict: +async def interrupt_confirm_approve(state: dict[str, Any], config: RunnableConfig) -> dict: """Interrupt to ask the operator to confirm approving pipeline stages.""" approvals = state.get("pending_approvals") or [] version = state.get("version", "") @@ -509,7 +509,7 @@ async def interrupt_confirm_approve(state: dict[str, Any], config: dict) -> dict return {} -async def approve_stage(state: dict[str, Any], config: dict) -> dict: +async def approve_stage(state: dict[str, Any], config: RunnableConfig) -> dict: """Approve all pending pipeline stage approvals via AzDo VSRM.""" clients = _get_clients(config) approvals = state.get("pending_approvals") or [] diff --git a/src/release_agent/main.py b/src/release_agent/main.py index ace4e5c..cad9ccc 100644 --- a/src/release_agent/main.py +++ b/src/release_agent/main.py @@ -73,7 +73,7 @@ def _create_tool_clients(settings: Settings) -> tuple[ToolClients, list]: bot_token=settings.slack_bot_token.get_secret_value(), channel_id=settings.slack_channel_id, ) - reviewer = ClaudeReviewer() + reviewer = ClaudeReviewer(claude_cmd=settings.claude_cmd) clients = ToolClients(azdo=azdo, jira=jira, slack=slack, reviewer=reviewer) return clients, [http_client, vsrm_http_client] diff --git a/src/release_agent/services/pr_poller.py b/src/release_agent/services/pr_poller.py index 4961ae2..9295c0c 100644 --- a/src/release_agent/services/pr_poller.py +++ b/src/release_agent/services/pr_poller.py @@ -40,7 +40,7 @@ def _synthesize_webhook_payload(pr: PRInfo) -> dict: "repository": { "id": f"{pr.repo_name}-id", "name": pr.repo_name, - "web_url": str(pr.pr_url).rsplit("/pullrequest/", 1)[0], + "web_url": f"https://dev.azure.com/billodev/Billo%20App%20Platform/_git/{pr.repo_name}", }, }, } diff --git a/src/release_agent/tools/azdo.py b/src/release_agent/tools/azdo.py index d9c8ee3..f540201 100644 --- a/src/release_agent/tools/azdo.py +++ b/src/release_agent/tools/azdo.py @@ -40,8 +40,9 @@ class AzDoClient: http_client: httpx.AsyncClient, vsrm_http_client: httpx.AsyncClient, ) -> None: - self._base_url = base_url.rstrip("/") - self._vsrm_base_url = vsrm_base_url.rstrip("/") + # URL-encode spaces in project names (e.g., "Billo App Platform" → "Billo%20App%20Platform") + self._base_url = base_url.rstrip("/").replace(" ", "%20") + self._vsrm_base_url = vsrm_base_url.rstrip("/").replace(" ", "%20") self._auth = build_auth_header("", pat) self._http = http_client self._vsrm = vsrm_http_client @@ -133,14 +134,25 @@ class AzDoClient: """ pr = await self.get_pr(pr_id) repo = pr.repo_name - url = f"{self._base_url}/git/repositories/{repo}/pullRequests/{pr_id}/diffs" - response = await self._http.get( - url, - headers=self._auth, - params={"api-version": _API_VERSION, "baseVersionDescriptor.versionType": "commit"}, + + # Get iterations to find changed files + iter_url = f"{self._base_url}/git/repositories/{repo}/pullRequests/{pr_id}/iterations" + iter_resp = await self._http.get( + iter_url, headers=self._auth, params={"api-version": _API_VERSION}, ) - raise_for_status(response, service="azdo") - data = response.json() + raise_for_status(iter_resp, service="azdo") + iterations = iter_resp.json().get("value", []) + if not iterations: + return "No iterations found" + + # Get changes from the latest iteration + last_iter = iterations[-1]["id"] + changes_url = f"{self._base_url}/git/repositories/{repo}/pullRequests/{pr_id}/iterations/{last_iter}/changes" + changes_resp = await self._http.get( + changes_url, headers=self._auth, params={"api-version": _API_VERSION}, + ) + raise_for_status(changes_resp, service="azdo") + data = changes_resp.json() return _format_diff(data) async def merge_pr(self, *, pr_id: int, last_merge_source_commit: str) -> bool: @@ -481,10 +493,14 @@ def _parse_pr(data: dict) -> PRInfo: pr_id = str(data["pullRequestId"]) repo = data["repository"]["name"] branch = data.get("sourceRefName", "") - url = data.get("url", "") + # Prefer _links.web.href for a proper web URL; fall back to constructing one + url = (data.get("_links", {}).get("web", {}).get("href", "") + or data["repository"].get("webUrl", "") + or data["repository"].get("remoteUrl", "")) + if url and "/pullrequest" not in url.lower(): + url = f"{url}/pullrequest/{pr_id}" if not url: - repo_url = data["repository"].get("remoteUrl", "") - url = f"{repo_url}/pullrequest/{pr_id}" + url = data.get("url", f"https://dev.azure.com/_unknown/pullrequest/{pr_id}") return PRInfo( pr_id=pr_id, pr_url=url, @@ -502,8 +518,8 @@ def _map_pr_status(raw: str) -> str: def _format_diff(data: dict) -> str: - """Format the diffs API response as a text string.""" - changes = data.get("changes", []) + """Format the iteration changes API response as a text string.""" + changes = data.get("changeEntries", data.get("changes", [])) lines = [] for change in changes: item = change.get("item", {}) diff --git a/src/release_agent/tools/claude_review.py b/src/release_agent/tools/claude_review.py index 89d91a0..f51585a 100644 --- a/src/release_agent/tools/claude_review.py +++ b/src/release_agent/tools/claude_review.py @@ -236,7 +236,7 @@ def _build_prompt(*, diff: str, pr_title: str, repo_name: str) -> str: def _parse_cli_output(stdout: str) -> ReviewResult: """Parse the JSON output from Claude Code CLI into a ReviewResult.""" try: - data = json.loads(stdout) + data = json.loads(stdout.strip()) except json.JSONDecodeError as exc: raise ValueError(f"Failed to parse Claude CLI output as JSON: {exc}") from exc @@ -281,7 +281,7 @@ def _parse_cli_output(stdout: str) -> ReviewResult: def _parse_ticket_output(stdout: str) -> tuple[str, str]: """Parse the JSON output from Claude Code CLI into (summary, description).""" try: - data = json.loads(stdout) + data = json.loads(stdout.strip()) except json.JSONDecodeError as exc: raise ValueError(f"Failed to parse Claude CLI ticket output as JSON: {exc}") from exc @@ -312,24 +312,29 @@ async def _default_run_subprocess( cwd: str | None, timeout: int, ) -> tuple[str, str, int]: - """Run a subprocess and return (stdout, stderr, returncode).""" - process = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - cwd=cwd, - ) - try: - stdout_bytes, stderr_bytes = await asyncio.wait_for( - process.communicate(), timeout=timeout - ) - except asyncio.TimeoutError: - process.kill() - await process.wait() - raise RuntimeError(f"Claude CLI timed out after {timeout} seconds") + """Run a subprocess and return (stdout, stderr, returncode). - return ( - stdout_bytes.decode("utf-8", errors="replace"), - stderr_bytes.decode("utf-8", errors="replace"), - process.returncode or 0, - ) + Uses subprocess.run in a thread pool to avoid Windows SelectorEventLoop + incompatibility with asyncio.create_subprocess_exec. + """ + import subprocess + + def _run() -> tuple[str, str, int]: + try: + print(f"[DEBUG] Claude CLI: running {cmd[0]} (cwd={cwd}, timeout={timeout})", flush=True) + result = subprocess.run( + cmd, + capture_output=True, + text=True, + cwd=cwd, + timeout=timeout, + ) + print(f"[DEBUG] Claude CLI: rc={result.returncode}, stdout_len={len(result.stdout)}, stderr_len={len(result.stderr)}", flush=True) + if not result.stdout: + print(f"[DEBUG] Claude CLI EMPTY stdout. stderr: {result.stderr[:500]}", flush=True) + return (result.stdout, result.stderr, result.returncode) + except subprocess.TimeoutExpired: + raise RuntimeError(f"Claude CLI timed out after {timeout} seconds") + + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, _run) diff --git a/src/release_agent/tools/slack.py b/src/release_agent/tools/slack.py index 0754f95..49827ec 100644 --- a/src/release_agent/tools/slack.py +++ b/src/release_agent/tools/slack.py @@ -229,6 +229,8 @@ class SlackClient: async def _post_blocks(self, blocks: list[dict]) -> bool: """POST a Block Kit payload to the webhook URL.""" + if not self._webhook_url: + return False response = await self._http.post( self._webhook_url, json={"blocks": blocks},