diff --git a/colaflow-api/src/ColaFlow.API/Hubs/McpNotificationHub.cs b/colaflow-api/src/ColaFlow.API/Hubs/McpNotificationHub.cs new file mode 100644 index 0000000..593ee57 --- /dev/null +++ b/colaflow-api/src/ColaFlow.API/Hubs/McpNotificationHub.cs @@ -0,0 +1,90 @@ +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.SignalR; + +namespace ColaFlow.API.Hubs; + +/// +/// SignalR Hub for MCP real-time notifications +/// Supports notifying AI agents and users about PendingChange status updates +/// +[Authorize] +public class McpNotificationHub : BaseHub +{ + private readonly ILogger _logger; + + public McpNotificationHub(ILogger logger) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public override async Task OnConnectedAsync() + { + var connectionId = Context.ConnectionId; + var userId = GetCurrentUserId(); + var tenantId = GetCurrentTenantId(); + + _logger.LogInformation( + "MCP client connected - ConnectionId={ConnectionId}, UserId={UserId}, TenantId={TenantId}", + connectionId, userId, tenantId); + + await base.OnConnectedAsync(); + } + + public override async Task OnDisconnectedAsync(Exception? exception) + { + var connectionId = Context.ConnectionId; + + if (exception != null) + { + _logger.LogError(exception, + "MCP client disconnected with error - ConnectionId={ConnectionId}", + connectionId); + } + else + { + _logger.LogInformation( + "MCP client disconnected - ConnectionId={ConnectionId}", + connectionId); + } + + await base.OnDisconnectedAsync(exception); + } + + /// + /// Subscribe to receive notifications for a specific pending change + /// + /// The pending change ID to subscribe to + public async Task SubscribeToPendingChange(Guid pendingChangeId) + { + var groupName = GetPendingChangeGroupName(pendingChangeId); + await Groups.AddToGroupAsync(Context.ConnectionId, groupName); + + _logger.LogInformation( + "Client subscribed - ConnectionId={ConnectionId}, GroupName={GroupName}, PendingChangeId={PendingChangeId}", + Context.ConnectionId, groupName, pendingChangeId); + } + + /// + /// Unsubscribe from receiving notifications for a specific pending change + /// + /// The pending change ID to unsubscribe from + public async Task UnsubscribeFromPendingChange(Guid pendingChangeId) + { + var groupName = GetPendingChangeGroupName(pendingChangeId); + await Groups.RemoveFromGroupAsync(Context.ConnectionId, groupName); + + _logger.LogInformation( + "Client unsubscribed - ConnectionId={ConnectionId}, GroupName={GroupName}, PendingChangeId={PendingChangeId}", + Context.ConnectionId, groupName, pendingChangeId); + } + + /// + /// Get the SignalR group name for a pending change + /// + /// The pending change ID + /// The group name + private static string GetPendingChangeGroupName(Guid pendingChangeId) + { + return $"pending-change-{pendingChangeId}"; + } +} diff --git a/colaflow-api/src/ColaFlow.API/Program.cs b/colaflow-api/src/ColaFlow.API/Program.cs index bd031e5..16cb7ad 100644 --- a/colaflow-api/src/ColaFlow.API/Program.cs +++ b/colaflow-api/src/ColaFlow.API/Program.cs @@ -225,6 +225,7 @@ app.MapHealthChecks("/health"); // Map SignalR Hubs (after UseAuthorization) app.MapHub("/hubs/project"); app.MapHub("/hubs/notification"); +app.MapHub("/hubs/mcp-notifications"); // ============================================ // Auto-migrate databases in development diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeAppliedNotification.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeAppliedNotification.cs new file mode 100644 index 0000000..bd84165 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeAppliedNotification.cs @@ -0,0 +1,18 @@ +namespace ColaFlow.Modules.Mcp.Application.DTOs.Notifications; + +/// +/// Notification sent when a PendingChange has been successfully applied +/// (after approval and execution) +/// +public sealed record PendingChangeAppliedNotification : PendingChangeNotification +{ + /// + /// Result of applying the change + /// + public required string Result { get; init; } + + /// + /// When the change was applied (UTC) + /// + public required DateTime AppliedAt { get; init; } +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeApprovedNotification.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeApprovedNotification.cs new file mode 100644 index 0000000..4f3ea74 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeApprovedNotification.cs @@ -0,0 +1,32 @@ +namespace ColaFlow.Modules.Mcp.Application.DTOs.Notifications; + +/// +/// Notification sent when a PendingChange is approved and executed +/// +public sealed record PendingChangeApprovedNotification : PendingChangeNotification +{ + /// + /// Type of entity that was changed + /// + public required string EntityType { get; init; } + + /// + /// Operation that was performed + /// + public required string Operation { get; init; } + + /// + /// ID of the entity that was created/updated (if applicable) + /// + public Guid? EntityId { get; init; } + + /// + /// ID of the user who approved the change + /// + public required Guid ApprovedBy { get; init; } + + /// + /// Result of executing the change (e.g., "Epic created: {id} - {name}") + /// + public string? ExecutionResult { get; init; } +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeCreatedNotification.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeCreatedNotification.cs new file mode 100644 index 0000000..e6547e3 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeCreatedNotification.cs @@ -0,0 +1,22 @@ +namespace ColaFlow.Modules.Mcp.Application.DTOs.Notifications; + +/// +/// Notification sent when a new PendingChange is created +/// +public sealed record PendingChangeCreatedNotification : PendingChangeNotification +{ + /// + /// Type of entity being changed (Epic, Story, Task, etc.) + /// + public required string EntityType { get; init; } + + /// + /// Operation being performed (CREATE, UPDATE, DELETE) + /// + public required string Operation { get; init; } + + /// + /// Summary of what will be changed + /// + public required string Summary { get; init; } +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeExpiredNotification.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeExpiredNotification.cs new file mode 100644 index 0000000..b7f21d5 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeExpiredNotification.cs @@ -0,0 +1,12 @@ +namespace ColaFlow.Modules.Mcp.Application.DTOs.Notifications; + +/// +/// Notification sent when a PendingChange expires (timeout) +/// +public sealed record PendingChangeExpiredNotification : PendingChangeNotification +{ + /// + /// When the pending change expired (UTC) + /// + public required DateTime ExpiredAt { get; init; } +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeNotification.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeNotification.cs new file mode 100644 index 0000000..4d62387 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeNotification.cs @@ -0,0 +1,32 @@ +namespace ColaFlow.Modules.Mcp.Application.DTOs.Notifications; + +/// +/// Base class for all PendingChange notifications +/// +public abstract record PendingChangeNotification +{ + /// + /// Type of notification (PendingChangeCreated, PendingChangeApproved, etc.) + /// + public required string NotificationType { get; init; } + + /// + /// The ID of the pending change + /// + public required Guid PendingChangeId { get; init; } + + /// + /// The tool that created the pending change + /// + public required string ToolName { get; init; } + + /// + /// When this notification was generated (UTC) + /// + public DateTime Timestamp { get; init; } = DateTime.UtcNow; + + /// + /// Tenant ID for multi-tenancy support + /// + public required Guid TenantId { get; init; } +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeRejectedNotification.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeRejectedNotification.cs new file mode 100644 index 0000000..bb69d32 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/Notifications/PendingChangeRejectedNotification.cs @@ -0,0 +1,17 @@ +namespace ColaFlow.Modules.Mcp.Application.DTOs.Notifications; + +/// +/// Notification sent when a PendingChange is rejected +/// +public sealed record PendingChangeRejectedNotification : PendingChangeNotification +{ + /// + /// Reason for rejection + /// + public required string Reason { get; init; } + + /// + /// ID of the user who rejected the change + /// + public required Guid RejectedBy { get; init; } +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeAppliedNotificationHandler.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeAppliedNotificationHandler.cs new file mode 100644 index 0000000..487ba0a --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeAppliedNotificationHandler.cs @@ -0,0 +1,60 @@ +using ColaFlow.Modules.Mcp.Application.DTOs.Notifications; +using ColaFlow.Modules.Mcp.Application.Services; +using ColaFlow.Modules.Mcp.Domain.Events; +using MediatR; +using Microsoft.Extensions.Logging; + +namespace ColaFlow.Modules.Mcp.Application.EventHandlers; + +/// +/// Event handler that sends SignalR notifications when a PendingChange is applied +/// +public class PendingChangeAppliedNotificationHandler : INotificationHandler +{ + private readonly IMcpNotificationService _notificationService; + private readonly ILogger _logger; + + public PendingChangeAppliedNotificationHandler( + IMcpNotificationService notificationService, + ILogger logger) + { + _notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task Handle(PendingChangeAppliedEvent notification, CancellationToken cancellationToken) + { + _logger.LogInformation( + "Handling PendingChangeAppliedEvent for notification - PendingChangeId={PendingChangeId}, Result={Result}", + notification.PendingChangeId, notification.Result); + + try + { + // Create notification DTO + var notificationDto = new PendingChangeAppliedNotification + { + NotificationType = "PendingChangeApplied", + PendingChangeId = notification.PendingChangeId, + ToolName = notification.ToolName, + Result = notification.Result, + AppliedAt = DateTime.UtcNow, + TenantId = notification.TenantId, + Timestamp = DateTime.UtcNow + }; + + // Send notification via SignalR + await _notificationService.NotifyPendingChangeAppliedAsync(notificationDto, cancellationToken); + + _logger.LogInformation( + "PendingChangeApplied notification sent successfully - PendingChangeId={PendingChangeId}", + notification.PendingChangeId); + } + catch (Exception ex) + { + _logger.LogError(ex, + "Failed to send PendingChangeApplied notification - PendingChangeId={PendingChangeId}", + notification.PendingChangeId); + // Don't rethrow - notification failure shouldn't break the main flow + } + } +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeApprovedNotificationHandler.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeApprovedNotificationHandler.cs new file mode 100644 index 0000000..5944d09 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeApprovedNotificationHandler.cs @@ -0,0 +1,63 @@ +using ColaFlow.Modules.Mcp.Application.DTOs.Notifications; +using ColaFlow.Modules.Mcp.Application.Services; +using ColaFlow.Modules.Mcp.Domain.Events; +using MediatR; +using Microsoft.Extensions.Logging; + +namespace ColaFlow.Modules.Mcp.Application.EventHandlers; + +/// +/// Event handler that sends SignalR notifications when a PendingChange is approved +/// Runs in parallel with PendingChangeApprovedEventHandler (which executes the change) +/// +public class PendingChangeApprovedNotificationHandler : INotificationHandler +{ + private readonly IMcpNotificationService _notificationService; + private readonly ILogger _logger; + + public PendingChangeApprovedNotificationHandler( + IMcpNotificationService notificationService, + ILogger logger) + { + _notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task Handle(PendingChangeApprovedEvent notification, CancellationToken cancellationToken) + { + _logger.LogInformation( + "Handling PendingChangeApprovedEvent for notification - PendingChangeId={PendingChangeId}, EntityType={EntityType}", + notification.PendingChangeId, notification.Diff.EntityType); + + try + { + // Create notification DTO + var notificationDto = new PendingChangeApprovedNotification + { + NotificationType = "PendingChangeApproved", + PendingChangeId = notification.PendingChangeId, + ToolName = notification.ToolName, + EntityType = notification.Diff.EntityType, + Operation = notification.Diff.Operation, + EntityId = notification.Diff.EntityId, + ApprovedBy = notification.ApprovedBy, + TenantId = notification.TenantId, + Timestamp = DateTime.UtcNow + }; + + // Send notification via SignalR + await _notificationService.NotifyPendingChangeApprovedAsync(notificationDto, cancellationToken); + + _logger.LogInformation( + "PendingChangeApproved notification sent successfully - PendingChangeId={PendingChangeId}", + notification.PendingChangeId); + } + catch (Exception ex) + { + _logger.LogError(ex, + "Failed to send PendingChangeApproved notification - PendingChangeId={PendingChangeId}", + notification.PendingChangeId); + // Don't rethrow - notification failure shouldn't break the main flow + } + } +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeCreatedNotificationHandler.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeCreatedNotificationHandler.cs new file mode 100644 index 0000000..26bdc38 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeCreatedNotificationHandler.cs @@ -0,0 +1,76 @@ +using ColaFlow.Modules.Mcp.Application.DTOs.Notifications; +using ColaFlow.Modules.Mcp.Application.Services; +using ColaFlow.Modules.Mcp.Domain.Entities; +using ColaFlow.Modules.Mcp.Domain.Events; +using ColaFlow.Modules.Mcp.Domain.Repositories; +using MediatR; +using Microsoft.Extensions.Logging; + +namespace ColaFlow.Modules.Mcp.Application.EventHandlers; + +/// +/// Event handler that sends SignalR notifications when a PendingChange is created +/// +public class PendingChangeCreatedNotificationHandler : INotificationHandler +{ + private readonly IMcpNotificationService _notificationService; + private readonly IPendingChangeRepository _repository; + private readonly ILogger _logger; + + public PendingChangeCreatedNotificationHandler( + IMcpNotificationService notificationService, + IPendingChangeRepository repository, + ILogger logger) + { + _notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService)); + _repository = repository ?? throw new ArgumentNullException(nameof(repository)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task Handle(PendingChangeCreatedEvent notification, CancellationToken cancellationToken) + { + _logger.LogInformation( + "Handling PendingChangeCreatedEvent - PendingChangeId={PendingChangeId}, EntityType={EntityType}, Operation={Operation}", + notification.PendingChangeId, notification.EntityType, notification.Operation); + + try + { + // Get PendingChange for summary + var pendingChange = await _repository.GetByIdAsync(notification.PendingChangeId, cancellationToken); + if (pendingChange == null) + { + _logger.LogWarning( + "PendingChange not found - PendingChangeId={PendingChangeId}", + notification.PendingChangeId); + return; + } + + // Create notification DTO + var notificationDto = new PendingChangeCreatedNotification + { + NotificationType = "PendingChangeCreated", + PendingChangeId = notification.PendingChangeId, + ToolName = notification.ToolName, + EntityType = notification.EntityType, + Operation = notification.Operation, + Summary = pendingChange.GetSummary(), + TenantId = notification.TenantId, + Timestamp = DateTime.UtcNow + }; + + // Send notification via SignalR + await _notificationService.NotifyPendingChangeCreatedAsync(notificationDto, cancellationToken); + + _logger.LogInformation( + "PendingChangeCreated notification sent successfully - PendingChangeId={PendingChangeId}", + notification.PendingChangeId); + } + catch (Exception ex) + { + _logger.LogError(ex, + "Failed to send PendingChangeCreated notification - PendingChangeId={PendingChangeId}", + notification.PendingChangeId); + // Don't rethrow - notification failure shouldn't break the main flow + } + } +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeExpiredNotificationHandler.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeExpiredNotificationHandler.cs new file mode 100644 index 0000000..cc14d54 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeExpiredNotificationHandler.cs @@ -0,0 +1,59 @@ +using ColaFlow.Modules.Mcp.Application.DTOs.Notifications; +using ColaFlow.Modules.Mcp.Application.Services; +using ColaFlow.Modules.Mcp.Domain.Events; +using MediatR; +using Microsoft.Extensions.Logging; + +namespace ColaFlow.Modules.Mcp.Application.EventHandlers; + +/// +/// Event handler that sends SignalR notifications when a PendingChange expires +/// +public class PendingChangeExpiredNotificationHandler : INotificationHandler +{ + private readonly IMcpNotificationService _notificationService; + private readonly ILogger _logger; + + public PendingChangeExpiredNotificationHandler( + IMcpNotificationService notificationService, + ILogger logger) + { + _notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task Handle(PendingChangeExpiredEvent notification, CancellationToken cancellationToken) + { + _logger.LogInformation( + "Handling PendingChangeExpiredEvent for notification - PendingChangeId={PendingChangeId}", + notification.PendingChangeId); + + try + { + // Create notification DTO + var notificationDto = new PendingChangeExpiredNotification + { + NotificationType = "PendingChangeExpired", + PendingChangeId = notification.PendingChangeId, + ToolName = notification.ToolName, + TenantId = notification.TenantId, + ExpiredAt = DateTime.UtcNow, + Timestamp = DateTime.UtcNow + }; + + // Send notification via SignalR + await _notificationService.NotifyPendingChangeExpiredAsync(notificationDto, cancellationToken); + + _logger.LogInformation( + "PendingChangeExpired notification sent successfully - PendingChangeId={PendingChangeId}", + notification.PendingChangeId); + } + catch (Exception ex) + { + _logger.LogError(ex, + "Failed to send PendingChangeExpired notification - PendingChangeId={PendingChangeId}", + notification.PendingChangeId); + // Don't rethrow - notification failure shouldn't break the main flow + } + } +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeRejectedNotificationHandler.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeRejectedNotificationHandler.cs new file mode 100644 index 0000000..edad489 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeRejectedNotificationHandler.cs @@ -0,0 +1,60 @@ +using ColaFlow.Modules.Mcp.Application.DTOs.Notifications; +using ColaFlow.Modules.Mcp.Application.Services; +using ColaFlow.Modules.Mcp.Domain.Events; +using MediatR; +using Microsoft.Extensions.Logging; + +namespace ColaFlow.Modules.Mcp.Application.EventHandlers; + +/// +/// Event handler that sends SignalR notifications when a PendingChange is rejected +/// +public class PendingChangeRejectedNotificationHandler : INotificationHandler +{ + private readonly IMcpNotificationService _notificationService; + private readonly ILogger _logger; + + public PendingChangeRejectedNotificationHandler( + IMcpNotificationService notificationService, + ILogger logger) + { + _notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task Handle(PendingChangeRejectedEvent notification, CancellationToken cancellationToken) + { + _logger.LogInformation( + "Handling PendingChangeRejectedEvent for notification - PendingChangeId={PendingChangeId}, Reason={Reason}", + notification.PendingChangeId, notification.Reason); + + try + { + // Create notification DTO + var notificationDto = new PendingChangeRejectedNotification + { + NotificationType = "PendingChangeRejected", + PendingChangeId = notification.PendingChangeId, + ToolName = notification.ToolName, + Reason = notification.Reason, + RejectedBy = notification.RejectedBy, + TenantId = notification.TenantId, + Timestamp = DateTime.UtcNow + }; + + // Send notification via SignalR + await _notificationService.NotifyPendingChangeRejectedAsync(notificationDto, cancellationToken); + + _logger.LogInformation( + "PendingChangeRejected notification sent successfully - PendingChangeId={PendingChangeId}", + notification.PendingChangeId); + } + catch (Exception ex) + { + _logger.LogError(ex, + "Failed to send PendingChangeRejected notification - PendingChangeId={PendingChangeId}", + notification.PendingChangeId); + // Don't rethrow - notification failure shouldn't break the main flow + } + } +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/Services/IMcpNotificationService.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/Services/IMcpNotificationService.cs new file mode 100644 index 0000000..9908d40 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/Services/IMcpNotificationService.cs @@ -0,0 +1,44 @@ +using ColaFlow.Modules.Mcp.Application.DTOs.Notifications; + +namespace ColaFlow.Modules.Mcp.Application.Services; + +/// +/// Service for sending real-time notifications to MCP clients via SignalR +/// +public interface IMcpNotificationService +{ + /// + /// Notify that a new PendingChange was created + /// + Task NotifyPendingChangeCreatedAsync( + PendingChangeCreatedNotification notification, + CancellationToken cancellationToken = default); + + /// + /// Notify that a PendingChange was approved + /// + Task NotifyPendingChangeApprovedAsync( + PendingChangeApprovedNotification notification, + CancellationToken cancellationToken = default); + + /// + /// Notify that a PendingChange was rejected + /// + Task NotifyPendingChangeRejectedAsync( + PendingChangeRejectedNotification notification, + CancellationToken cancellationToken = default); + + /// + /// Notify that a PendingChange was applied successfully + /// + Task NotifyPendingChangeAppliedAsync( + PendingChangeAppliedNotification notification, + CancellationToken cancellationToken = default); + + /// + /// Notify that a PendingChange expired + /// + Task NotifyPendingChangeExpiredAsync( + PendingChangeExpiredNotification notification, + CancellationToken cancellationToken = default); +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/Extensions/McpServiceExtensions.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/Extensions/McpServiceExtensions.cs index bb92a3d..4abc090 100644 --- a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/Extensions/McpServiceExtensions.cs +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/Extensions/McpServiceExtensions.cs @@ -7,6 +7,7 @@ using ColaFlow.Modules.Mcp.Infrastructure.BackgroundServices; using ColaFlow.Modules.Mcp.Infrastructure.Middleware; using ColaFlow.Modules.Mcp.Infrastructure.Persistence; using ColaFlow.Modules.Mcp.Infrastructure.Persistence.Repositories; +using ColaFlow.Modules.Mcp.Infrastructure.Services; using Microsoft.AspNetCore.Builder; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; @@ -44,6 +45,9 @@ public static class McpServiceExtensions services.AddScoped(); services.AddScoped(); + // Register notification service (SignalR real-time notifications) + services.AddScoped(); + // Register background services services.AddHostedService(); diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/Services/McpNotificationService.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/Services/McpNotificationService.cs new file mode 100644 index 0000000..0e9527c --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/Services/McpNotificationService.cs @@ -0,0 +1,135 @@ +using ColaFlow.API.Hubs; +using ColaFlow.Modules.Mcp.Application.DTOs.Notifications; +using ColaFlow.Modules.Mcp.Application.Services; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging; + +namespace ColaFlow.Modules.Mcp.Infrastructure.Services; + +/// +/// Implementation of IMcpNotificationService using SignalR +/// +public class McpNotificationService : IMcpNotificationService +{ + private readonly IHubContext _hubContext; + private readonly ILogger _logger; + + public McpNotificationService( + IHubContext hubContext, + ILogger logger) + { + _hubContext = hubContext ?? throw new ArgumentNullException(nameof(hubContext)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task NotifyPendingChangeCreatedAsync( + PendingChangeCreatedNotification notification, + CancellationToken cancellationToken = default) + { + var groupName = GetPendingChangeGroupName(notification.PendingChangeId); + var tenantGroupName = GetTenantGroupName(notification.TenantId); + + _logger.LogInformation( + "Sending PendingChangeCreated notification - PendingChangeId={PendingChangeId}, EntityType={EntityType}, Operation={Operation}", + notification.PendingChangeId, notification.EntityType, notification.Operation); + + // Send to both: specific pending change subscribers AND all tenant members + await _hubContext.Clients + .Groups(groupName, tenantGroupName) + .SendAsync("PendingChangeCreated", notification, cancellationToken); + + _logger.LogDebug( + "PendingChangeCreated notification sent - Groups=[{GroupName}, {TenantGroupName}]", + groupName, tenantGroupName); + } + + public async Task NotifyPendingChangeApprovedAsync( + PendingChangeApprovedNotification notification, + CancellationToken cancellationToken = default) + { + var groupName = GetPendingChangeGroupName(notification.PendingChangeId); + var tenantGroupName = GetTenantGroupName(notification.TenantId); + + _logger.LogInformation( + "Sending PendingChangeApproved notification - PendingChangeId={PendingChangeId}, EntityType={EntityType}, ApprovedBy={ApprovedBy}", + notification.PendingChangeId, notification.EntityType, notification.ApprovedBy); + + await _hubContext.Clients + .Groups(groupName, tenantGroupName) + .SendAsync("PendingChangeApproved", notification, cancellationToken); + + _logger.LogDebug( + "PendingChangeApproved notification sent - Groups=[{GroupName}, {TenantGroupName}]", + groupName, tenantGroupName); + } + + public async Task NotifyPendingChangeRejectedAsync( + PendingChangeRejectedNotification notification, + CancellationToken cancellationToken = default) + { + var groupName = GetPendingChangeGroupName(notification.PendingChangeId); + var tenantGroupName = GetTenantGroupName(notification.TenantId); + + _logger.LogInformation( + "Sending PendingChangeRejected notification - PendingChangeId={PendingChangeId}, Reason={Reason}, RejectedBy={RejectedBy}", + notification.PendingChangeId, notification.Reason, notification.RejectedBy); + + await _hubContext.Clients + .Groups(groupName, tenantGroupName) + .SendAsync("PendingChangeRejected", notification, cancellationToken); + + _logger.LogDebug( + "PendingChangeRejected notification sent - Groups=[{GroupName}, {TenantGroupName}]", + groupName, tenantGroupName); + } + + public async Task NotifyPendingChangeAppliedAsync( + PendingChangeAppliedNotification notification, + CancellationToken cancellationToken = default) + { + var groupName = GetPendingChangeGroupName(notification.PendingChangeId); + var tenantGroupName = GetTenantGroupName(notification.TenantId); + + _logger.LogInformation( + "Sending PendingChangeApplied notification - PendingChangeId={PendingChangeId}, Result={Result}", + notification.PendingChangeId, notification.Result); + + await _hubContext.Clients + .Groups(groupName, tenantGroupName) + .SendAsync("PendingChangeApplied", notification, cancellationToken); + + _logger.LogDebug( + "PendingChangeApplied notification sent - Groups=[{GroupName}, {TenantGroupName}]", + groupName, tenantGroupName); + } + + public async Task NotifyPendingChangeExpiredAsync( + PendingChangeExpiredNotification notification, + CancellationToken cancellationToken = default) + { + var groupName = GetPendingChangeGroupName(notification.PendingChangeId); + var tenantGroupName = GetTenantGroupName(notification.TenantId); + + _logger.LogInformation( + "Sending PendingChangeExpired notification - PendingChangeId={PendingChangeId}, ExpiredAt={ExpiredAt}", + notification.PendingChangeId, notification.ExpiredAt); + + await _hubContext.Clients + .Groups(groupName, tenantGroupName) + .SendAsync("PendingChangeExpired", notification, cancellationToken); + + _logger.LogDebug( + "PendingChangeExpired notification sent - Groups=[{GroupName}, {TenantGroupName}]", + groupName, tenantGroupName); + } + + private static string GetPendingChangeGroupName(Guid pendingChangeId) + { + return $"pending-change-{pendingChangeId}"; + } + + private static string GetTenantGroupName(Guid tenantId) + { + return $"tenant-{tenantId}"; + } +} diff --git a/colaflow-api/tests/Modules/Mcp/ColaFlow.Modules.Mcp.Tests/EventHandlers/PendingChangeApprovedNotificationHandlerTests.cs b/colaflow-api/tests/Modules/Mcp/ColaFlow.Modules.Mcp.Tests/EventHandlers/PendingChangeApprovedNotificationHandlerTests.cs new file mode 100644 index 0000000..92f4ee8 --- /dev/null +++ b/colaflow-api/tests/Modules/Mcp/ColaFlow.Modules.Mcp.Tests/EventHandlers/PendingChangeApprovedNotificationHandlerTests.cs @@ -0,0 +1,103 @@ +using ColaFlow.Modules.Mcp.Application.DTOs.Notifications; +using ColaFlow.Modules.Mcp.Application.EventHandlers; +using ColaFlow.Modules.Mcp.Application.Services; +using ColaFlow.Modules.Mcp.Domain.Events; +using ColaFlow.Modules.Mcp.Domain.ValueObjects; +using Microsoft.Extensions.Logging; +using Moq; +using Xunit; + +namespace ColaFlow.Modules.Mcp.Tests.EventHandlers; + +public class PendingChangeApprovedNotificationHandlerTests +{ + private readonly Mock _mockNotificationService; + private readonly Mock> _mockLogger; + private readonly PendingChangeApprovedNotificationHandler _handler; + + public PendingChangeApprovedNotificationHandlerTests() + { + _mockNotificationService = new Mock(); + _mockLogger = new Mock>(); + + _handler = new PendingChangeApprovedNotificationHandler( + _mockNotificationService.Object, + _mockLogger.Object); + } + + [Fact] + public async Task Handle_SendsNotification_WithCorrectData() + { + // Arrange + var pendingChangeId = Guid.NewGuid(); + var tenantId = Guid.NewGuid(); + var approvedBy = Guid.NewGuid(); + var entityId = Guid.NewGuid(); + + var diff = new DiffPreview( + "CREATE", + "Epic", + entityId, + null, + null, + "{\"name\":\"Test Epic\"}", + new List()); + + var domainEvent = new PendingChangeApprovedEvent( + pendingChangeId, + "create_epic", + diff, + approvedBy, + tenantId); + + // Act + await _handler.Handle(domainEvent, CancellationToken.None); + + // Assert + _mockNotificationService.Verify( + s => s.NotifyPendingChangeApprovedAsync( + It.Is(n => + n.PendingChangeId == pendingChangeId && + n.ToolName == "create_epic" && + n.EntityType == "Epic" && + n.Operation == "CREATE" && + n.EntityId == entityId && + n.ApprovedBy == approvedBy && + n.TenantId == tenantId), + It.IsAny()), + Times.Once); + } + + [Fact] + public async Task Handle_DoesNotThrow_WhenNotificationServiceFails() + { + // Arrange + var pendingChangeId = Guid.NewGuid(); + var tenantId = Guid.NewGuid(); + var approvedBy = Guid.NewGuid(); + + var diff = new DiffPreview( + "CREATE", + "Epic", + null, + null, + null, + "{\"name\":\"Test Epic\"}", + new List()); + + var domainEvent = new PendingChangeApprovedEvent( + pendingChangeId, + "create_epic", + diff, + approvedBy, + tenantId); + + _mockNotificationService.Setup(s => s.NotifyPendingChangeApprovedAsync( + It.IsAny(), + It.IsAny())) + .ThrowsAsync(new Exception("SignalR connection failed")); + + // Act & Assert - Should not throw + await _handler.Handle(domainEvent, CancellationToken.None); + } +} diff --git a/colaflow-api/tests/Modules/Mcp/ColaFlow.Modules.Mcp.Tests/EventHandlers/PendingChangeCreatedNotificationHandlerTests.cs b/colaflow-api/tests/Modules/Mcp/ColaFlow.Modules.Mcp.Tests/EventHandlers/PendingChangeCreatedNotificationHandlerTests.cs new file mode 100644 index 0000000..aa75e91 --- /dev/null +++ b/colaflow-api/tests/Modules/Mcp/ColaFlow.Modules.Mcp.Tests/EventHandlers/PendingChangeCreatedNotificationHandlerTests.cs @@ -0,0 +1,153 @@ +using ColaFlow.Modules.Mcp.Application.DTOs.Notifications; +using ColaFlow.Modules.Mcp.Application.EventHandlers; +using ColaFlow.Modules.Mcp.Application.Services; +using ColaFlow.Modules.Mcp.Domain.Entities; +using ColaFlow.Modules.Mcp.Domain.Events; +using ColaFlow.Modules.Mcp.Domain.Repositories; +using ColaFlow.Modules.Mcp.Domain.ValueObjects; +using Microsoft.Extensions.Logging; +using Moq; +using Xunit; + +namespace ColaFlow.Modules.Mcp.Tests.EventHandlers; + +public class PendingChangeCreatedNotificationHandlerTests +{ + private readonly Mock _mockNotificationService; + private readonly Mock _mockRepository; + private readonly Mock> _mockLogger; + private readonly PendingChangeCreatedNotificationHandler _handler; + + public PendingChangeCreatedNotificationHandlerTests() + { + _mockNotificationService = new Mock(); + _mockRepository = new Mock(); + _mockLogger = new Mock>(); + + _handler = new PendingChangeCreatedNotificationHandler( + _mockNotificationService.Object, + _mockRepository.Object, + _mockLogger.Object); + } + + [Fact] + public async Task Handle_SendsNotification_WhenPendingChangeExists() + { + // Arrange + var pendingChangeId = Guid.NewGuid(); + var tenantId = Guid.NewGuid(); + var apiKeyId = Guid.NewGuid(); + + var domainEvent = new PendingChangeCreatedEvent( + pendingChangeId, + "create_epic", + "Epic", + "CREATE", + tenantId); + + var diff = new DiffPreview( + "CREATE", + "Epic", + null, + null, + null, + "{\"name\":\"Test Epic\"}", + new List()); + + var pendingChange = PendingChange.Create( + "create_epic", + diff, + tenantId, + apiKeyId, + 12); + + _mockRepository.Setup(r => r.GetByIdAsync(pendingChangeId, It.IsAny())) + .ReturnsAsync(pendingChange); + + // Act + await _handler.Handle(domainEvent, CancellationToken.None); + + // Assert + _mockNotificationService.Verify( + s => s.NotifyPendingChangeCreatedAsync( + It.Is(n => + n.PendingChangeId == pendingChangeId && + n.ToolName == "create_epic" && + n.EntityType == "Epic" && + n.Operation == "CREATE" && + n.TenantId == tenantId), + It.IsAny()), + Times.Once); + } + + [Fact] + public async Task Handle_DoesNotSendNotification_WhenPendingChangeNotFound() + { + // Arrange + var pendingChangeId = Guid.NewGuid(); + var tenantId = Guid.NewGuid(); + + var domainEvent = new PendingChangeCreatedEvent( + pendingChangeId, + "create_epic", + "Epic", + "CREATE", + tenantId); + + _mockRepository.Setup(r => r.GetByIdAsync(pendingChangeId, It.IsAny())) + .ReturnsAsync((PendingChange?)null); + + // Act + await _handler.Handle(domainEvent, CancellationToken.None); + + // Assert + _mockNotificationService.Verify( + s => s.NotifyPendingChangeCreatedAsync( + It.IsAny(), + It.IsAny()), + Times.Never); + } + + [Fact] + public async Task Handle_DoesNotThrow_WhenNotificationServiceFails() + { + // Arrange + var pendingChangeId = Guid.NewGuid(); + var tenantId = Guid.NewGuid(); + var apiKeyId = Guid.NewGuid(); + + var domainEvent = new PendingChangeCreatedEvent( + pendingChangeId, + "create_epic", + "Epic", + "CREATE", + tenantId); + + var diff = new DiffPreview( + "CREATE", + "Epic", + null, + null, + null, + "{\"name\":\"Test Epic\"}", + new List()); + + var pendingChange = PendingChange.Create( + "create_epic", + diff, + tenantId, + apiKeyId, + 12); + + _mockRepository.Setup(r => r.GetByIdAsync(pendingChangeId, It.IsAny())) + .ReturnsAsync(pendingChange); + + _mockNotificationService.Setup(s => s.NotifyPendingChangeCreatedAsync( + It.IsAny(), + It.IsAny())) + .ThrowsAsync(new Exception("SignalR connection failed")); + + // Act & Assert - Should not throw + await _handler.Handle(domainEvent, CancellationToken.None); + } +} diff --git a/colaflow-api/tests/Modules/Mcp/ColaFlow.Modules.Mcp.Tests/Services/McpNotificationServiceTests.cs b/colaflow-api/tests/Modules/Mcp/ColaFlow.Modules.Mcp.Tests/Services/McpNotificationServiceTests.cs new file mode 100644 index 0000000..5473bb7 --- /dev/null +++ b/colaflow-api/tests/Modules/Mcp/ColaFlow.Modules.Mcp.Tests/Services/McpNotificationServiceTests.cs @@ -0,0 +1,233 @@ +using ColaFlow.API.Hubs; +using ColaFlow.Modules.Mcp.Application.DTOs.Notifications; +using ColaFlow.Modules.Mcp.Infrastructure.Services; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging; +using Moq; +using Xunit; + +namespace ColaFlow.Modules.Mcp.Tests.Services; + +public class McpNotificationServiceTests +{ + private readonly Mock> _mockHubContext; + private readonly Mock> _mockLogger; + private readonly Mock _mockClients; + private readonly Mock _mockClientProxy; + private readonly McpNotificationService _service; + + public McpNotificationServiceTests() + { + _mockHubContext = new Mock>(); + _mockLogger = new Mock>(); + _mockClients = new Mock(); + _mockClientProxy = new Mock(); + + _mockHubContext.Setup(h => h.Clients).Returns(_mockClients.Object); + + _service = new McpNotificationService(_mockHubContext.Object, _mockLogger.Object); + } + + [Fact] + public async Task NotifyPendingChangeCreatedAsync_SendsNotificationToCorrectGroups() + { + // Arrange + var pendingChangeId = Guid.NewGuid(); + var tenantId = Guid.NewGuid(); + var notification = new PendingChangeCreatedNotification + { + NotificationType = "PendingChangeCreated", + PendingChangeId = pendingChangeId, + ToolName = "create_epic", + EntityType = "Epic", + Operation = "CREATE", + Summary = "Create Epic: Test Epic", + TenantId = tenantId + }; + + _mockClients.Setup(c => c.Groups(It.IsAny(), It.IsAny())) + .Returns(_mockClientProxy.Object); + + // Act + await _service.NotifyPendingChangeCreatedAsync(notification); + + // Assert + _mockClients.Verify( + c => c.Groups($"pending-change-{pendingChangeId}", $"tenant-{tenantId}"), + Times.Once); + + _mockClientProxy.Verify( + p => p.SendCoreAsync( + "PendingChangeCreated", + It.Is(args => args.Length == 1 && args[0] == notification), + It.IsAny()), + Times.Once); + } + + [Fact] + public async Task NotifyPendingChangeApprovedAsync_SendsNotificationWithCorrectData() + { + // Arrange + var pendingChangeId = Guid.NewGuid(); + var tenantId = Guid.NewGuid(); + var approvedBy = Guid.NewGuid(); + var entityId = Guid.NewGuid(); + + var notification = new PendingChangeApprovedNotification + { + NotificationType = "PendingChangeApproved", + PendingChangeId = pendingChangeId, + ToolName = "create_epic", + EntityType = "Epic", + Operation = "CREATE", + EntityId = entityId, + ApprovedBy = approvedBy, + ExecutionResult = "Epic created: Test Epic", + TenantId = tenantId + }; + + _mockClients.Setup(c => c.Groups(It.IsAny(), It.IsAny())) + .Returns(_mockClientProxy.Object); + + // Act + await _service.NotifyPendingChangeApprovedAsync(notification); + + // Assert + _mockClientProxy.Verify( + p => p.SendCoreAsync( + "PendingChangeApproved", + It.Is(args => + args.Length == 1 && + ((PendingChangeApprovedNotification)args[0]).ApprovedBy == approvedBy && + ((PendingChangeApprovedNotification)args[0]).EntityId == entityId), + It.IsAny()), + Times.Once); + } + + [Fact] + public async Task NotifyPendingChangeRejectedAsync_SendsNotificationWithReason() + { + // Arrange + var pendingChangeId = Guid.NewGuid(); + var tenantId = Guid.NewGuid(); + var rejectedBy = Guid.NewGuid(); + var reason = "Epic name is too vague"; + + var notification = new PendingChangeRejectedNotification + { + NotificationType = "PendingChangeRejected", + PendingChangeId = pendingChangeId, + ToolName = "create_epic", + Reason = reason, + RejectedBy = rejectedBy, + TenantId = tenantId + }; + + _mockClients.Setup(c => c.Groups(It.IsAny(), It.IsAny())) + .Returns(_mockClientProxy.Object); + + // Act + await _service.NotifyPendingChangeRejectedAsync(notification); + + // Assert + _mockClientProxy.Verify( + p => p.SendCoreAsync( + "PendingChangeRejected", + It.Is(args => + args.Length == 1 && + ((PendingChangeRejectedNotification)args[0]).Reason == reason && + ((PendingChangeRejectedNotification)args[0]).RejectedBy == rejectedBy), + It.IsAny()), + Times.Once); + } + + [Fact] + public async Task NotifyPendingChangeAppliedAsync_SendsNotificationWithResult() + { + // Arrange + var pendingChangeId = Guid.NewGuid(); + var tenantId = Guid.NewGuid(); + var result = "Epic created: abc123 - Test Epic"; + + var notification = new PendingChangeAppliedNotification + { + NotificationType = "PendingChangeApplied", + PendingChangeId = pendingChangeId, + ToolName = "create_epic", + Result = result, + AppliedAt = DateTime.UtcNow, + TenantId = tenantId + }; + + _mockClients.Setup(c => c.Groups(It.IsAny(), It.IsAny())) + .Returns(_mockClientProxy.Object); + + // Act + await _service.NotifyPendingChangeAppliedAsync(notification); + + // Assert + _mockClientProxy.Verify( + p => p.SendCoreAsync( + "PendingChangeApplied", + It.Is(args => + args.Length == 1 && + ((PendingChangeAppliedNotification)args[0]).Result == result), + It.IsAny()), + Times.Once); + } + + [Fact] + public async Task NotifyPendingChangeExpiredAsync_SendsNotificationToCorrectGroups() + { + // Arrange + var pendingChangeId = Guid.NewGuid(); + var tenantId = Guid.NewGuid(); + + var notification = new PendingChangeExpiredNotification + { + NotificationType = "PendingChangeExpired", + PendingChangeId = pendingChangeId, + ToolName = "create_epic", + ExpiredAt = DateTime.UtcNow, + TenantId = tenantId + }; + + _mockClients.Setup(c => c.Groups(It.IsAny(), It.IsAny())) + .Returns(_mockClientProxy.Object); + + // Act + await _service.NotifyPendingChangeExpiredAsync(notification); + + // Assert + _mockClients.Verify( + c => c.Groups($"pending-change-{pendingChangeId}", $"tenant-{tenantId}"), + Times.Once); + + _mockClientProxy.Verify( + p => p.SendCoreAsync( + "PendingChangeExpired", + It.Is(args => args.Length == 1), + It.IsAny()), + Times.Once); + } + + [Fact] + public void Constructor_ThrowsArgumentNullException_WhenHubContextIsNull() + { + // Act & Assert + var exception = Assert.Throws(() => + new McpNotificationService(null!, _mockLogger.Object)); + + Assert.Equal("hubContext", exception.ParamName); + } + + [Fact] + public void Constructor_ThrowsArgumentNullException_WhenLoggerIsNull() + { + // Act & Assert + var exception = Assert.Throws(() => + new McpNotificationService(_mockHubContext.Object, null!)); + + Assert.Equal("logger", exception.ParamName); + } +} diff --git a/docs/integration/signalr-mcp-client-guide.md b/docs/integration/signalr-mcp-client-guide.md new file mode 100644 index 0000000..10401ea --- /dev/null +++ b/docs/integration/signalr-mcp-client-guide.md @@ -0,0 +1,475 @@ +# SignalR MCP Notifications Client Integration Guide + +## Overview + +This guide explains how to integrate with ColaFlow's MCP SignalR Hub to receive real-time notifications about PendingChange status updates. + +## Hub Endpoint + +``` +wss://your-colaflow-instance.com/hubs/mcp-notifications +``` + +## Authentication + +The hub requires JWT authentication. Pass the JWT token in the query string: + +``` +wss://your-colaflow-instance.com/hubs/mcp-notifications?access_token=YOUR_JWT_TOKEN +``` + +## Notification Types + +The hub sends the following notification types: + +1. **PendingChangeCreated** - A new PendingChange was created +2. **PendingChangeApproved** - A PendingChange was approved +3. **PendingChangeRejected** - A PendingChange was rejected +4. **PendingChangeApplied** - A PendingChange was successfully applied +5. **PendingChangeExpired** - A PendingChange expired (timeout) + +## Client Implementation Examples + +### JavaScript/TypeScript (SignalR Client) + +```typescript +import * as signalR from "@microsoft/signalr"; + +// Create connection +const connection = new signalR.HubConnectionBuilder() + .withUrl("https://your-colaflow-instance.com/hubs/mcp-notifications", { + accessTokenFactory: () => localStorage.getItem("jwt_token") || "", + }) + .withAutomaticReconnect() // Auto-reconnect on disconnect + .configureLogging(signalR.LogLevel.Information) + .build(); + +// Subscribe to PendingChangeCreated +connection.on("PendingChangeCreated", (notification) => { + console.log("New PendingChange created:", notification); + console.log("Summary:", notification.summary); + console.log("Entity Type:", notification.entityType); + console.log("Operation:", notification.operation); +}); + +// Subscribe to PendingChangeApproved +connection.on("PendingChangeApproved", (notification) => { + console.log("PendingChange approved:", notification); + console.log("Approved by:", notification.approvedBy); + console.log("Entity ID:", notification.entityId); +}); + +// Subscribe to PendingChangeRejected +connection.on("PendingChangeRejected", (notification) => { + console.log("PendingChange rejected:", notification); + console.log("Reason:", notification.reason); + console.log("Rejected by:", notification.rejectedBy); +}); + +// Subscribe to PendingChangeApplied +connection.on("PendingChangeApplied", (notification) => { + console.log("PendingChange applied:", notification); + console.log("Result:", notification.result); +}); + +// Subscribe to PendingChangeExpired +connection.on("PendingChangeExpired", (notification) => { + console.log("PendingChange expired:", notification); +}); + +// Start connection +async function start() { + try { + await connection.start(); + console.log("SignalR Connected"); + } catch (err) { + console.error("SignalR Connection Error:", err); + setTimeout(start, 5000); // Retry after 5 seconds + } +} + +// Handle disconnection +connection.onclose(async () => { + console.log("SignalR Disconnected"); + await start(); // Reconnect +}); + +// Start the connection +start(); +``` + +### React Hook + +```typescript +import { useEffect, useState } from "react"; +import * as signalR from "@microsoft/signalr"; + +export function useMcpNotifications(jwtToken: string) { + const [connection, setConnection] = useState(null); + const [isConnected, setIsConnected] = useState(false); + + useEffect(() => { + const newConnection = new signalR.HubConnectionBuilder() + .withUrl("https://your-colaflow-instance.com/hubs/mcp-notifications", { + accessTokenFactory: () => jwtToken, + }) + .withAutomaticReconnect() + .build(); + + setConnection(newConnection); + + return () => { + newConnection.stop(); + }; + }, [jwtToken]); + + useEffect(() => { + if (connection) { + connection + .start() + .then(() => { + console.log("SignalR Connected"); + setIsConnected(true); + }) + .catch((err) => { + console.error("SignalR Connection Error:", err); + setIsConnected(false); + }); + + connection.onclose(() => { + console.log("SignalR Disconnected"); + setIsConnected(false); + }); + } + }, [connection]); + + return { connection, isConnected }; +} + +// Usage in a component +function MyComponent() { + const jwtToken = localStorage.getItem("jwt_token") || ""; + const { connection, isConnected } = useMcpNotifications(jwtToken); + + useEffect(() => { + if (!connection) return; + + // Subscribe to notifications + connection.on("PendingChangeCreated", (notification) => { + console.log("New PendingChange:", notification); + // Update UI, show toast, etc. + }); + + connection.on("PendingChangeApproved", (notification) => { + console.log("PendingChange approved:", notification); + // Update UI, show success message, etc. + }); + + connection.on("PendingChangeRejected", (notification) => { + console.log("PendingChange rejected:", notification); + // Update UI, show error message, etc. + }); + + // Cleanup + return () => { + connection.off("PendingChangeCreated"); + connection.off("PendingChangeApproved"); + connection.off("PendingChangeRejected"); + }; + }, [connection]); + + return ( +
+

Connection Status: {isConnected ? "Connected" : "Disconnected"}

+
+ ); +} +``` + +### Python (AI Agent) + +```python +import asyncio +import json +from signalrcore.hub_connection_builder import HubConnectionBuilder + +class McpNotificationClient: + def __init__(self, hub_url: str, jwt_token: str): + self.hub_url = hub_url + self.jwt_token = jwt_token + self.connection = None + + def on_pending_change_created(self, notification): + print(f"PendingChange created: {notification}") + print(f"Summary: {notification['summary']}") + + def on_pending_change_approved(self, notification): + print(f"PendingChange approved: {notification}") + print(f"Entity ID: {notification['entityId']}") + # Continue AI workflow... + + def on_pending_change_rejected(self, notification): + print(f"PendingChange rejected: {notification}") + print(f"Reason: {notification['reason']}") + # Adjust AI behavior... + + def on_pending_change_applied(self, notification): + print(f"PendingChange applied: {notification}") + print(f"Result: {notification['result']}") + + def on_pending_change_expired(self, notification): + print(f"PendingChange expired: {notification}") + + async def start(self): + self.connection = HubConnectionBuilder()\ + .with_url(f"{self.hub_url}?access_token={self.jwt_token}")\ + .with_automatic_reconnect({ + "type": "interval", + "intervals": [0, 2, 10, 30] + })\ + .build() + + # Register event handlers + self.connection.on("PendingChangeCreated", self.on_pending_change_created) + self.connection.on("PendingChangeApproved", self.on_pending_change_approved) + self.connection.on("PendingChangeRejected", self.on_pending_change_rejected) + self.connection.on("PendingChangeApplied", self.on_pending_change_applied) + self.connection.on("PendingChangeExpired", self.on_pending_change_expired) + + # Start connection + self.connection.start() + print("SignalR Connected") + + async def subscribe_to_pending_change(self, pending_change_id: str): + """Subscribe to notifications for a specific pending change""" + await self.connection.invoke("SubscribeToPendingChange", pending_change_id) + print(f"Subscribed to PendingChange {pending_change_id}") + + async def unsubscribe_from_pending_change(self, pending_change_id: str): + """Unsubscribe from notifications for a specific pending change""" + await self.connection.invoke("UnsubscribeFromPendingChange", pending_change_id) + print(f"Unsubscribed from PendingChange {pending_change_id}") + + def stop(self): + if self.connection: + self.connection.stop() + +# Usage +async def main(): + client = McpNotificationClient( + hub_url="https://your-colaflow-instance.com/hubs/mcp-notifications", + jwt_token="YOUR_JWT_TOKEN" + ) + + await client.start() + + # Subscribe to a specific pending change + await client.subscribe_to_pending_change("123e4567-e89b-12d3-a456-426614174000") + + # Keep running + await asyncio.sleep(3600) # Run for 1 hour + + client.stop() + +if __name__ == "__main__": + asyncio.run(main()) +``` + +## Hub Methods + +### SubscribeToPendingChange + +Subscribe to receive notifications for a specific pending change. + +```typescript +await connection.invoke("SubscribeToPendingChange", pendingChangeId); +``` + +**Parameters:** +- `pendingChangeId` (Guid/string): The ID of the pending change to subscribe to + +### UnsubscribeFromPendingChange + +Unsubscribe from receiving notifications for a specific pending change. + +```typescript +await connection.invoke("UnsubscribeFromPendingChange", pendingChangeId); +``` + +**Parameters:** +- `pendingChangeId` (Guid/string): The ID of the pending change to unsubscribe from + +## Notification Payloads + +### PendingChangeCreatedNotification + +```json +{ + "notificationType": "PendingChangeCreated", + "pendingChangeId": "123e4567-e89b-12d3-a456-426614174000", + "toolName": "create_epic", + "entityType": "Epic", + "operation": "CREATE", + "summary": "Create Epic: Implement User Authentication", + "tenantId": "456e7890-e89b-12d3-a456-426614174000", + "timestamp": "2025-01-15T10:30:00Z" +} +``` + +### PendingChangeApprovedNotification + +```json +{ + "notificationType": "PendingChangeApproved", + "pendingChangeId": "123e4567-e89b-12d3-a456-426614174000", + "toolName": "create_epic", + "entityType": "Epic", + "operation": "CREATE", + "entityId": "789e0123-e89b-12d3-a456-426614174000", + "approvedBy": "234e5678-e89b-12d3-a456-426614174000", + "executionResult": "Epic created: 789e0123-e89b-12d3-a456-426614174000 - Implement User Authentication", + "tenantId": "456e7890-e89b-12d3-a456-426614174000", + "timestamp": "2025-01-15T10:31:00Z" +} +``` + +### PendingChangeRejectedNotification + +```json +{ + "notificationType": "PendingChangeRejected", + "pendingChangeId": "123e4567-e89b-12d3-a456-426614174000", + "toolName": "create_epic", + "reason": "Epic name is too vague, please provide more details", + "rejectedBy": "234e5678-e89b-12d3-a456-426614174000", + "tenantId": "456e7890-e89b-12d3-a456-426614174000", + "timestamp": "2025-01-15T10:31:00Z" +} +``` + +### PendingChangeAppliedNotification + +```json +{ + "notificationType": "PendingChangeApplied", + "pendingChangeId": "123e4567-e89b-12d3-a456-426614174000", + "toolName": "create_epic", + "result": "Epic created: 789e0123-e89b-12d3-a456-426614174000 - Implement User Authentication", + "appliedAt": "2025-01-15T10:31:05Z", + "tenantId": "456e7890-e89b-12d3-a456-426614174000", + "timestamp": "2025-01-15T10:31:05Z" +} +``` + +### PendingChangeExpiredNotification + +```json +{ + "notificationType": "PendingChangeExpired", + "pendingChangeId": "123e4567-e89b-12d3-a456-426614174000", + "toolName": "create_epic", + "expiredAt": "2025-01-15T22:30:00Z", + "tenantId": "456e7890-e89b-12d3-a456-426614174000", + "timestamp": "2025-01-15T22:30:00Z" +} +``` + +## Tenant Isolation + +All notifications are automatically isolated by tenant. When you connect, you are automatically added to your tenant's group, and you will only receive notifications for PendingChanges belonging to your tenant. + +## Connection Lifecycle + +1. **Connect**: Establish WebSocket connection with JWT token +2. **Automatic Tenant Group Join**: Server automatically adds you to your tenant's group +3. **Subscribe**: Optionally subscribe to specific pending changes +4. **Receive Notifications**: Get real-time updates +5. **Unsubscribe**: Optionally unsubscribe from specific pending changes +6. **Disconnect**: Close connection when done + +## Error Handling + +```typescript +connection.onreconnecting((error) => { + console.warn("SignalR Reconnecting:", error); +}); + +connection.onreconnected((connectionId) => { + console.log("SignalR Reconnected:", connectionId); +}); + +connection.onclose((error) => { + console.error("SignalR Connection Closed:", error); + // Implement custom reconnection logic if needed +}); +``` + +## Best Practices + +1. **Use Automatic Reconnect**: Always enable `.withAutomaticReconnect()` to handle temporary disconnections +2. **Subscribe Selectively**: Only subscribe to specific pending changes you care about +3. **Clean Up**: Always unsubscribe and close connections when done +4. **Error Handling**: Implement proper error handling for network failures +5. **Token Refresh**: Refresh JWT tokens before they expire to maintain connection +6. **Logging**: Enable appropriate logging level for debugging + +## Fallback Strategy + +If SignalR/WebSocket is not available or fails, you can fall back to polling: + +```typescript +// Fallback to polling if SignalR fails +async function pollPendingChange(pendingChangeId: string) { + const interval = setInterval(async () => { + try { + const response = await fetch( + `https://api.colaflow.com/api/mcp/pending-changes/${pendingChangeId}`, + { + headers: { + Authorization: `Bearer ${jwtToken}`, + }, + } + ); + const pendingChange = await response.json(); + + if (pendingChange.status === "Approved" || pendingChange.status === "Rejected") { + clearInterval(interval); + console.log("PendingChange status updated:", pendingChange.status); + } + } catch (error) { + console.error("Polling error:", error); + } + }, 2000); // Poll every 2 seconds +} +``` + +## Performance Considerations + +- **Connection Reuse**: Reuse a single SignalR connection for multiple subscriptions +- **Selective Subscriptions**: Only subscribe to pending changes you need to track +- **Bandwidth**: SignalR uses WebSocket, which is much more efficient than polling +- **Scalability**: The server uses SignalR Groups for efficient message routing + +## Troubleshooting + +### Connection Fails + +- Verify JWT token is valid and not expired +- Check CORS settings if connecting from a browser +- Ensure WebSocket is not blocked by firewall/proxy + +### Not Receiving Notifications + +- Verify you're connected (check `connection.state === HubConnectionState.Connected`) +- Check tenant ID matches the PendingChange's tenant +- Verify you've subscribed to the pending change (if using selective subscriptions) + +### Notifications Delayed + +- Check network latency +- Verify server is not under heavy load +- Check if automatic reconnect is triggering frequently (network instability) + +## Support + +For issues or questions, please contact the ColaFlow development team or open an issue in the repository. diff --git a/docs/stories/sprint_5/story_5_12.md b/docs/stories/sprint_5/story_5_12.md index b865798..2515316 100644 --- a/docs/stories/sprint_5/story_5_12.md +++ b/docs/stories/sprint_5/story_5_12.md @@ -2,12 +2,13 @@ story_id: story_5_12 sprint_id: sprint_5 phase: Phase 3 - Tools & Diff Preview -status: not_started +status: completed priority: P0 story_points: 3 assignee: backend estimated_days: 1 created_date: 2025-11-06 +completion_date: 2025-11-09 dependencies: [story_5_10] ---