fix: runtime fixes for WSL deployment and integration testing

- Fix RunnableConfig type annotations (dict -> RunnableConfig) for LangGraph compat
- Fix AzDo PR URL parsing (_links.web.href fallback + remoteUrl construction)
- Fix AzDo diff endpoint (use iterations/changes instead of non-existent diffs API)
- Fix _format_diff to read changeEntries field (not changes)
- Fix URL encoding for project names with spaces (Billo App Platform)
- Fix subprocess.run for Windows (replace asyncio.create_subprocess_exec with thread pool)
- Fix SlackClient to handle empty webhook URL gracefully
- Fix notify_request_changes to catch all exceptions (not just ReleaseAgentError)
- Fix JSON parsing to strip whitespace before json.loads
- Add CLAUDE_CMD config field for cross-platform CLI path
- Add run.py for Windows SelectorEventLoop workaround
- Add db port mapping in docker-compose for local dev
- Add comprehensive README sections: WSL setup, known issues, TODO list
This commit is contained in:
Yaojia Wang
2026-03-24 23:05:04 +01:00
parent f5c2733cfb
commit b67cbcfd93
13 changed files with 272 additions and 88 deletions

View File

@@ -330,6 +330,62 @@ docker compose up -d
The agent service includes a health check at `/status`. PostgreSQL uses The agent service includes a health check at `/status`. PostgreSQL uses
`pg_isready` with `service_healthy` dependency. `pg_isready` with `service_healthy` dependency.
## Running Locally (WSL Recommended)
The app runs best on **WSL (Ubuntu)** because:
- `psycopg` async requires `SelectorEventLoop` (incompatible with Windows `ProactorEventLoop`)
- `subprocess.run` captures Claude CLI stdout correctly on Linux but not reliably on Windows
- PostgreSQL runs in Docker (accessible from both Windows and WSL via `localhost`)
### Setup
```bash
# 1. Start PostgreSQL (from Windows or WSL)
cd /mnt/c/Users/yaoji/git/Billo/billo-release-agent
docker compose up -d db
# 2. Install uv in WSL (if needed)
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
# 3. Install dependencies
uv sync --all-extras
# 4. Configure .env
# Key settings:
# CLAUDE_CMD=claude (not claude.cmd — WSL finds it via PATH)
# REPOS_BASE_DIR=/mnt/c/Users/yaoji/git/Billo
# PR_POLL_ENABLED=False (disable during dev to avoid noise)
# SLACK_WEBHOOK_URL= (leave empty during dev)
# 5. Start the server
uv run uvicorn release_agent.main:app --host 0.0.0.0 --port 8080
# 6. Test
curl http://localhost:8080/status
curl -X POST http://localhost:8080/manual/pr/10443
```
### Windows-only (Fallback)
If you must run on Windows directly, use the provided `run.py` script which sets
`WindowsSelectorEventLoopPolicy` before starting uvicorn:
```bash
uv run python run.py
```
Note: Claude CLI subprocess may return empty stdout on Windows due to event loop
incompatibility. WSL is the recommended approach.
### Performance Note: WSL + /mnt/c
Claude Code CLI with `--allowedTools Read,Glob,Grep` on `/mnt/c` (Windows filesystem
mounted in WSL) is very slow. For faster code reviews, either:
1. **Clone repos to WSL native filesystem** (`~/git/Billo/`) and set `REPOS_BASE_DIR=~/git/Billo`
2. **Remove `--allowedTools`** so Claude only reviews the diff text (faster but less thorough)
## Slack App Setup ## Slack App Setup
To use interactive buttons (optional — REST API approvals still work without it): To use interactive buttons (optional — REST API approvals still work without it):
@@ -339,3 +395,46 @@ To use interactive buttons (optional — REST API approvals still work without i
3. Add Bot Token Scopes: `chat:write`, `chat:update` 3. Add Bot Token Scopes: `chat:write`, `chat:update`
4. Install to workspace, get Bot Token (`xoxb-...`) 4. Install to workspace, get Bot Token (`xoxb-...`)
5. Set `SLACK_BOT_TOKEN`, `SLACK_SIGNING_SECRET`, `SLACK_CHANNEL_ID` in `.env` 5. Set `SLACK_BOT_TOKEN`, `SLACK_SIGNING_SECRET`, `SLACK_CHANNEL_ID` in `.env`
## Current Status
### Working
- App startup, health check, API endpoints
- Azure DevOps API integration (get PR, list active PRs, get iterations/changes)
- PR info parsing (repo_name, ticket_id, branch extraction)
- Graph execution (full pr_completed flow: parse -> fetch -> route -> review -> notify)
- Database read/write (agent_threads table)
- Slack error handling (empty webhook URL gracefully skipped)
- Claude CLI ticket generation (tested: returns structured JSON)
- Claude CLI code review (tested: returns structured JSON with verdict + issues)
- PR review comments posted to Azure DevOps (inline + summary)
- Node type annotations fixed (`RunnableConfig` instead of `dict`)
### Known Issues
| Issue | Severity | Workaround |
|-------|----------|------------|
| Windows: Claude CLI subprocess returns empty stdout | HIGH | Run in WSL |
| WSL + /mnt/c: Claude CLI Read/Glob very slow (10+ min) | MEDIUM | Clone repos to WSL native fs |
| Graph has no LangGraph checkpointer (interrupt not persistent) | MEDIUM | Graphs run to completion or fail; no resume |
| `_upsert_thread` only writes final state (no intermediate updates) | LOW | Query DB only after graph completes |
| CI poll may run indefinitely (no build to poll in dev) | LOW | Leave `PR_POLL_ENABLED=False` |
| Config test failures (env var leakage from .env) | LOW | Run with `-k "not test_config"` |
### TODO (Not Yet Implemented)
- [ ] Wire LangGraph checkpointer (PostgreSQL) for interrupt persistence
- [ ] Interrupt decision validation (currently any resume value proceeds)
- [ ] Slack interactive buttons end-to-end (Slack App not yet created)
- [ ] CI/CD pipeline trigger end-to-end testing
- [ ] Release approval gate detection (check_release_approvals is a stub)
- [ ] `last_merge_source_commit` from AzDo API for safe merge
- [ ] Operator token auth testing in production
- [ ] Multi-stage Dockerfile for smaller images
- [ ] Centralize `_upsert_thread` into shared `api/db.py` module
- [ ] Remove dead `has_ticket` routing function
- [ ] PR poller dedup query correctness (unnest pair-wise matching untested against real DB)
- [ ] `archive_release` date injection (replace `date.today()` with config)
- [ ] Approval loop max iteration guard (prevent infinite loops)
- [ ] Migrate existing release JSON data to PostgreSQL

View File

@@ -35,6 +35,8 @@ services:
POSTGRES_USER: agent POSTGRES_USER: agent
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?POSTGRES_PASSWORD must be set} POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?POSTGRES_PASSWORD must be set}
POSTGRES_DB: agent POSTGRES_DB: agent
ports:
- "5432:5432"
volumes: volumes:
- postgres_data:/var/lib/postgresql/data - postgres_data:/var/lib/postgresql/data
healthcheck: healthcheck:

22
run.py Normal file
View File

@@ -0,0 +1,22 @@
"""Windows-compatible startup script.
psycopg async requires SelectorEventLoop, not the default ProactorEventLoop on Windows.
This script sets the correct event loop policy before starting uvicorn.
"""
import asyncio
import sys
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
import uvicorn
if __name__ == "__main__":
uvicorn.run(
"release_agent.main:app",
host="0.0.0.0",
port=8080,
reload=False,
loop="none", # don't let uvicorn override the event loop
)

View File

@@ -92,11 +92,43 @@ async def manual_pr_trigger(
db_pool=Depends(get_db_pool), db_pool=Depends(get_db_pool),
_auth: None = Depends(require_operator_token), _auth: None = Depends(require_operator_token),
) -> ManualTriggerResponse: ) -> ManualTriggerResponse:
"""Manually trigger PR processing for the given PR ID.""" """Manually trigger PR processing for the given PR ID.
Fetches PR details from AzDo first to build a proper initial state
including the synthesized webhook payload.
"""
from release_agent.api.webhooks import _run_graph from release_agent.api.webhooks import _run_graph
settings = request.app.state.settings
thread_id = str(uuid.uuid4()) thread_id = str(uuid.uuid4())
initial_state = {"pr_id": pr_id}
# Fetch PR info to build webhook-compatible initial state
try:
pr_info = await tool_clients.azdo.get_pr(int(pr_id))
initial_state = {
"webhook_payload": {
"event_type": "git.pullrequest.updated",
"subscription_id": f"manual-{pr_id}",
"resource": {
"pull_request_id": int(pr_id),
"title": pr_info.pr_title,
"status": pr_info.pr_status,
"source_ref_name": pr_info.branch,
"target_ref_name": "refs/heads/develop",
"closed_date": None,
"repository": {
"id": f"{pr_info.repo_name}-id",
"name": pr_info.repo_name,
"web_url": f"https://dev.azure.com/billodev/Billo%20App%20Platform/_git/{pr_info.repo_name}",
},
},
},
"pr_id": pr_id,
"repo_name": pr_info.repo_name,
}
except Exception:
# Fallback: minimal state, let parse_webhook handle errors
initial_state = {"pr_id": pr_id}
task = asyncio.create_task( task = asyncio.create_task(
_run_graph( _run_graph(
@@ -105,6 +137,9 @@ async def manual_pr_trigger(
thread_id=thread_id, thread_id=thread_id,
tool_clients=tool_clients, tool_clients=tool_clients,
db_pool=db_pool, db_pool=db_pool,
repos_base_dir=settings.repos_base_dir,
graph_name="pr_completed",
default_jira_project=settings.default_jira_project,
) )
) )
request.app.state.background_tasks.add(task) request.app.state.background_tasks.add(task)

View File

@@ -63,6 +63,7 @@ class Settings(BaseSettings):
# Claude settings # Claude settings
claude_review_model: str = "claude-sonnet-4-20250514" claude_review_model: str = "claude-sonnet-4-20250514"
claude_cmd: str = "claude" # Path to Claude Code CLI (e.g., claude.cmd on Windows)
# Local repo settings # Local repo settings
repos_base_dir: str = "" # Base directory containing Billo repos (e.g., /c/Users/yaoji/git/Billo) repos_base_dir: str = "" # Base directory containing Billo repos (e.g., /c/Users/yaoji/git/Billo)

View File

@@ -8,6 +8,8 @@ External clients are accessed via config["configurable"]["clients"].
import logging import logging
from typing import Any from typing import Any
from langgraph.types import RunnableConfig
from release_agent.exceptions import ReleaseAgentError from release_agent.exceptions import ReleaseAgentError
from release_agent.graph.dependencies import ToolClients from release_agent.graph.dependencies import ToolClients
from release_agent.graph.polling import poll_until from release_agent.graph.polling import poll_until
@@ -21,11 +23,11 @@ _CI_POLL_INTERVAL = 30
_CI_POLL_MAX_WAIT = 1800 _CI_POLL_MAX_WAIT = 1800
def _get_clients(config: dict) -> ToolClients: def _get_clients(config: RunnableConfig) -> ToolClients:
return config["configurable"]["clients"] return config["configurable"]["clients"]
def _get_settings(config: dict): def _get_settings(config: RunnableConfig):
return config["configurable"].get("settings") return config["configurable"].get("settings")
@@ -33,7 +35,7 @@ def _get_settings(config: dict):
# Node: trigger_ci_build # Node: trigger_ci_build
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def trigger_ci_build(state: dict[str, Any], config: dict) -> dict: async def trigger_ci_build(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Trigger the CI build pipeline for the repository. """Trigger the CI build pipeline for the repository.
Finds the first available pipeline for the repo and triggers it on Finds the first available pipeline for the repo and triggers it on
@@ -85,7 +87,7 @@ async def trigger_ci_build(state: dict[str, Any], config: dict) -> dict:
# Node: poll_ci_build # Node: poll_ci_build
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def poll_ci_build(state: dict[str, Any], config: dict) -> dict: async def poll_ci_build(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Poll the CI build until completion or timeout. """Poll the CI build until completion or timeout.
Uses the polling utility with configurable interval and max wait. Uses the polling utility with configurable interval and max wait.
@@ -144,7 +146,7 @@ async def poll_ci_build(state: dict[str, Any], config: dict) -> dict:
# Node: notify_ci_result # Node: notify_ci_result
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def notify_ci_result(state: dict[str, Any], config: dict) -> dict: async def notify_ci_result(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Send a Slack notification with the CI build result. """Send a Slack notification with the CI build result.
Non-critical: errors are appended rather than re-raised. Non-critical: errors are appended rather than re-raised.

View File

@@ -9,7 +9,7 @@ from datetime import date
from typing import Any from typing import Any
from langgraph.graph import END, START, StateGraph from langgraph.graph import END, START, StateGraph
from langgraph.types import interrupt from langgraph.types import RunnableConfig, interrupt
from release_agent.branch_parser import parse_branch from release_agent.branch_parser import parse_branch
from release_agent.exceptions import ReleaseAgentError from release_agent.exceptions import ReleaseAgentError
@@ -28,11 +28,11 @@ from release_agent.versioning import calculate_next_version
# Helpers # Helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _get_clients(config: dict) -> ToolClients: def _get_clients(config: RunnableConfig) -> ToolClients:
return config["configurable"]["clients"] return config["configurable"]["clients"]
def _get_staging_store(config: dict): def _get_staging_store(config: RunnableConfig):
return config["configurable"].get("staging_store") return config["configurable"].get("staging_store")
@@ -40,7 +40,7 @@ def _get_staging_store(config: dict):
# Node: parse_webhook # Node: parse_webhook
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def parse_webhook(state: dict[str, Any], config: dict) -> dict: async def parse_webhook(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Parse the webhook_payload field and extract PR info and ticket ID. """Parse the webhook_payload field and extract PR info and ticket ID.
Returns a dict with pr_info, ticket_id, has_ticket, repo_name, pr_id. Returns a dict with pr_info, ticket_id, has_ticket, repo_name, pr_id.
@@ -80,7 +80,7 @@ async def parse_webhook(state: dict[str, Any], config: dict) -> dict:
# Node: fetch_pr_details # Node: fetch_pr_details
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def fetch_pr_details(state: dict[str, Any], config: dict) -> dict: async def fetch_pr_details(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Fetch full PR details from AzDo and check if already merged. """Fetch full PR details from AzDo and check if already merged.
Sets pr_already_merged, pr_diff, and last_merge_source_commit. Sets pr_already_merged, pr_diff, and last_merge_source_commit.
@@ -110,7 +110,7 @@ async def fetch_pr_details(state: dict[str, Any], config: dict) -> dict:
# Node: auto_create_ticket # Node: auto_create_ticket
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def auto_create_ticket(state: dict[str, Any], config: dict) -> dict: async def auto_create_ticket(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Auto-create a Jira ticket for a PR that has no existing ticket. """Auto-create a Jira ticket for a PR that has no existing ticket.
Uses ClaudeReviewer to generate ticket content from the PR diff, Uses ClaudeReviewer to generate ticket content from the PR diff,
@@ -163,7 +163,7 @@ async def auto_create_ticket(state: dict[str, Any], config: dict) -> dict:
# Node: move_jira_code_review # Node: move_jira_code_review
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def move_jira_code_review(state: dict[str, Any], config: dict) -> dict: async def move_jira_code_review(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Transition the Jira ticket to Code Review status. """Transition the Jira ticket to Code Review status.
Skipped if has_ticket is False. Non-critical: errors are appended. Skipped if has_ticket is False. Non-critical: errors are appended.
@@ -240,7 +240,7 @@ async def _post_review_to_pr(
# Node: run_code_review # Node: run_code_review
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def run_code_review(state: dict[str, Any], config: dict) -> dict: async def run_code_review(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Run Claude code review on the PR diff. """Run Claude code review on the PR diff.
Returns review_result as a serialisable dict. Returns review_result as a serialisable dict.
@@ -278,7 +278,7 @@ async def run_code_review(state: dict[str, Any], config: dict) -> dict:
# Node: evaluate_review # Node: evaluate_review
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def evaluate_review(state: dict[str, Any], config: dict) -> dict: async def evaluate_review(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Evaluate the review result and set review_approved. """Evaluate the review result and set review_approved.
Approved only when verdict=="approve" and no blockers present. Approved only when verdict=="approve" and no blockers present.
@@ -298,7 +298,7 @@ async def evaluate_review(state: dict[str, Any], config: dict) -> dict:
# Node: interrupt_confirm_merge # Node: interrupt_confirm_merge
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def interrupt_confirm_merge(state: dict[str, Any], config: dict) -> dict: async def interrupt_confirm_merge(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Interrupt to ask the operator to confirm the PR merge. """Interrupt to ask the operator to confirm the PR merge.
Passes a human-readable summary string to interrupt(). Passes a human-readable summary string to interrupt().
@@ -319,7 +319,7 @@ async def interrupt_confirm_merge(state: dict[str, Any], config: dict) -> dict:
# Node: merge_pr_node # Node: merge_pr_node
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def merge_pr_node(state: dict[str, Any], config: dict) -> dict: async def merge_pr_node(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Merge the PR via AzDo API. """Merge the PR via AzDo API.
Critical node — re-raises ReleaseAgentError. Critical node — re-raises ReleaseAgentError.
@@ -336,7 +336,7 @@ async def merge_pr_node(state: dict[str, Any], config: dict) -> dict:
# Node: move_jira_ready_for_stage # Node: move_jira_ready_for_stage
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def move_jira_ready_for_stage(state: dict[str, Any], config: dict) -> dict: async def move_jira_ready_for_stage(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Transition the Jira ticket to Ready for stage (2). """Transition the Jira ticket to Ready for stage (2).
Skipped if has_ticket is False. Non-critical: errors appended. Skipped if has_ticket is False. Non-critical: errors appended.
@@ -356,7 +356,7 @@ async def move_jira_ready_for_stage(state: dict[str, Any], config: dict) -> dict
# Node: add_jira_pr_link # Node: add_jira_pr_link
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def add_jira_pr_link(state: dict[str, Any], config: dict) -> dict: async def add_jira_pr_link(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Add the PR URL as a remote link on the Jira ticket. """Add the PR URL as a remote link on the Jira ticket.
Skipped if has_ticket is False. Non-critical: errors appended. Skipped if has_ticket is False. Non-critical: errors appended.
@@ -384,7 +384,7 @@ async def add_jira_pr_link(state: dict[str, Any], config: dict) -> dict:
# Node: calculate_version # Node: calculate_version
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def calculate_version(state: dict[str, Any], config: dict) -> dict: async def calculate_version(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Calculate the next version for the repository using the staging store. """Calculate the next version for the repository using the staging store.
Uses calculate_next_version from versioning module. Uses calculate_next_version from versioning module.
@@ -402,7 +402,7 @@ async def calculate_version(state: dict[str, Any], config: dict) -> dict:
# Node: update_staging # Node: update_staging
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def update_staging(state: dict[str, Any], config: dict) -> dict: async def update_staging(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Add the PR's ticket to the staging release. """Add the PR's ticket to the staging release.
If no staging exists, creates a new one. If has_ticket is False, skips If no staging exists, creates a new one. If has_ticket is False, skips
@@ -461,7 +461,7 @@ async def update_staging(state: dict[str, Any], config: dict) -> dict:
# Node: notify_request_changes # Node: notify_request_changes
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def notify_request_changes(state: dict[str, Any], config: dict) -> dict: async def notify_request_changes(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Send a Slack notification when the review requests changes. """Send a Slack notification when the review requests changes.
Non-critical: errors appended. Non-critical: errors appended.
@@ -485,7 +485,7 @@ async def notify_request_changes(state: dict[str, Any], config: dict) -> dict:
approval_url="", approval_url="",
) )
return {"messages": [f"Slack notified: request changes for PR #{pr_id}"]} return {"messages": [f"Slack notified: request changes for PR #{pr_id}"]}
except ReleaseAgentError as exc: except Exception as exc:
return {"errors": [f"notify_request_changes failed: {exc}"]} return {"errors": [f"notify_request_changes failed: {exc}"]}

View File

@@ -29,7 +29,7 @@ from datetime import date
from typing import Any from typing import Any
from langgraph.graph import END, START, StateGraph from langgraph.graph import END, START, StateGraph
from langgraph.types import interrupt from langgraph.types import RunnableConfig, interrupt
from release_agent.exceptions import ReleaseAgentError from release_agent.exceptions import ReleaseAgentError
from release_agent.graph.ci_nodes import poll_ci_build, trigger_ci_build from release_agent.graph.ci_nodes import poll_ci_build, trigger_ci_build
@@ -45,11 +45,11 @@ from release_agent.tools.slack import _build_ci_status_blocks
# Helpers # Helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _get_clients(config: dict) -> ToolClients: def _get_clients(config: RunnableConfig) -> ToolClients:
return config["configurable"]["clients"] return config["configurable"]["clients"]
def _get_staging_store(config: dict): def _get_staging_store(config: RunnableConfig):
return config["configurable"].get("staging_store") return config["configurable"].get("staging_store")
@@ -57,7 +57,7 @@ def _get_staging_store(config: dict):
# Node: load_staging # Node: load_staging
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def load_staging(state: dict[str, Any], config: dict) -> dict: async def load_staging(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Load the current staging release from the store.""" """Load the current staging release from the store."""
repo_name = state.get("repo_name", "") repo_name = state.get("repo_name", "")
staging_store = _get_staging_store(config) staging_store = _get_staging_store(config)
@@ -73,7 +73,7 @@ async def load_staging(state: dict[str, Any], config: dict) -> dict:
# Node: interrupt_confirm_release # Node: interrupt_confirm_release
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def interrupt_confirm_release(state: dict[str, Any], config: dict) -> dict: async def interrupt_confirm_release(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Interrupt to ask the operator to confirm starting the release.""" """Interrupt to ask the operator to confirm starting the release."""
repo_name = state.get("repo_name", "") repo_name = state.get("repo_name", "")
staging_dict = state.get("staging") or {} staging_dict = state.get("staging") or {}
@@ -93,7 +93,7 @@ async def interrupt_confirm_release(state: dict[str, Any], config: dict) -> dict
# Node: create_release_pr # Node: create_release_pr
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def create_release_pr(state: dict[str, Any], config: dict) -> dict: async def create_release_pr(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Create a release PR in AzDo from a release branch to main.""" """Create a release PR in AzDo from a release branch to main."""
clients = _get_clients(config) clients = _get_clients(config)
repo_name = state.get("repo_name", "") repo_name = state.get("repo_name", "")
@@ -129,7 +129,7 @@ async def create_release_pr(state: dict[str, Any], config: dict) -> dict:
# Node: interrupt_confirm_merge_release # Node: interrupt_confirm_merge_release
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def interrupt_confirm_merge_release(state: dict[str, Any], config: dict) -> dict: async def interrupt_confirm_merge_release(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Interrupt to ask the operator to confirm merging the release PR.""" """Interrupt to ask the operator to confirm merging the release PR."""
release_pr_id = state.get("release_pr_id", "?") release_pr_id = state.get("release_pr_id", "?")
version = state.get("version", "") version = state.get("version", "")
@@ -146,7 +146,7 @@ async def interrupt_confirm_merge_release(state: dict[str, Any], config: dict) -
# Node: merge_release_pr # Node: merge_release_pr
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def merge_release_pr(state: dict[str, Any], config: dict) -> dict: async def merge_release_pr(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Merge the release PR via AzDo API.""" """Merge the release PR via AzDo API."""
clients = _get_clients(config) clients = _get_clients(config)
pr_id = int(state.get("release_pr_id", 0)) pr_id = int(state.get("release_pr_id", 0))
@@ -159,7 +159,7 @@ async def merge_release_pr(state: dict[str, Any], config: dict) -> dict:
# Node: trigger_ci_build_main (delegates to ci_nodes.trigger_ci_build) # Node: trigger_ci_build_main (delegates to ci_nodes.trigger_ci_build)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def trigger_ci_build_main(state: dict[str, Any], config: dict) -> dict: async def trigger_ci_build_main(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Trigger CI build on main after the release PR is merged.""" """Trigger CI build on main after the release PR is merged."""
return await trigger_ci_build(state, config) return await trigger_ci_build(state, config)
@@ -168,7 +168,7 @@ async def trigger_ci_build_main(state: dict[str, Any], config: dict) -> dict:
# Node: poll_ci_build_main (delegates to ci_nodes.poll_ci_build) # Node: poll_ci_build_main (delegates to ci_nodes.poll_ci_build)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def poll_ci_build_main(state: dict[str, Any], config: dict) -> dict: async def poll_ci_build_main(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Poll the main branch CI build until completion.""" """Poll the main branch CI build until completion."""
return await poll_ci_build(state, config) return await poll_ci_build(state, config)
@@ -177,7 +177,7 @@ async def poll_ci_build_main(state: dict[str, Any], config: dict) -> dict:
# Node: notify_ci_failure # Node: notify_ci_failure
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def notify_ci_failure(state: dict[str, Any], config: dict) -> dict: async def notify_ci_failure(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Send a Slack notification that the CI build failed on main. """Send a Slack notification that the CI build failed on main.
Non-critical: errors appended. Non-critical: errors appended.
@@ -207,7 +207,7 @@ async def notify_ci_failure(state: dict[str, Any], config: dict) -> dict:
# Node: wait_for_cd_release # Node: wait_for_cd_release
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def wait_for_cd_release(state: dict[str, Any], config: dict) -> dict: async def wait_for_cd_release(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Wait for the CD pipeline to create a release after CI passes. """Wait for the CD pipeline to create a release after CI passes.
Fetches the latest release for the configured release definition. Fetches the latest release for the configured release definition.
@@ -235,7 +235,7 @@ async def wait_for_cd_release(state: dict[str, Any], config: dict) -> dict:
# Node: poll_release_approvals # Node: poll_release_approvals
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def poll_release_approvals(state: dict[str, Any], config: dict) -> dict: async def poll_release_approvals(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Poll AzDo for pending release environment approvals. """Poll AzDo for pending release environment approvals.
Non-critical: errors appended on failure. Non-critical: errors appended on failure.
@@ -267,7 +267,7 @@ async def poll_release_approvals(state: dict[str, Any], config: dict) -> dict:
# Node: interrupt_sandbox_approval # Node: interrupt_sandbox_approval
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def interrupt_sandbox_approval(state: dict[str, Any], config: dict) -> dict: async def interrupt_sandbox_approval(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Interrupt to ask the operator to approve the sandbox deployment.""" """Interrupt to ask the operator to approve the sandbox deployment."""
approvals = state.get("pending_approvals") or [] approvals = state.get("pending_approvals") or []
version = state.get("version", "") version = state.get("version", "")
@@ -285,7 +285,7 @@ async def interrupt_sandbox_approval(state: dict[str, Any], config: dict) -> dic
# Node: execute_sandbox_approval # Node: execute_sandbox_approval
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def execute_sandbox_approval(state: dict[str, Any], config: dict) -> dict: async def execute_sandbox_approval(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Approve all pending sandbox stage approvals via AzDo VSRM. """Approve all pending sandbox stage approvals via AzDo VSRM.
Non-critical per approval: errors appended on individual failures. Non-critical per approval: errors appended on individual failures.
@@ -313,7 +313,7 @@ async def execute_sandbox_approval(state: dict[str, Any], config: dict) -> dict:
# Node: interrupt_prod_approval # Node: interrupt_prod_approval
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def interrupt_prod_approval(state: dict[str, Any], config: dict) -> dict: async def interrupt_prod_approval(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Interrupt to ask the operator to approve the production deployment.""" """Interrupt to ask the operator to approve the production deployment."""
approvals = state.get("pending_approvals") or [] approvals = state.get("pending_approvals") or []
version = state.get("version", "") version = state.get("version", "")
@@ -331,7 +331,7 @@ async def interrupt_prod_approval(state: dict[str, Any], config: dict) -> dict:
# Node: execute_prod_approval # Node: execute_prod_approval
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def execute_prod_approval(state: dict[str, Any], config: dict) -> dict: async def execute_prod_approval(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Approve all pending production stage approvals via AzDo VSRM. """Approve all pending production stage approvals via AzDo VSRM.
Non-critical per approval: errors appended on individual failures. Non-critical per approval: errors appended on individual failures.
@@ -359,7 +359,7 @@ async def execute_prod_approval(state: dict[str, Any], config: dict) -> dict:
# Node: move_tickets_to_done # Node: move_tickets_to_done
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def move_tickets_to_done(state: dict[str, Any], config: dict) -> dict: async def move_tickets_to_done(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Transition all staging tickets to Done/Released in Jira.""" """Transition all staging tickets to Done/Released in Jira."""
clients = _get_clients(config) clients = _get_clients(config)
staging_dict = state.get("staging") or {} staging_dict = state.get("staging") or {}
@@ -382,7 +382,7 @@ async def move_tickets_to_done(state: dict[str, Any], config: dict) -> dict:
# Node: send_slack_notification (send_release_notification) # Node: send_slack_notification (send_release_notification)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def send_slack_notification(state: dict[str, Any], config: dict) -> dict: async def send_slack_notification(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Send a release notification to Slack.""" """Send a release notification to Slack."""
clients = _get_clients(config) clients = _get_clients(config)
repo_name = state.get("repo_name", "") repo_name = state.get("repo_name", "")
@@ -407,7 +407,7 @@ async def send_slack_notification(state: dict[str, Any], config: dict) -> dict:
# Node: archive_release # Node: archive_release
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def archive_release(state: dict[str, Any], config: dict) -> dict: async def archive_release(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Archive the staging release in the store.""" """Archive the staging release in the store."""
staging_store = _get_staging_store(config) staging_store = _get_staging_store(config)
staging_dict = state.get("staging") or {} staging_dict = state.get("staging") or {}
@@ -422,7 +422,7 @@ async def archive_release(state: dict[str, Any], config: dict) -> dict:
# Legacy nodes kept for backward compatibility # Legacy nodes kept for backward compatibility
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def list_pipelines(state: dict[str, Any], config: dict) -> dict: async def list_pipelines(state: dict[str, Any], config: RunnableConfig) -> dict:
"""List build pipelines for the repository via AzDo.""" """List build pipelines for the repository via AzDo."""
clients = _get_clients(config) clients = _get_clients(config)
repo_name = state.get("repo_name", "") repo_name = state.get("repo_name", "")
@@ -433,7 +433,7 @@ async def list_pipelines(state: dict[str, Any], config: dict) -> dict:
return {"errors": [f"list_pipelines failed: {exc}"], "pipelines": []} return {"errors": [f"list_pipelines failed: {exc}"], "pipelines": []}
async def interrupt_confirm_trigger(state: dict[str, Any], config: dict) -> dict: async def interrupt_confirm_trigger(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Interrupt to ask the operator to confirm triggering pipelines.""" """Interrupt to ask the operator to confirm triggering pipelines."""
repo_name = state.get("repo_name", "") repo_name = state.get("repo_name", "")
version = state.get("version", "") version = state.get("version", "")
@@ -448,7 +448,7 @@ async def interrupt_confirm_trigger(state: dict[str, Any], config: dict) -> dict
return {} return {}
async def trigger_pipelines(state: dict[str, Any], config: dict) -> dict: async def trigger_pipelines(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Trigger all listed pipelines for the release branch.""" """Trigger all listed pipelines for the release branch."""
clients = _get_clients(config) clients = _get_clients(config)
repo_name = state.get("repo_name", "") repo_name = state.get("repo_name", "")
@@ -475,7 +475,7 @@ async def trigger_pipelines(state: dict[str, Any], config: dict) -> dict:
return result_dict return result_dict
async def check_release_approvals(state: dict[str, Any], config: dict) -> dict: async def check_release_approvals(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Check triggered builds for pending stage approvals.""" """Check triggered builds for pending stage approvals."""
clients = _get_clients(config) clients = _get_clients(config)
triggered_builds = state.get("triggered_builds") or [] triggered_builds = state.get("triggered_builds") or []
@@ -495,7 +495,7 @@ async def check_release_approvals(state: dict[str, Any], config: dict) -> dict:
return result_dict return result_dict
async def interrupt_confirm_approve(state: dict[str, Any], config: dict) -> dict: async def interrupt_confirm_approve(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Interrupt to ask the operator to confirm approving pipeline stages.""" """Interrupt to ask the operator to confirm approving pipeline stages."""
approvals = state.get("pending_approvals") or [] approvals = state.get("pending_approvals") or []
version = state.get("version", "") version = state.get("version", "")
@@ -509,7 +509,7 @@ async def interrupt_confirm_approve(state: dict[str, Any], config: dict) -> dict
return {} return {}
async def approve_stage(state: dict[str, Any], config: dict) -> dict: async def approve_stage(state: dict[str, Any], config: RunnableConfig) -> dict:
"""Approve all pending pipeline stage approvals via AzDo VSRM.""" """Approve all pending pipeline stage approvals via AzDo VSRM."""
clients = _get_clients(config) clients = _get_clients(config)
approvals = state.get("pending_approvals") or [] approvals = state.get("pending_approvals") or []

View File

@@ -73,7 +73,7 @@ def _create_tool_clients(settings: Settings) -> tuple[ToolClients, list]:
bot_token=settings.slack_bot_token.get_secret_value(), bot_token=settings.slack_bot_token.get_secret_value(),
channel_id=settings.slack_channel_id, channel_id=settings.slack_channel_id,
) )
reviewer = ClaudeReviewer() reviewer = ClaudeReviewer(claude_cmd=settings.claude_cmd)
clients = ToolClients(azdo=azdo, jira=jira, slack=slack, reviewer=reviewer) clients = ToolClients(azdo=azdo, jira=jira, slack=slack, reviewer=reviewer)
return clients, [http_client, vsrm_http_client] return clients, [http_client, vsrm_http_client]

View File

@@ -40,7 +40,7 @@ def _synthesize_webhook_payload(pr: PRInfo) -> dict:
"repository": { "repository": {
"id": f"{pr.repo_name}-id", "id": f"{pr.repo_name}-id",
"name": pr.repo_name, "name": pr.repo_name,
"web_url": str(pr.pr_url).rsplit("/pullrequest/", 1)[0], "web_url": f"https://dev.azure.com/billodev/Billo%20App%20Platform/_git/{pr.repo_name}",
}, },
}, },
} }

View File

@@ -40,8 +40,9 @@ class AzDoClient:
http_client: httpx.AsyncClient, http_client: httpx.AsyncClient,
vsrm_http_client: httpx.AsyncClient, vsrm_http_client: httpx.AsyncClient,
) -> None: ) -> None:
self._base_url = base_url.rstrip("/") # URL-encode spaces in project names (e.g., "Billo App Platform" → "Billo%20App%20Platform")
self._vsrm_base_url = vsrm_base_url.rstrip("/") self._base_url = base_url.rstrip("/").replace(" ", "%20")
self._vsrm_base_url = vsrm_base_url.rstrip("/").replace(" ", "%20")
self._auth = build_auth_header("", pat) self._auth = build_auth_header("", pat)
self._http = http_client self._http = http_client
self._vsrm = vsrm_http_client self._vsrm = vsrm_http_client
@@ -133,14 +134,25 @@ class AzDoClient:
""" """
pr = await self.get_pr(pr_id) pr = await self.get_pr(pr_id)
repo = pr.repo_name repo = pr.repo_name
url = f"{self._base_url}/git/repositories/{repo}/pullRequests/{pr_id}/diffs"
response = await self._http.get( # Get iterations to find changed files
url, iter_url = f"{self._base_url}/git/repositories/{repo}/pullRequests/{pr_id}/iterations"
headers=self._auth, iter_resp = await self._http.get(
params={"api-version": _API_VERSION, "baseVersionDescriptor.versionType": "commit"}, iter_url, headers=self._auth, params={"api-version": _API_VERSION},
) )
raise_for_status(response, service="azdo") raise_for_status(iter_resp, service="azdo")
data = response.json() iterations = iter_resp.json().get("value", [])
if not iterations:
return "No iterations found"
# Get changes from the latest iteration
last_iter = iterations[-1]["id"]
changes_url = f"{self._base_url}/git/repositories/{repo}/pullRequests/{pr_id}/iterations/{last_iter}/changes"
changes_resp = await self._http.get(
changes_url, headers=self._auth, params={"api-version": _API_VERSION},
)
raise_for_status(changes_resp, service="azdo")
data = changes_resp.json()
return _format_diff(data) return _format_diff(data)
async def merge_pr(self, *, pr_id: int, last_merge_source_commit: str) -> bool: async def merge_pr(self, *, pr_id: int, last_merge_source_commit: str) -> bool:
@@ -481,10 +493,14 @@ def _parse_pr(data: dict) -> PRInfo:
pr_id = str(data["pullRequestId"]) pr_id = str(data["pullRequestId"])
repo = data["repository"]["name"] repo = data["repository"]["name"]
branch = data.get("sourceRefName", "") branch = data.get("sourceRefName", "")
url = data.get("url", "") # Prefer _links.web.href for a proper web URL; fall back to constructing one
url = (data.get("_links", {}).get("web", {}).get("href", "")
or data["repository"].get("webUrl", "")
or data["repository"].get("remoteUrl", ""))
if url and "/pullrequest" not in url.lower():
url = f"{url}/pullrequest/{pr_id}"
if not url: if not url:
repo_url = data["repository"].get("remoteUrl", "") url = data.get("url", f"https://dev.azure.com/_unknown/pullrequest/{pr_id}")
url = f"{repo_url}/pullrequest/{pr_id}"
return PRInfo( return PRInfo(
pr_id=pr_id, pr_id=pr_id,
pr_url=url, pr_url=url,
@@ -502,8 +518,8 @@ def _map_pr_status(raw: str) -> str:
def _format_diff(data: dict) -> str: def _format_diff(data: dict) -> str:
"""Format the diffs API response as a text string.""" """Format the iteration changes API response as a text string."""
changes = data.get("changes", []) changes = data.get("changeEntries", data.get("changes", []))
lines = [] lines = []
for change in changes: for change in changes:
item = change.get("item", {}) item = change.get("item", {})

View File

@@ -236,7 +236,7 @@ def _build_prompt(*, diff: str, pr_title: str, repo_name: str) -> str:
def _parse_cli_output(stdout: str) -> ReviewResult: def _parse_cli_output(stdout: str) -> ReviewResult:
"""Parse the JSON output from Claude Code CLI into a ReviewResult.""" """Parse the JSON output from Claude Code CLI into a ReviewResult."""
try: try:
data = json.loads(stdout) data = json.loads(stdout.strip())
except json.JSONDecodeError as exc: except json.JSONDecodeError as exc:
raise ValueError(f"Failed to parse Claude CLI output as JSON: {exc}") from exc raise ValueError(f"Failed to parse Claude CLI output as JSON: {exc}") from exc
@@ -281,7 +281,7 @@ def _parse_cli_output(stdout: str) -> ReviewResult:
def _parse_ticket_output(stdout: str) -> tuple[str, str]: def _parse_ticket_output(stdout: str) -> tuple[str, str]:
"""Parse the JSON output from Claude Code CLI into (summary, description).""" """Parse the JSON output from Claude Code CLI into (summary, description)."""
try: try:
data = json.loads(stdout) data = json.loads(stdout.strip())
except json.JSONDecodeError as exc: except json.JSONDecodeError as exc:
raise ValueError(f"Failed to parse Claude CLI ticket output as JSON: {exc}") from exc raise ValueError(f"Failed to parse Claude CLI ticket output as JSON: {exc}") from exc
@@ -312,24 +312,29 @@ async def _default_run_subprocess(
cwd: str | None, cwd: str | None,
timeout: int, timeout: int,
) -> tuple[str, str, int]: ) -> tuple[str, str, int]:
"""Run a subprocess and return (stdout, stderr, returncode).""" """Run a subprocess and return (stdout, stderr, returncode).
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
process.communicate(), timeout=timeout
)
except asyncio.TimeoutError:
process.kill()
await process.wait()
raise RuntimeError(f"Claude CLI timed out after {timeout} seconds")
return ( Uses subprocess.run in a thread pool to avoid Windows SelectorEventLoop
stdout_bytes.decode("utf-8", errors="replace"), incompatibility with asyncio.create_subprocess_exec.
stderr_bytes.decode("utf-8", errors="replace"), """
process.returncode or 0, import subprocess
)
def _run() -> tuple[str, str, int]:
try:
print(f"[DEBUG] Claude CLI: running {cmd[0]} (cwd={cwd}, timeout={timeout})", flush=True)
result = subprocess.run(
cmd,
capture_output=True,
text=True,
cwd=cwd,
timeout=timeout,
)
print(f"[DEBUG] Claude CLI: rc={result.returncode}, stdout_len={len(result.stdout)}, stderr_len={len(result.stderr)}", flush=True)
if not result.stdout:
print(f"[DEBUG] Claude CLI EMPTY stdout. stderr: {result.stderr[:500]}", flush=True)
return (result.stdout, result.stderr, result.returncode)
except subprocess.TimeoutExpired:
raise RuntimeError(f"Claude CLI timed out after {timeout} seconds")
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _run)

View File

@@ -229,6 +229,8 @@ class SlackClient:
async def _post_blocks(self, blocks: list[dict]) -> bool: async def _post_blocks(self, blocks: list[dict]) -> bool:
"""POST a Block Kit payload to the webhook URL.""" """POST a Block Kit payload to the webhook URL."""
if not self._webhook_url:
return False
response = await self._http.post( response = await self._http.post(
self._webhook_url, self._webhook_url,
json={"blocks": blocks}, json={"blocks": blocks},