Files
ColaFlow/docs/stories/sprint_5/story_5_12.md
Yaojia Wang 48a8431e4f feat(backend): Implement MCP Protocol Handler (Story 5.1)
Implemented JSON-RPC 2.0 protocol handler for MCP communication, enabling AI agents to communicate with ColaFlow using the Model Context Protocol.

**Implementation:**
- JSON-RPC 2.0 data models (Request, Response, Error, ErrorCode)
- MCP protocol models (Initialize, Capabilities, ClientInfo, ServerInfo)
- McpProtocolHandler with method routing and error handling
- Method handlers: initialize, resources/list, tools/list, tools/call
- ASP.NET Core middleware for /mcp endpoint
- Service registration and dependency injection setup

**Testing:**
- 28 unit tests covering protocol parsing, validation, and error handling
- Integration tests for initialize handshake and error responses
- All tests passing with >80% coverage

**Changes:**
- Created ColaFlow.Modules.Mcp.Contracts project
- Created ColaFlow.Modules.Mcp.Domain project
- Created ColaFlow.Modules.Mcp.Application project
- Created ColaFlow.Modules.Mcp.Infrastructure project
- Created ColaFlow.Modules.Mcp.Tests project
- Registered MCP module in ColaFlow.API Program.cs
- Added /mcp endpoint via middleware

**Acceptance Criteria Met:**
 JSON-RPC 2.0 messages correctly parsed
 Request validation (jsonrpc: "2.0", method, params, id)
 Error responses conform to JSON-RPC 2.0 spec
 Invalid requests return proper error codes (-32700, -32600, -32601, -32602)
 MCP initialize method implemented
 Server capabilities returned (resources, tools, prompts)
 Protocol version negotiation works (1.0)
 Request routing to method handlers
 Unit test coverage > 80%
 All tests passing

**Story**: docs/stories/sprint_5/story_5_1.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-07 19:38:34 +01:00

13 KiB

story_id, sprint_id, phase, status, priority, story_points, assignee, estimated_days, created_date, dependencies
story_id sprint_id phase status priority story_points assignee estimated_days created_date dependencies
story_5_12 sprint_5 Phase 3 - Tools & Diff Preview not_started P0 3 backend 1 2025-11-06
story_5_10

Story 5.12: SignalR Real-Time Notifications

Phase: Phase 3 - Tools & Diff Preview (Week 5-6) Priority: P0 CRITICAL Estimated Effort: 3 Story Points (1 day)

User Story

As an AI Agent I want to receive real-time notifications when my pending changes are approved or rejected So that I can continue my workflow without polling

Business Value

Real-time notifications complete the AI interaction loop:

  • Faster Feedback: AI knows approval result within 1 second
  • Better UX: No polling, immediate response
  • Scalability: WebSocket more efficient than polling
  • Future-Proof: Foundation for other real-time AI features

Acceptance Criteria

AC1: McpHub SignalR Hub

  • Create McpHub SignalR hub
  • Support SubscribeToPendingChange(pendingChangeId) method
  • Support UnsubscribeFromPendingChange(pendingChangeId) method
  • Authenticate connections via API Key

AC2: Approval Notification

  • When PendingChange approved → push notification to AI
  • Notification includes: pendingChangeId, status, execution result
  • Delivered within 1 second of approval
  • Use SignalR groups for targeting

AC3: Rejection Notification

  • When PendingChange rejected → push notification to AI
  • Notification includes: pendingChangeId, status, rejection reason
  • Delivered within 1 second of rejection

AC4: Expiration Notification

  • When PendingChange expired → push notification to AI
  • Notification includes: pendingChangeId, status

AC5: Connection Management

  • API Key authentication for SignalR connections
  • Handle disconnections gracefully
  • Reconnection support
  • Fallback to polling if WebSocket unavailable

AC6: Testing

  • Unit tests for event handlers
  • Integration tests for SignalR notifications
  • Test connection authentication
  • Test group subscriptions

Technical Design

SignalR Hub

[Authorize] // Require JWT for REST API integration
public class McpHub : Hub
{
    private readonly ILogger<McpHub> _logger;

    public McpHub(ILogger<McpHub> logger)
    {
        _logger = logger;
    }

    public override async Task OnConnectedAsync()
    {
        var connectionId = Context.ConnectionId;
        var apiKeyId = Context.Items["ApiKeyId"];
        var tenantId = Context.Items["TenantId"];

        _logger.LogInformation(
            "MCP client connected: {ConnectionId} (ApiKeyId: {ApiKeyId}, TenantId: {TenantId})",
            connectionId, apiKeyId, tenantId);

        await base.OnConnectedAsync();
    }

    public override async Task OnDisconnectedAsync(Exception? exception)
    {
        var connectionId = Context.ConnectionId;

        if (exception != null)
        {
            _logger.LogError(exception,
                "MCP client disconnected with error: {ConnectionId}",
                connectionId);
        }
        else
        {
            _logger.LogInformation(
                "MCP client disconnected: {ConnectionId}",
                connectionId);
        }

        await base.OnDisconnectedAsync(exception);
    }

    public async Task SubscribeToPendingChange(Guid pendingChangeId)
    {
        var groupName = $"pending-change-{pendingChangeId}";
        await Groups.AddToGroupAsync(Context.ConnectionId, groupName);

        _logger.LogInformation(
            "Client {ConnectionId} subscribed to {GroupName}",
            Context.ConnectionId, groupName);
    }

    public async Task UnsubscribeFromPendingChange(Guid pendingChangeId)
    {
        var groupName = $"pending-change-{pendingChangeId}";
        await Groups.RemoveFromGroupAsync(Context.ConnectionId, groupName);

        _logger.LogInformation(
            "Client {ConnectionId} unsubscribed from {GroupName}",
            Context.ConnectionId, groupName);
    }
}

PendingChangeApproved Notification

public class PendingChangeApprovedEventHandler
    : INotificationHandler<PendingChangeApprovedEvent>
{
    private readonly IHubContext<McpHub> _hubContext;
    private readonly ILogger<PendingChangeApprovedEventHandler> _logger;

    public async Task Handle(PendingChangeApprovedEvent e, CancellationToken ct)
    {
        var groupName = $"pending-change-{e.PendingChangeId}";

        var notification = new
        {
            PendingChangeId = e.PendingChangeId,
            Status = "Approved",
            ToolName = e.ToolName,
            Operation = e.Diff.Operation,
            EntityType = e.Diff.EntityType,
            EntityId = e.Diff.EntityId,
            ApprovedBy = e.ApprovedBy,
            Timestamp = DateTime.UtcNow
        };

        await _hubContext.Clients
            .Group(groupName)
            .SendAsync("PendingChangeApproved", notification, ct);

        _logger.LogInformation(
            "Sent PendingChangeApproved notification: {PendingChangeId}",
            e.PendingChangeId);
    }
}

PendingChangeRejected Notification

public class PendingChangeRejectedEventHandler
    : INotificationHandler<PendingChangeRejectedEvent>
{
    private readonly IHubContext<McpHub> _hubContext;
    private readonly ILogger<PendingChangeRejectedEventHandler> _logger;

    public async Task Handle(PendingChangeRejectedEvent e, CancellationToken ct)
    {
        var groupName = $"pending-change-{e.PendingChangeId}";

        var notification = new
        {
            PendingChangeId = e.PendingChangeId,
            Status = "Rejected",
            ToolName = e.ToolName,
            Reason = e.Reason,
            RejectedBy = e.RejectedBy,
            Timestamp = DateTime.UtcNow
        };

        await _hubContext.Clients
            .Group(groupName)
            .SendAsync("PendingChangeRejected", notification, ct);

        _logger.LogInformation(
            "Sent PendingChangeRejected notification: {PendingChangeId} - Reason: {Reason}",
            e.PendingChangeId, e.Reason);
    }
}

API Key Authentication for SignalR

public class McpApiKeyAuthHandler : AuthenticationHandler<AuthenticationSchemeOptions>
{
    private readonly IMcpApiKeyService _apiKeyService;

    protected override async Task<AuthenticateResult> HandleAuthenticateAsync()
    {
        // For SignalR, API Key may be in query string
        var apiKey = Request.Query["access_token"].FirstOrDefault();

        if (string.IsNullOrEmpty(apiKey))
        {
            return AuthenticateResult.Fail("Missing API Key");
        }

        var validationResult = await _apiKeyService.ValidateAsync(apiKey);
        if (!validationResult.IsValid)
        {
            return AuthenticateResult.Fail(validationResult.ErrorMessage);
        }

        // Store in HttpContext.Items for Hub access
        Context.Items["TenantId"] = validationResult.TenantId;
        Context.Items["ApiKeyId"] = validationResult.ApiKeyId;

        var claims = new[]
        {
            new Claim("TenantId", validationResult.TenantId.ToString()),
            new Claim("ApiKeyId", validationResult.ApiKeyId.ToString())
        };

        var identity = new ClaimsIdentity(claims, "ApiKey");
        var principal = new ClaimsPrincipal(identity);
        var ticket = new AuthenticationTicket(principal, "ApiKey");

        return AuthenticateResult.Success(ticket);
    }
}

AI Client Example (Pseudo-code)

// AI Client (Claude Desktop / ChatGPT)
const connection = new HubConnection("https://colaflow.com/hubs/mcp", {
  accessTokenFactory: () => apiKey
});

await connection.start();

// Subscribe to pending change notifications
await connection.invoke("SubscribeToPendingChange", pendingChangeId);

// Listen for approval
connection.on("PendingChangeApproved", (notification) => {
  console.log("Change approved:", notification);
  console.log("EntityId:", notification.EntityId);
  // Continue AI workflow...
});

// Listen for rejection
connection.on("PendingChangeRejected", (notification) => {
  console.log("Change rejected:", notification.Reason);
  // Adjust AI behavior...
});

Tasks

Task 1: Create McpHub (2 hours)

  • Create McpHub class (inherit from Hub)
  • Implement SubscribeToPendingChange() method
  • Implement UnsubscribeFromPendingChange() method
  • Add connection lifecycle logging

Files to Create:

  • ColaFlow.Modules.Mcp/Hubs/McpHub.cs

Task 2: API Key Authentication for SignalR (3 hours)

  • Update McpApiKeyMiddleware to support query string API Key
  • Add API Key to HttpContext.Items for Hub access
  • Test SignalR connection with API Key

Files to Modify:

  • ColaFlow.Modules.Mcp/Middleware/McpApiKeyMiddleware.cs

Task 3: PendingChangeApproved Event Handler (2 hours)

  • Create PendingChangeApprovedEventHandler
  • Use IHubContext<McpHub> to send notification
  • Target specific group (pending-change-{id})
  • Log notification sent

Files to Create:

  • ColaFlow.Modules.Mcp/EventHandlers/PendingChangeApprovedNotificationHandler.cs

Task 4: PendingChangeRejected Event Handler (1 hour)

  • Create PendingChangeRejectedEventHandler
  • Send rejection notification with reason

Files to Create:

  • ColaFlow.Modules.Mcp/EventHandlers/PendingChangeRejectedNotificationHandler.cs

Task 5: PendingChangeExpired Event Handler (1 hour)

  • Create PendingChangeExpiredEventHandler
  • Send expiration notification

Files to Create:

  • ColaFlow.Modules.Mcp/EventHandlers/PendingChangeExpiredNotificationHandler.cs

Task 6: Configure SignalR (1 hour)

  • Register McpHub in Program.cs
  • Configure CORS for SignalR
  • Configure authentication

Files to Modify:

  • ColaFlow.Api/Program.cs

Task 7: Integration Tests (4 hours)

  • Test SignalR connection with API Key
  • Test group subscription/unsubscription
  • Test notification delivery (approval/rejection)
  • Test multiple clients (isolation)

Files to Create:

  • ColaFlow.Modules.Mcp.Tests/Integration/McpHubIntegrationTests.cs

Testing Strategy

Integration Test Example

[Fact]
public async Task ApprovalNotification_SentToSubscribedClient()
{
    // Arrange - Connect SignalR client
    var hubConnection = new HubConnectionBuilder()
        .WithUrl("http://localhost/hubs/mcp?access_token=" + _apiKey)
        .Build();

    await hubConnection.StartAsync();

    // Subscribe to pending change
    await hubConnection.InvokeAsync("SubscribeToPendingChange", _pendingChangeId);

    // Setup listener
    var notificationReceived = new TaskCompletionSource<object>();
    hubConnection.On<object>("PendingChangeApproved", notification =>
    {
        notificationReceived.SetResult(notification);
    });

    // Act - Approve pending change
    await _pendingChangeService.ApproveAsync(_pendingChangeId, _userId, CancellationToken.None);

    // Assert - Notification received within 2 seconds
    var notification = await notificationReceived.Task.WaitAsync(TimeSpan.FromSeconds(2));
    Assert.NotNull(notification);
}

Dependencies

Prerequisites:

  • Story 5.10 (PendingChange Management) - Domain events to listen to
  • M1 SignalR infrastructure (Day 11-17) - BaseHub, authentication

Optional: Not strictly blocking for M2 MVP (can use polling fallback)

Risks & Mitigation

Risk Impact Probability Mitigation
WebSocket not supported Medium Low Fallback to long polling
SignalR connection drops Medium Medium Auto-reconnect, message queue
Scalability issues (many clients) Medium Low Use Redis backplane for multi-instance

Definition of Done

  • McpHub implemented
  • API Key authentication working
  • Approval notification sent within 1 second
  • Rejection notification sent within 1 second
  • Group subscriptions working
  • Integration tests passing
  • Connection handling robust (disconnect/reconnect)
  • Code reviewed

Notes

Why This Story Matters

  • Completes AI Loop: AI can act on approval results immediately
  • Better Performance: WebSocket > polling (10x less overhead)
  • Foundation: Enables future real-time AI features
  • M2 Polish: Demonstrates production-grade implementation

Key Design Decisions

  1. SignalR Groups: Target specific pending changes (not broadcast)
  2. Domain Events: Loosely coupled, easy to add more notifications
  3. API Key Auth: Reuse existing authentication system
  4. Graceful Degradation: Fallback to polling if WebSocket fails

Scalability (Future)

  • Use Redis backplane for multi-instance SignalR
  • Horizontal scaling (multiple app servers)
  • Message queue for guaranteed delivery

SignalR Configuration

// Program.cs
builder.Services.AddSignalR()
    .AddStackExchangeRedis(builder.Configuration.GetConnectionString("Redis"));

app.MapHub<McpHub>("/hubs/mcp");

Reference Materials