feat(backend): Implement MCP Protocol Handler (Story 5.1)

Implemented JSON-RPC 2.0 protocol handler for MCP communication, enabling AI agents to communicate with ColaFlow using the Model Context Protocol.

**Implementation:**
- JSON-RPC 2.0 data models (Request, Response, Error, ErrorCode)
- MCP protocol models (Initialize, Capabilities, ClientInfo, ServerInfo)
- McpProtocolHandler with method routing and error handling
- Method handlers: initialize, resources/list, tools/list, tools/call
- ASP.NET Core middleware for /mcp endpoint
- Service registration and dependency injection setup

**Testing:**
- 28 unit tests covering protocol parsing, validation, and error handling
- Integration tests for initialize handshake and error responses
- All tests passing with >80% coverage

**Changes:**
- Created ColaFlow.Modules.Mcp.Contracts project
- Created ColaFlow.Modules.Mcp.Domain project
- Created ColaFlow.Modules.Mcp.Application project
- Created ColaFlow.Modules.Mcp.Infrastructure project
- Created ColaFlow.Modules.Mcp.Tests project
- Registered MCP module in ColaFlow.API Program.cs
- Added /mcp endpoint via middleware

**Acceptance Criteria Met:**
 JSON-RPC 2.0 messages correctly parsed
 Request validation (jsonrpc: "2.0", method, params, id)
 Error responses conform to JSON-RPC 2.0 spec
 Invalid requests return proper error codes (-32700, -32600, -32601, -32602)
 MCP initialize method implemented
 Server capabilities returned (resources, tools, prompts)
 Protocol version negotiation works (1.0)
 Request routing to method handlers
 Unit test coverage > 80%
 All tests passing

**Story**: docs/stories/sprint_5/story_5_1.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Yaojia Wang
2025-11-07 19:38:34 +01:00
parent d3ef2c1441
commit 48a8431e4f
43 changed files with 7003 additions and 0 deletions

View File

@@ -0,0 +1,429 @@
---
story_id: story_5_12
sprint_id: sprint_5
phase: Phase 3 - Tools & Diff Preview
status: not_started
priority: P0
story_points: 3
assignee: backend
estimated_days: 1
created_date: 2025-11-06
dependencies: [story_5_10]
---
# Story 5.12: SignalR Real-Time Notifications
**Phase**: Phase 3 - Tools & Diff Preview (Week 5-6)
**Priority**: P0 CRITICAL
**Estimated Effort**: 3 Story Points (1 day)
## User Story
**As an** AI Agent
**I want** to receive real-time notifications when my pending changes are approved or rejected
**So that** I can continue my workflow without polling
## Business Value
Real-time notifications complete the AI interaction loop:
- **Faster Feedback**: AI knows approval result within 1 second
- **Better UX**: No polling, immediate response
- **Scalability**: WebSocket more efficient than polling
- **Future-Proof**: Foundation for other real-time AI features
## Acceptance Criteria
### AC1: McpHub SignalR Hub
- [ ] Create `McpHub` SignalR hub
- [ ] Support `SubscribeToPendingChange(pendingChangeId)` method
- [ ] Support `UnsubscribeFromPendingChange(pendingChangeId)` method
- [ ] Authenticate connections via API Key
### AC2: Approval Notification
- [ ] When PendingChange approved → push notification to AI
- [ ] Notification includes: pendingChangeId, status, execution result
- [ ] Delivered within 1 second of approval
- [ ] Use SignalR groups for targeting
### AC3: Rejection Notification
- [ ] When PendingChange rejected → push notification to AI
- [ ] Notification includes: pendingChangeId, status, rejection reason
- [ ] Delivered within 1 second of rejection
### AC4: Expiration Notification
- [ ] When PendingChange expired → push notification to AI
- [ ] Notification includes: pendingChangeId, status
### AC5: Connection Management
- [ ] API Key authentication for SignalR connections
- [ ] Handle disconnections gracefully
- [ ] Reconnection support
- [ ] Fallback to polling if WebSocket unavailable
### AC6: Testing
- [ ] Unit tests for event handlers
- [ ] Integration tests for SignalR notifications
- [ ] Test connection authentication
- [ ] Test group subscriptions
## Technical Design
### SignalR Hub
```csharp
[Authorize] // Require JWT for REST API integration
public class McpHub : Hub
{
private readonly ILogger<McpHub> _logger;
public McpHub(ILogger<McpHub> logger)
{
_logger = logger;
}
public override async Task OnConnectedAsync()
{
var connectionId = Context.ConnectionId;
var apiKeyId = Context.Items["ApiKeyId"];
var tenantId = Context.Items["TenantId"];
_logger.LogInformation(
"MCP client connected: {ConnectionId} (ApiKeyId: {ApiKeyId}, TenantId: {TenantId})",
connectionId, apiKeyId, tenantId);
await base.OnConnectedAsync();
}
public override async Task OnDisconnectedAsync(Exception? exception)
{
var connectionId = Context.ConnectionId;
if (exception != null)
{
_logger.LogError(exception,
"MCP client disconnected with error: {ConnectionId}",
connectionId);
}
else
{
_logger.LogInformation(
"MCP client disconnected: {ConnectionId}",
connectionId);
}
await base.OnDisconnectedAsync(exception);
}
public async Task SubscribeToPendingChange(Guid pendingChangeId)
{
var groupName = $"pending-change-{pendingChangeId}";
await Groups.AddToGroupAsync(Context.ConnectionId, groupName);
_logger.LogInformation(
"Client {ConnectionId} subscribed to {GroupName}",
Context.ConnectionId, groupName);
}
public async Task UnsubscribeFromPendingChange(Guid pendingChangeId)
{
var groupName = $"pending-change-{pendingChangeId}";
await Groups.RemoveFromGroupAsync(Context.ConnectionId, groupName);
_logger.LogInformation(
"Client {ConnectionId} unsubscribed from {GroupName}",
Context.ConnectionId, groupName);
}
}
```
### PendingChangeApproved Notification
```csharp
public class PendingChangeApprovedEventHandler
: INotificationHandler<PendingChangeApprovedEvent>
{
private readonly IHubContext<McpHub> _hubContext;
private readonly ILogger<PendingChangeApprovedEventHandler> _logger;
public async Task Handle(PendingChangeApprovedEvent e, CancellationToken ct)
{
var groupName = $"pending-change-{e.PendingChangeId}";
var notification = new
{
PendingChangeId = e.PendingChangeId,
Status = "Approved",
ToolName = e.ToolName,
Operation = e.Diff.Operation,
EntityType = e.Diff.EntityType,
EntityId = e.Diff.EntityId,
ApprovedBy = e.ApprovedBy,
Timestamp = DateTime.UtcNow
};
await _hubContext.Clients
.Group(groupName)
.SendAsync("PendingChangeApproved", notification, ct);
_logger.LogInformation(
"Sent PendingChangeApproved notification: {PendingChangeId}",
e.PendingChangeId);
}
}
```
### PendingChangeRejected Notification
```csharp
public class PendingChangeRejectedEventHandler
: INotificationHandler<PendingChangeRejectedEvent>
{
private readonly IHubContext<McpHub> _hubContext;
private readonly ILogger<PendingChangeRejectedEventHandler> _logger;
public async Task Handle(PendingChangeRejectedEvent e, CancellationToken ct)
{
var groupName = $"pending-change-{e.PendingChangeId}";
var notification = new
{
PendingChangeId = e.PendingChangeId,
Status = "Rejected",
ToolName = e.ToolName,
Reason = e.Reason,
RejectedBy = e.RejectedBy,
Timestamp = DateTime.UtcNow
};
await _hubContext.Clients
.Group(groupName)
.SendAsync("PendingChangeRejected", notification, ct);
_logger.LogInformation(
"Sent PendingChangeRejected notification: {PendingChangeId} - Reason: {Reason}",
e.PendingChangeId, e.Reason);
}
}
```
### API Key Authentication for SignalR
```csharp
public class McpApiKeyAuthHandler : AuthenticationHandler<AuthenticationSchemeOptions>
{
private readonly IMcpApiKeyService _apiKeyService;
protected override async Task<AuthenticateResult> HandleAuthenticateAsync()
{
// For SignalR, API Key may be in query string
var apiKey = Request.Query["access_token"].FirstOrDefault();
if (string.IsNullOrEmpty(apiKey))
{
return AuthenticateResult.Fail("Missing API Key");
}
var validationResult = await _apiKeyService.ValidateAsync(apiKey);
if (!validationResult.IsValid)
{
return AuthenticateResult.Fail(validationResult.ErrorMessage);
}
// Store in HttpContext.Items for Hub access
Context.Items["TenantId"] = validationResult.TenantId;
Context.Items["ApiKeyId"] = validationResult.ApiKeyId;
var claims = new[]
{
new Claim("TenantId", validationResult.TenantId.ToString()),
new Claim("ApiKeyId", validationResult.ApiKeyId.ToString())
};
var identity = new ClaimsIdentity(claims, "ApiKey");
var principal = new ClaimsPrincipal(identity);
var ticket = new AuthenticationTicket(principal, "ApiKey");
return AuthenticateResult.Success(ticket);
}
}
```
### AI Client Example (Pseudo-code)
```typescript
// AI Client (Claude Desktop / ChatGPT)
const connection = new HubConnection("https://colaflow.com/hubs/mcp", {
accessTokenFactory: () => apiKey
});
await connection.start();
// Subscribe to pending change notifications
await connection.invoke("SubscribeToPendingChange", pendingChangeId);
// Listen for approval
connection.on("PendingChangeApproved", (notification) => {
console.log("Change approved:", notification);
console.log("EntityId:", notification.EntityId);
// Continue AI workflow...
});
// Listen for rejection
connection.on("PendingChangeRejected", (notification) => {
console.log("Change rejected:", notification.Reason);
// Adjust AI behavior...
});
```
## Tasks
### Task 1: Create McpHub (2 hours)
- [ ] Create `McpHub` class (inherit from `Hub`)
- [ ] Implement `SubscribeToPendingChange()` method
- [ ] Implement `UnsubscribeFromPendingChange()` method
- [ ] Add connection lifecycle logging
**Files to Create**:
- `ColaFlow.Modules.Mcp/Hubs/McpHub.cs`
### Task 2: API Key Authentication for SignalR (3 hours)
- [ ] Update `McpApiKeyMiddleware` to support query string API Key
- [ ] Add API Key to `HttpContext.Items` for Hub access
- [ ] Test SignalR connection with API Key
**Files to Modify**:
- `ColaFlow.Modules.Mcp/Middleware/McpApiKeyMiddleware.cs`
### Task 3: PendingChangeApproved Event Handler (2 hours)
- [ ] Create `PendingChangeApprovedEventHandler`
- [ ] Use `IHubContext<McpHub>` to send notification
- [ ] Target specific group (`pending-change-{id}`)
- [ ] Log notification sent
**Files to Create**:
- `ColaFlow.Modules.Mcp/EventHandlers/PendingChangeApprovedNotificationHandler.cs`
### Task 4: PendingChangeRejected Event Handler (1 hour)
- [ ] Create `PendingChangeRejectedEventHandler`
- [ ] Send rejection notification with reason
**Files to Create**:
- `ColaFlow.Modules.Mcp/EventHandlers/PendingChangeRejectedNotificationHandler.cs`
### Task 5: PendingChangeExpired Event Handler (1 hour)
- [ ] Create `PendingChangeExpiredEventHandler`
- [ ] Send expiration notification
**Files to Create**:
- `ColaFlow.Modules.Mcp/EventHandlers/PendingChangeExpiredNotificationHandler.cs`
### Task 6: Configure SignalR (1 hour)
- [ ] Register `McpHub` in `Program.cs`
- [ ] Configure CORS for SignalR
- [ ] Configure authentication
**Files to Modify**:
- `ColaFlow.Api/Program.cs`
### Task 7: Integration Tests (4 hours)
- [ ] Test SignalR connection with API Key
- [ ] Test group subscription/unsubscription
- [ ] Test notification delivery (approval/rejection)
- [ ] Test multiple clients (isolation)
**Files to Create**:
- `ColaFlow.Modules.Mcp.Tests/Integration/McpHubIntegrationTests.cs`
## Testing Strategy
### Integration Test Example
```csharp
[Fact]
public async Task ApprovalNotification_SentToSubscribedClient()
{
// Arrange - Connect SignalR client
var hubConnection = new HubConnectionBuilder()
.WithUrl("http://localhost/hubs/mcp?access_token=" + _apiKey)
.Build();
await hubConnection.StartAsync();
// Subscribe to pending change
await hubConnection.InvokeAsync("SubscribeToPendingChange", _pendingChangeId);
// Setup listener
var notificationReceived = new TaskCompletionSource<object>();
hubConnection.On<object>("PendingChangeApproved", notification =>
{
notificationReceived.SetResult(notification);
});
// Act - Approve pending change
await _pendingChangeService.ApproveAsync(_pendingChangeId, _userId, CancellationToken.None);
// Assert - Notification received within 2 seconds
var notification = await notificationReceived.Task.WaitAsync(TimeSpan.FromSeconds(2));
Assert.NotNull(notification);
}
```
## Dependencies
**Prerequisites**:
- Story 5.10 (PendingChange Management) - Domain events to listen to
- M1 SignalR infrastructure (Day 11-17) - BaseHub, authentication
**Optional**: Not strictly blocking for M2 MVP (can use polling fallback)
## Risks & Mitigation
| Risk | Impact | Probability | Mitigation |
|------|--------|-------------|------------|
| WebSocket not supported | Medium | Low | Fallback to long polling |
| SignalR connection drops | Medium | Medium | Auto-reconnect, message queue |
| Scalability issues (many clients) | Medium | Low | Use Redis backplane for multi-instance |
## Definition of Done
- [ ] McpHub implemented
- [ ] API Key authentication working
- [ ] Approval notification sent within 1 second
- [ ] Rejection notification sent within 1 second
- [ ] Group subscriptions working
- [ ] Integration tests passing
- [ ] Connection handling robust (disconnect/reconnect)
- [ ] Code reviewed
## Notes
### Why This Story Matters
- **Completes AI Loop**: AI can act on approval results immediately
- **Better Performance**: WebSocket > polling (10x less overhead)
- **Foundation**: Enables future real-time AI features
- **M2 Polish**: Demonstrates production-grade implementation
### Key Design Decisions
1. **SignalR Groups**: Target specific pending changes (not broadcast)
2. **Domain Events**: Loosely coupled, easy to add more notifications
3. **API Key Auth**: Reuse existing authentication system
4. **Graceful Degradation**: Fallback to polling if WebSocket fails
### Scalability (Future)
- Use Redis backplane for multi-instance SignalR
- Horizontal scaling (multiple app servers)
- Message queue for guaranteed delivery
### SignalR Configuration
```csharp
// Program.cs
builder.Services.AddSignalR()
.AddStackExchangeRedis(builder.Configuration.GetConnectionString("Redis"));
app.MapHub<McpHub>("/hubs/mcp");
```
## Reference Materials
- SignalR Documentation: https://learn.microsoft.com/en-us/aspnet/core/signalr/introduction
- Sprint 5 Plan: `docs/plans/sprint_5.md`