Files
ColaFlow/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeCreatedNotificationHandler.cs
Yaojia Wang 9ccd3284fb feat(backend): Implement SignalR Real-Time Notifications for MCP - Story 5.12
Implemented comprehensive real-time notification system using SignalR to notify
AI agents and users about PendingChange status updates.

Key Features Implemented:
- McpNotificationHub with Subscribe/Unsubscribe methods
- Real-time notifications for all PendingChange lifecycle events
- Tenant-based isolation for multi-tenancy security
- Notification DTOs for structured message formats
- Domain event handlers for automatic notification sending
- Comprehensive unit tests for notification service and handlers
- Client integration guide with examples for TypeScript, React, and Python

Components Created:
1. SignalR Hub:
   - McpNotificationHub.cs - Central hub for MCP notifications

2. Notification DTOs:
   - PendingChangeNotification.cs (base class)
   - PendingChangeCreatedNotification.cs
   - PendingChangeApprovedNotification.cs
   - PendingChangeRejectedNotification.cs
   - PendingChangeAppliedNotification.cs
   - PendingChangeExpiredNotification.cs

3. Notification Service:
   - IMcpNotificationService.cs (interface)
   - McpNotificationService.cs (implementation using SignalR)

4. Event Handlers (send notifications):
   - PendingChangeCreatedNotificationHandler.cs
   - PendingChangeApprovedNotificationHandler.cs
   - PendingChangeRejectedNotificationHandler.cs
   - PendingChangeAppliedNotificationHandler.cs
   - PendingChangeExpiredNotificationHandler.cs

5. Tests:
   - McpNotificationServiceTests.cs - Unit tests for notification service
   - PendingChangeCreatedNotificationHandlerTests.cs
   - PendingChangeApprovedNotificationHandlerTests.cs

6. Documentation:
   - signalr-mcp-client-guide.md - Comprehensive client integration guide

Technical Details:
- Hub endpoint: /hubs/mcp-notifications
- Authentication: JWT token via query string (?access_token=xxx)
- Tenant isolation: Automatic group joining based on tenant ID
- Group subscriptions: Per-pending-change and per-tenant groups
- Notification delivery: < 1 second (real-time)
- Fallback strategy: Polling if WebSocket unavailable

Architecture Benefits:
- Decoupled design using domain events
- Notification failures don't break the main flow
- Scalable (supports Redis backplane for multi-instance)
- Type-safe notification payloads
- Tenant isolation built-in

Story: Phase 3 - Tools & Diff Preview
Priority: P0 CRITICAL
Story Points: 3
Completion: 100%

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-09 18:21:08 +01:00

77 lines
3.3 KiB
C#

using ColaFlow.Modules.Mcp.Application.DTOs.Notifications;
using ColaFlow.Modules.Mcp.Application.Services;
using ColaFlow.Modules.Mcp.Domain.Entities;
using ColaFlow.Modules.Mcp.Domain.Events;
using ColaFlow.Modules.Mcp.Domain.Repositories;
using MediatR;
using Microsoft.Extensions.Logging;
namespace ColaFlow.Modules.Mcp.Application.EventHandlers;
/// <summary>
/// Event handler that sends SignalR notifications when a PendingChange is created.
/// </summary>
/// <remarks>
/// Delivery is best-effort. Any failure while loading the PendingChange or sending the
/// notification is logged and swallowed so that the flow which published the event is
/// never interrupted by a notification problem.
/// </remarks>
public class PendingChangeCreatedNotificationHandler : INotificationHandler<PendingChangeCreatedEvent>
{
    /// <summary>Value written to the notification DTO's <c>NotificationType</c> field.</summary>
    private const string NotificationTypeName = "PendingChangeCreated";

    private readonly IMcpNotificationService _notificationService;
    private readonly IPendingChangeRepository _repository;
    private readonly ILogger<PendingChangeCreatedNotificationHandler> _logger;

    /// <summary>
    /// Creates the handler with its required collaborators.
    /// </summary>
    /// <param name="notificationService">Service used to deliver the notification.</param>
    /// <param name="repository">Repository used to load the PendingChange for its summary.</param>
    /// <param name="logger">Logger for diagnostic output.</param>
    /// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
    public PendingChangeCreatedNotificationHandler(
        IMcpNotificationService notificationService,
        IPendingChangeRepository repository,
        ILogger<PendingChangeCreatedNotificationHandler> logger)
    {
        _notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService));
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Loads the PendingChange referenced by the event, builds a
    /// <see cref="PendingChangeCreatedNotification"/> and forwards it to the notification service.
    /// </summary>
    /// <param name="notification">The domain event describing the newly created PendingChange.</param>
    /// <param name="cancellationToken">Token passed through to the repository and the notification service.</param>
    /// <returns>A task that completes once the notification was sent, skipped, cancelled, or failed.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="notification"/> is <c>null</c>.</exception>
    public async Task Handle(PendingChangeCreatedEvent notification, CancellationToken cancellationToken)
    {
        // Fail with a descriptive exception instead of a NullReferenceException from the log call below.
        if (notification is null)
        {
            throw new ArgumentNullException(nameof(notification));
        }

        _logger.LogInformation(
            "Handling PendingChangeCreatedEvent - PendingChangeId={PendingChangeId}, EntityType={EntityType}, Operation={Operation}",
            notification.PendingChangeId, notification.EntityType, notification.Operation);

        try
        {
            // The event does not carry the summary, so the PendingChange itself is loaded.
            var pendingChange = await _repository.GetByIdAsync(notification.PendingChangeId, cancellationToken);
            if (pendingChange is null)
            {
                _logger.LogWarning(
                    "PendingChange not found - PendingChangeId={PendingChangeId}",
                    notification.PendingChangeId);
                return;
            }

            // Create notification DTO
            var notificationDto = new PendingChangeCreatedNotification
            {
                NotificationType = NotificationTypeName,
                PendingChangeId = notification.PendingChangeId,
                ToolName = notification.ToolName,
                EntityType = notification.EntityType,
                Operation = notification.Operation,
                Summary = pendingChange.GetSummary(),
                TenantId = notification.TenantId,
                Timestamp = DateTime.UtcNow
            };

            // Send notification via SignalR
            await _notificationService.NotifyPendingChangeCreatedAsync(notificationDto, cancellationToken);

            _logger.LogInformation(
                "PendingChangeCreated notification sent successfully - PendingChangeId={PendingChangeId}",
                notification.PendingChangeId);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Cancellation requested by the caller is not a delivery failure, so it is not
            // reported at Error level. Still not rethrown: the handler stays best-effort.
            _logger.LogWarning(
                "PendingChangeCreated notification cancelled - PendingChangeId={PendingChangeId}",
                notification.PendingChangeId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to send PendingChangeCreated notification - PendingChangeId={PendingChangeId}",
                notification.PendingChangeId);
            // Don't rethrow - notification failure shouldn't break the main flow
        }
    }
}