From 2fec2df004c2ce23e56c6600cb2c0d3d4f755ec5 Mon Sep 17 00:00:00 2001 From: Yaojia Wang Date: Sun, 9 Nov 2025 17:58:12 +0100 Subject: [PATCH] feat(backend): Implement PendingChange Management (Story 5.10) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implemented complete Human-in-the-Loop approval workflow for AI-proposed changes: Changes: - Created PendingChange DTOs (PendingChangeDto, CreatePendingChangeRequest, ApproveChangeRequest, RejectChangeRequest, PendingChangeFilterDto) - Implemented IPendingChangeService interface with CRUD, approval/rejection, expiration, and deletion operations - Implemented PendingChangeService with full workflow support and tenant isolation - Created McpPendingChangesController REST API with endpoints for listing, approving, rejecting, and deleting pending changes - Implemented PendingChangeApprovedEventHandler to execute approved changes via MediatR commands (Project, Epic, Story, Task CRUD operations) - Created PendingChangeExpirationBackgroundService for auto-expiration of changes after 24 hours - Registered all services and background service in DI container Technical Details: - Status flow: PendingApproval → Approved → Applied (or Rejected/Expired) - Tenant isolation enforced in all operations - Domain events published for audit trail - Event-driven execution using MediatR - Background service runs every 5 minutes to expire old changes - JWT authentication required for all endpoints 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../McpPendingChangesController.cs | 229 ++++++++++ .../DTOs/ApproveChangeRequest.cs | 12 + .../DTOs/CreatePendingChangeRequest.cs | 13 + .../DTOs/PendingChangeDto.cs | 29 ++ .../DTOs/PendingChangeFilterDto.cs | 18 + .../DTOs/RejectChangeRequest.cs | 9 + .../PendingChangeApprovedEventHandler.cs | 318 ++++++++++++++ .../Services/IPendingChangeService.cs | 68 +++ .../Services/PendingChangeService.cs | 410 ++++++++++++++++++ ...endingChangeExpirationBackgroundService.cs | 71 +++ .../Extensions/McpServiceExtensions.cs | 5 + 11 files changed, 1182 insertions(+) create mode 100644 colaflow-api/src/ColaFlow.API/Controllers/McpPendingChangesController.cs create mode 100644 colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/ApproveChangeRequest.cs create mode 100644 colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/CreatePendingChangeRequest.cs create mode 100644 colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/PendingChangeDto.cs create mode 100644 colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/PendingChangeFilterDto.cs create mode 100644 colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/RejectChangeRequest.cs create mode 100644 colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeApprovedEventHandler.cs create mode 100644 colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/Services/IPendingChangeService.cs create mode 100644 colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/Services/PendingChangeService.cs create mode 100644 colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/BackgroundServices/PendingChangeExpirationBackgroundService.cs diff --git a/colaflow-api/src/ColaFlow.API/Controllers/McpPendingChangesController.cs b/colaflow-api/src/ColaFlow.API/Controllers/McpPendingChangesController.cs new file mode 100644 index 0000000..7d1e93d --- /dev/null +++ b/colaflow-api/src/ColaFlow.API/Controllers/McpPendingChangesController.cs @@ -0,0 +1,229 @@ +using ColaFlow.Modules.Mcp.Application.DTOs; +using ColaFlow.Modules.Mcp.Application.Services; +using ColaFlow.Modules.Mcp.Domain.ValueObjects; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; +using System.Security.Claims; + +namespace ColaFlow.API.Controllers; + +/// +/// Controller for managing PendingChanges (AI-proposed changes awaiting approval) +/// Requires JWT authentication +/// +[ApiController] +[Route("api/mcp/pending-changes")] +[Authorize] // Requires JWT authentication +public class McpPendingChangesController : ControllerBase +{ + private readonly IPendingChangeService _pendingChangeService; + private readonly ILogger _logger; + + public McpPendingChangesController( + IPendingChangeService pendingChangeService, + ILogger logger) + { + _pendingChangeService = pendingChangeService ?? throw new ArgumentNullException(nameof(pendingChangeService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// + /// Get list of pending changes with filtering and pagination + /// + [HttpGet] + [ProducesResponseType(typeof(PendingChangeListResponse), 200)] + [ProducesResponseType(401)] + public async Task GetPendingChanges( + [FromQuery] string? status = null, + [FromQuery] string? entityType = null, + [FromQuery] Guid? entityId = null, + [FromQuery] Guid? apiKeyId = null, + [FromQuery] string? toolName = null, + [FromQuery] bool? includeExpired = null, + [FromQuery] int page = 1, + [FromQuery] int pageSize = 20) + { + try + { + var filter = new PendingChangeFilterDto + { + Status = string.IsNullOrWhiteSpace(status) ? null : Enum.Parse(status, true), + EntityType = entityType, + EntityId = entityId, + ApiKeyId = apiKeyId, + ToolName = toolName, + IncludeExpired = includeExpired, + Page = page, + PageSize = Math.Min(pageSize, 100) // Max 100 items per page + }; + + var (items, totalCount) = await _pendingChangeService.GetPendingChangesAsync(filter, HttpContext.RequestAborted); + + var response = new PendingChangeListResponse + { + Items = items, + TotalCount = totalCount, + Page = page, + PageSize = pageSize, + TotalPages = (int)Math.Ceiling((double)totalCount / pageSize) + }; + + return Ok(response); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to get pending changes"); + return StatusCode(500, new { message = "Failed to retrieve pending changes" }); + } + } + + /// + /// Get a specific pending change by ID + /// + [HttpGet("{id}")] + [ProducesResponseType(typeof(PendingChangeDto), 200)] + [ProducesResponseType(404)] + [ProducesResponseType(401)] + public async Task GetPendingChange(Guid id) + { + try + { + var pendingChange = await _pendingChangeService.GetByIdAsync(id, HttpContext.RequestAborted); + if (pendingChange == null) + { + return NotFound(new { message = "PendingChange not found" }); + } + + return Ok(pendingChange); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to get pending change {Id}", id); + return StatusCode(500, new { message = "Failed to retrieve pending change" }); + } + } + + /// + /// Approve a pending change (will trigger execution) + /// + [HttpPost("{id}/approve")] + [ProducesResponseType(200)] + [ProducesResponseType(400)] + [ProducesResponseType(404)] + [ProducesResponseType(401)] + public async Task ApprovePendingChange(Guid id) + { + try + { + // Extract user ID from JWT claims + var userId = GetUserIdFromClaims(); + + await _pendingChangeService.ApproveAsync(id, userId, HttpContext.RequestAborted); + + _logger.LogInformation("PendingChange {Id} approved by User {UserId}", id, userId); + + return Ok(new { message = "PendingChange approved successfully. Execution in progress." }); + } + catch (InvalidOperationException ex) + { + _logger.LogWarning(ex, "Cannot approve PendingChange {Id}", id); + return BadRequest(new { message = ex.Message }); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to approve PendingChange {Id}", id); + return StatusCode(500, new { message = "Failed to approve pending change" }); + } + } + + /// + /// Reject a pending change + /// + [HttpPost("{id}/reject")] + [ProducesResponseType(200)] + [ProducesResponseType(400)] + [ProducesResponseType(404)] + [ProducesResponseType(401)] + public async Task RejectPendingChange(Guid id, [FromBody] RejectChangeRequest request) + { + try + { + if (string.IsNullOrWhiteSpace(request.Reason)) + { + return BadRequest(new { message = "Rejection reason is required" }); + } + + // Extract user ID from JWT claims + var userId = GetUserIdFromClaims(); + + await _pendingChangeService.RejectAsync(id, userId, request.Reason, HttpContext.RequestAborted); + + _logger.LogInformation("PendingChange {Id} rejected by User {UserId}", id, userId); + + return Ok(new { message = "PendingChange rejected successfully" }); + } + catch (InvalidOperationException ex) + { + _logger.LogWarning(ex, "Cannot reject PendingChange {Id}", id); + return BadRequest(new { message = ex.Message }); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to reject PendingChange {Id}", id); + return StatusCode(500, new { message = "Failed to reject pending change" }); + } + } + + /// + /// Delete a pending change (only allowed for Expired or Rejected status) + /// + [HttpDelete("{id}")] + [ProducesResponseType(204)] + [ProducesResponseType(400)] + [ProducesResponseType(404)] + [ProducesResponseType(401)] + public async Task DeletePendingChange(Guid id) + { + try + { + await _pendingChangeService.DeleteAsync(id, HttpContext.RequestAborted); + + _logger.LogInformation("PendingChange {Id} deleted", id); + + return NoContent(); + } + catch (InvalidOperationException ex) + { + _logger.LogWarning(ex, "Cannot delete PendingChange {Id}", id); + return BadRequest(new { message = ex.Message }); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to delete PendingChange {Id}", id); + return StatusCode(500, new { message = "Failed to delete pending change" }); + } + } + + // Helper method to extract user ID from claims + private Guid GetUserIdFromClaims() + { + var userIdClaim = User.FindFirstValue("user_id") + ?? User.FindFirstValue(ClaimTypes.NameIdentifier) + ?? User.FindFirstValue("sub") + ?? throw new UnauthorizedAccessException("User ID not found in token"); + + return Guid.Parse(userIdClaim); + } +} + +/// +/// Response for paginated list of pending changes +/// +public class PendingChangeListResponse +{ + public List Items { get; set; } = new(); + public int TotalCount { get; set; } + public int Page { get; set; } + public int PageSize { get; set; } + public int TotalPages { get; set; } +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/ApproveChangeRequest.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/ApproveChangeRequest.cs new file mode 100644 index 0000000..07bd248 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/ApproveChangeRequest.cs @@ -0,0 +1,12 @@ +namespace ColaFlow.Modules.Mcp.Application.DTOs; + +/// +/// Request to approve a PendingChange +/// +public class ApproveChangeRequest +{ + // Currently empty, but we may add fields later like: + // - ApprovalComments + // - AutoApply flag + // - ScheduledExecutionTime +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/CreatePendingChangeRequest.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/CreatePendingChangeRequest.cs new file mode 100644 index 0000000..7b22ebb --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/CreatePendingChangeRequest.cs @@ -0,0 +1,13 @@ +using ColaFlow.Modules.Mcp.Domain.ValueObjects; + +namespace ColaFlow.Modules.Mcp.Application.DTOs; + +/// +/// Request to create a new PendingChange +/// +public class CreatePendingChangeRequest +{ + public string ToolName { get; set; } = null!; + public DiffPreview Diff { get; set; } = null!; + public int ExpirationHours { get; set; } = 24; +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/PendingChangeDto.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/PendingChangeDto.cs new file mode 100644 index 0000000..f29e509 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/PendingChangeDto.cs @@ -0,0 +1,29 @@ +using ColaFlow.Modules.Mcp.Domain.ValueObjects; + +namespace ColaFlow.Modules.Mcp.Application.DTOs; + +/// +/// DTO for PendingChange response +/// +public class PendingChangeDto +{ + public Guid Id { get; set; } + public Guid TenantId { get; set; } + public Guid ApiKeyId { get; set; } + public string ToolName { get; set; } = null!; + public DiffPreviewDto Diff { get; set; } = null!; + public string Status { get; set; } = null!; + public DateTime CreatedAt { get; set; } + public DateTime ExpiresAt { get; set; } + public Guid? ApprovedBy { get; set; } + public DateTime? ApprovedAt { get; set; } + public Guid? RejectedBy { get; set; } + public DateTime? RejectedAt { get; set; } + public string? RejectionReason { get; set; } + public DateTime? AppliedAt { get; set; } + public string? ApplicationResult { get; set; } + public bool IsExpired { get; set; } + public bool CanBeApproved { get; set; } + public bool CanBeRejected { get; set; } + public string Summary { get; set; } = null!; +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/PendingChangeFilterDto.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/PendingChangeFilterDto.cs new file mode 100644 index 0000000..767e6aa --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/PendingChangeFilterDto.cs @@ -0,0 +1,18 @@ +using ColaFlow.Modules.Mcp.Domain.ValueObjects; + +namespace ColaFlow.Modules.Mcp.Application.DTOs; + +/// +/// Filter options for querying PendingChanges +/// +public class PendingChangeFilterDto +{ + public PendingChangeStatus? Status { get; set; } + public string? EntityType { get; set; } + public Guid? EntityId { get; set; } + public Guid? ApiKeyId { get; set; } + public string? ToolName { get; set; } + public bool? IncludeExpired { get; set; } + public int Page { get; set; } = 1; + public int PageSize { get; set; } = 20; +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/RejectChangeRequest.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/RejectChangeRequest.cs new file mode 100644 index 0000000..52e3c14 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/DTOs/RejectChangeRequest.cs @@ -0,0 +1,9 @@ +namespace ColaFlow.Modules.Mcp.Application.DTOs; + +/// +/// Request to reject a PendingChange +/// +public class RejectChangeRequest +{ + public string Reason { get; set; } = null!; +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeApprovedEventHandler.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeApprovedEventHandler.cs new file mode 100644 index 0000000..480584f --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/EventHandlers/PendingChangeApprovedEventHandler.cs @@ -0,0 +1,318 @@ +using ColaFlow.Modules.Mcp.Application.Services; +using ColaFlow.Modules.Mcp.Domain.Events; +using ColaFlow.Modules.Mcp.Domain.ValueObjects; +using ColaFlow.Modules.ProjectManagement.Application.Commands.CreateEpic; +using ColaFlow.Modules.ProjectManagement.Application.Commands.CreateProject; +using ColaFlow.Modules.ProjectManagement.Application.Commands.CreateStory; +using ColaFlow.Modules.ProjectManagement.Application.Commands.CreateTask; +using ColaFlow.Modules.ProjectManagement.Application.Commands.UpdateEpic; +using ColaFlow.Modules.ProjectManagement.Application.Commands.UpdateProject; +using ColaFlow.Modules.ProjectManagement.Application.Commands.UpdateStory; +using ColaFlow.Modules.ProjectManagement.Application.Commands.UpdateTask; +using MediatR; +using Microsoft.Extensions.Logging; +using System.Text.Json; + +namespace ColaFlow.Modules.Mcp.Application.EventHandlers; + +/// +/// Event handler for PendingChangeApprovedEvent +/// Executes the approved change by dispatching appropriate commands +/// +public class PendingChangeApprovedEventHandler : INotificationHandler +{ + private readonly IMediator _mediator; + private readonly IPendingChangeService _pendingChangeService; + private readonly ILogger _logger; + + public PendingChangeApprovedEventHandler( + IMediator mediator, + IPendingChangeService pendingChangeService, + ILogger logger) + { + _mediator = mediator ?? throw new ArgumentNullException(nameof(mediator)); + _pendingChangeService = pendingChangeService ?? throw new ArgumentNullException(nameof(pendingChangeService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task Handle(PendingChangeApprovedEvent notification, CancellationToken cancellationToken) + { + _logger.LogInformation( + "Handling PendingChangeApprovedEvent - PendingChangeId={PendingChangeId}, EntityType={EntityType}, Operation={Operation}", + notification.PendingChangeId, notification.Diff.EntityType, notification.Diff.Operation); + + try + { + // Execute the change based on entity type and operation + var result = await ExecuteChangeAsync(notification.Diff, cancellationToken); + + // Mark as applied + await _pendingChangeService.MarkAsAppliedAsync( + notification.PendingChangeId, + result, + cancellationToken); + + _logger.LogInformation( + "PendingChange executed successfully - PendingChangeId={PendingChangeId}, Result={Result}", + notification.PendingChangeId, result); + } + catch (Exception ex) + { + _logger.LogError(ex, + "Failed to execute PendingChange - PendingChangeId={PendingChangeId}", + notification.PendingChangeId); + + // Mark as failed (store error in ApplicationResult) + await _pendingChangeService.MarkAsAppliedAsync( + notification.PendingChangeId, + $"Failed: {ex.Message}", + cancellationToken); + } + } + + private async Task ExecuteChangeAsync(DiffPreview diff, CancellationToken cancellationToken) + { + var operation = diff.Operation.ToLowerInvariant(); + var entityType = diff.EntityType.ToLowerInvariant(); + + _logger.LogDebug( + "Executing {Operation} on {EntityType}", + operation, entityType); + + return (operation, entityType) switch + { + ("create", "project") => await ExecuteCreateProjectAsync(diff, cancellationToken), + ("update", "project") => await ExecuteUpdateProjectAsync(diff, cancellationToken), + ("create", "epic") => await ExecuteCreateEpicAsync(diff, cancellationToken), + ("update", "epic") => await ExecuteUpdateEpicAsync(diff, cancellationToken), + ("create", "story") => await ExecuteCreateStoryAsync(diff, cancellationToken), + ("update", "story") => await ExecuteUpdateStoryAsync(diff, cancellationToken), + ("create", "task") => await ExecuteCreateTaskAsync(diff, cancellationToken), + ("update", "task") => await ExecuteUpdateTaskAsync(diff, cancellationToken), + _ => throw new NotSupportedException($"Operation '{operation}' on entity type '{entityType}' is not supported") + }; + } + + #region Project Operations + + private async Task ExecuteCreateProjectAsync(DiffPreview diff, CancellationToken cancellationToken) + { + var data = JsonSerializer.Deserialize>(diff.AfterData ?? "{}"); + if (data == null) throw new InvalidOperationException("Invalid AfterData for CreateProject"); + + var command = new CreateProjectCommand + { + Name = GetStringValue(data, "name", "New Project"), + Description = GetStringValue(data, "description", ""), + Key = GetStringValue(data, "key", "PROJ"), + OwnerId = GetGuidValue(data, "ownerId") + }; + + var result = await _mediator.Send(command, cancellationToken); + return $"Project created: {result.Id} - {result.Name}"; + } + + private async Task ExecuteUpdateProjectAsync(DiffPreview diff, CancellationToken cancellationToken) + { + var data = JsonSerializer.Deserialize>(diff.AfterData ?? "{}"); + if (data == null) throw new InvalidOperationException("Invalid AfterData for UpdateProject"); + + var command = new UpdateProjectCommand + { + ProjectId = diff.EntityId ?? throw new InvalidOperationException("EntityId is required for Update"), + Name = GetStringValue(data, "name"), + Description = GetStringValue(data, "description"), + }; + + var result = await _mediator.Send(command, cancellationToken); + return $"Project updated: {result.Id} - {result.Name}"; + } + + #endregion + + #region Epic Operations + + private async Task ExecuteCreateEpicAsync(DiffPreview diff, CancellationToken cancellationToken) + { + var data = JsonSerializer.Deserialize>(diff.AfterData ?? "{}"); + if (data == null) throw new InvalidOperationException("Invalid AfterData for CreateEpic"); + + var command = new CreateEpicCommand + { + ProjectId = GetGuidValue(data, "projectId"), + Name = GetStringValue(data, "name", "New Epic"), + Description = GetStringValue(data, "description", ""), + CreatedBy = GetGuidValue(data, "createdBy") + }; + + var result = await _mediator.Send(command, cancellationToken); + return $"Epic created: {result.Id} - {result.Name}"; + } + + private async Task ExecuteUpdateEpicAsync(DiffPreview diff, CancellationToken cancellationToken) + { + var data = JsonSerializer.Deserialize>(diff.AfterData ?? "{}"); + if (data == null) throw new InvalidOperationException("Invalid AfterData for UpdateEpic"); + + var command = new UpdateEpicCommand + { + EpicId = diff.EntityId ?? throw new InvalidOperationException("EntityId is required for Update"), + Name = GetStringValue(data, "name"), + Description = GetStringValue(data, "description") + }; + + var result = await _mediator.Send(command, cancellationToken); + return $"Epic updated: {result.Id} - {result.Name}"; + } + + #endregion + + #region Story Operations + + private async Task ExecuteCreateStoryAsync(DiffPreview diff, CancellationToken cancellationToken) + { + var data = JsonSerializer.Deserialize>(diff.AfterData ?? "{}"); + if (data == null) throw new InvalidOperationException("Invalid AfterData for CreateStory"); + + var command = new CreateStoryCommand + { + EpicId = GetGuidValue(data, "epicId"), + Title = GetStringValue(data, "title", "New Story"), + Description = GetStringValue(data, "description", ""), + Priority = GetStringValue(data, "priority", "Medium"), + AssigneeId = GetNullableGuidValue(data, "assigneeId"), + EstimatedHours = GetNullableDecimalValue(data, "estimatedHours"), + CreatedBy = GetGuidValue(data, "createdBy") + }; + + var result = await _mediator.Send(command, cancellationToken); + return $"Story created: {result.Id} - {result.Title}"; + } + + private async Task ExecuteUpdateStoryAsync(DiffPreview diff, CancellationToken cancellationToken) + { + var data = JsonSerializer.Deserialize>(diff.AfterData ?? "{}"); + if (data == null) throw new InvalidOperationException("Invalid AfterData for UpdateStory"); + + var command = new UpdateStoryCommand + { + StoryId = diff.EntityId ?? throw new InvalidOperationException("EntityId is required for Update"), + Title = GetStringValue(data, "title"), + Description = GetStringValue(data, "description"), + Status = GetStringValue(data, "status"), + Priority = GetStringValue(data, "priority") + }; + + var result = await _mediator.Send(command, cancellationToken); + return $"Story updated: {result.Id} - {result.Title}"; + } + + #endregion + + #region Task Operations + + private async Task ExecuteCreateTaskAsync(DiffPreview diff, CancellationToken cancellationToken) + { + var data = JsonSerializer.Deserialize>(diff.AfterData ?? "{}"); + if (data == null) throw new InvalidOperationException("Invalid AfterData for CreateTask"); + + var command = new CreateTaskCommand + { + StoryId = GetGuidValue(data, "storyId"), + Title = GetStringValue(data, "title", "New Task"), + Description = GetStringValue(data, "description", ""), + Priority = GetStringValue(data, "priority", "Medium"), + EstimatedHours = GetNullableDecimalValue(data, "estimatedHours"), + AssigneeId = GetNullableGuidValue(data, "assigneeId"), + CreatedBy = GetGuidValue(data, "createdBy") + }; + + var result = await _mediator.Send(command, cancellationToken); + return $"Task created: {result.Id} - {result.Title}"; + } + + private async Task ExecuteUpdateTaskAsync(DiffPreview diff, CancellationToken cancellationToken) + { + var data = JsonSerializer.Deserialize>(diff.AfterData ?? "{}"); + if (data == null) throw new InvalidOperationException("Invalid AfterData for UpdateTask"); + + var command = new UpdateTaskCommand + { + TaskId = diff.EntityId ?? throw new InvalidOperationException("EntityId is required for Update"), + Title = GetStringValue(data, "title"), + Description = GetStringValue(data, "description"), + Status = GetStringValue(data, "status"), + Priority = GetStringValue(data, "priority") + }; + + var result = await _mediator.Send(command, cancellationToken); + return $"Task updated: {result.Id} - {result.Title}"; + } + + #endregion + + #region Helper Methods + + private static string GetStringValue(Dictionary data, string key, string? defaultValue = null) + { + if (data.TryGetValue(key, out var element) && element.ValueKind == JsonValueKind.String) + { + return element.GetString() ?? defaultValue ?? string.Empty; + } + return defaultValue ?? string.Empty; + } + + private static Guid GetGuidValue(Dictionary data, string key) + { + if (data.TryGetValue(key, out var element)) + { + if (element.ValueKind == JsonValueKind.String) + { + var stringValue = element.GetString(); + if (Guid.TryParse(stringValue, out var guid)) + { + return guid; + } + } + } + throw new InvalidOperationException($"Required Guid field '{key}' is missing or invalid"); + } + + private static Guid? GetNullableGuidValue(Dictionary data, string key) + { + if (data.TryGetValue(key, out var element)) + { + if (element.ValueKind == JsonValueKind.String) + { + var stringValue = element.GetString(); + if (!string.IsNullOrWhiteSpace(stringValue) && Guid.TryParse(stringValue, out var guid)) + { + return guid; + } + } + } + return null; + } + + private static decimal? GetNullableDecimalValue(Dictionary data, string key) + { + if (data.TryGetValue(key, out var element)) + { + if (element.ValueKind == JsonValueKind.Number) + { + return element.GetDecimal(); + } + if (element.ValueKind == JsonValueKind.String) + { + var stringValue = element.GetString(); + if (decimal.TryParse(stringValue, out var decimalValue)) + { + return decimalValue; + } + } + } + return null; + } + + #endregion +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/Services/IPendingChangeService.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/Services/IPendingChangeService.cs new file mode 100644 index 0000000..ca78b0f --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/Services/IPendingChangeService.cs @@ -0,0 +1,68 @@ +using ColaFlow.Modules.Mcp.Application.DTOs; +using ColaFlow.Modules.Mcp.Domain.ValueObjects; + +namespace ColaFlow.Modules.Mcp.Application.Services; + +/// +/// Service interface for PendingChange management +/// +public interface IPendingChangeService +{ + /// + /// Create a new PendingChange + /// + Task CreateAsync( + CreatePendingChangeRequest request, + CancellationToken cancellationToken = default); + + /// + /// Get a PendingChange by ID + /// + Task GetByIdAsync( + Guid id, + CancellationToken cancellationToken = default); + + /// + /// Get list of PendingChanges with filtering and pagination + /// + Task<(List Items, int TotalCount)> GetPendingChangesAsync( + PendingChangeFilterDto filter, + CancellationToken cancellationToken = default); + + /// + /// Approve a PendingChange (triggers execution) + /// + Task ApproveAsync( + Guid pendingChangeId, + Guid approvedBy, + CancellationToken cancellationToken = default); + + /// + /// Reject a PendingChange + /// + Task RejectAsync( + Guid pendingChangeId, + Guid rejectedBy, + string reason, + CancellationToken cancellationToken = default); + + /// + /// Mark a PendingChange as applied (called after successful execution) + /// + Task MarkAsAppliedAsync( + Guid pendingChangeId, + string result, + CancellationToken cancellationToken = default); + + /// + /// Expire old PendingChanges (called by background job) + /// + Task ExpireOldChangesAsync(CancellationToken cancellationToken = default); + + /// + /// Delete a PendingChange (only allowed for Expired status) + /// + Task DeleteAsync( + Guid pendingChangeId, + CancellationToken cancellationToken = default); +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/Services/PendingChangeService.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/Services/PendingChangeService.cs new file mode 100644 index 0000000..831ddd0 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Application/Services/PendingChangeService.cs @@ -0,0 +1,410 @@ +using ColaFlow.Modules.Mcp.Application.DTOs; +using ColaFlow.Modules.Mcp.Domain.Entities; +using ColaFlow.Modules.Mcp.Domain.Exceptions; +using ColaFlow.Modules.Mcp.Domain.Repositories; +using ColaFlow.Modules.Mcp.Domain.ValueObjects; +using ColaFlow.Modules.ProjectManagement.Application.Common.Interfaces; +using MediatR; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.Logging; + +namespace ColaFlow.Modules.Mcp.Application.Services; + +/// +/// Service implementation for PendingChange management +/// +public class PendingChangeService : IPendingChangeService +{ + private readonly IPendingChangeRepository _repository; + private readonly ITenantContext _tenantContext; + private readonly IHttpContextAccessor _httpContextAccessor; + private readonly IPublisher _publisher; + private readonly ILogger _logger; + + public PendingChangeService( + IPendingChangeRepository repository, + ITenantContext tenantContext, + IHttpContextAccessor httpContextAccessor, + IPublisher publisher, + ILogger logger) + { + _repository = repository ?? throw new ArgumentNullException(nameof(repository)); + _tenantContext = tenantContext ?? throw new ArgumentNullException(nameof(tenantContext)); + _httpContextAccessor = httpContextAccessor ?? throw new ArgumentNullException(nameof(httpContextAccessor)); + _publisher = publisher ?? throw new ArgumentNullException(nameof(publisher)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task CreateAsync( + CreatePendingChangeRequest request, + CancellationToken cancellationToken = default) + { + var tenantId = _tenantContext.GetCurrentTenantId(); + + // Get API Key ID from HttpContext (set by MCP authentication middleware) + var apiKeyIdNullable = _httpContextAccessor.HttpContext?.Items["ApiKeyId"] as Guid?; + if (!apiKeyIdNullable.HasValue) + { + throw new McpUnauthorizedException("API Key not found in request context"); + } + var apiKeyId = apiKeyIdNullable.Value; + + _logger.LogInformation( + "Creating PendingChange: Tool={ToolName}, Operation={Operation}, EntityType={EntityType}, Tenant={TenantId}", + request.ToolName, request.Diff.Operation, request.Diff.EntityType, tenantId); + + // Create PendingChange entity + var pendingChange = PendingChange.Create( + request.ToolName, + request.Diff, + tenantId, + apiKeyId, + request.ExpirationHours); + + // Save to database + await _repository.AddAsync(pendingChange, cancellationToken); + await _repository.SaveChangesAsync(cancellationToken); + + // Publish domain events + foreach (var domainEvent in pendingChange.DomainEvents) + { + await _publisher.Publish(domainEvent, cancellationToken); + } + pendingChange.ClearDomainEvents(); + + _logger.LogInformation( + "PendingChange created: {Id}, ExpiresAt={ExpiresAt}", + pendingChange.Id, pendingChange.ExpiresAt); + + return MapToDto(pendingChange); + } + + public async Task GetByIdAsync( + Guid id, + CancellationToken cancellationToken = default) + { + var tenantId = _tenantContext.GetCurrentTenantId(); + + var pendingChange = await _repository.GetByIdAsync(id, cancellationToken); + if (pendingChange == null) + { + return null; + } + + // Verify tenant isolation + if (pendingChange.TenantId != tenantId) + { + _logger.LogWarning( + "Attempted cross-tenant access: PendingChange {Id} belongs to Tenant {OwnerId}, but requested by Tenant {RequesterId}", + id, pendingChange.TenantId, tenantId); + return null; + } + + return MapToDto(pendingChange); + } + + public async Task<(List Items, int TotalCount)> GetPendingChangesAsync( + PendingChangeFilterDto filter, + CancellationToken cancellationToken = default) + { + var tenantId = _tenantContext.GetCurrentTenantId(); + + // Build query + var query = (await _repository.GetByTenantAsync(tenantId, cancellationToken)) + .AsEnumerable(); + + // Apply filters + if (filter.Status.HasValue) + { + query = query.Where(x => x.Status == filter.Status.Value); + } + + if (!string.IsNullOrWhiteSpace(filter.EntityType)) + { + query = query.Where(x => x.Diff.EntityType == filter.EntityType); + } + + if (filter.EntityId.HasValue) + { + query = query.Where(x => x.Diff.EntityId == filter.EntityId.Value); + } + + if (filter.ApiKeyId.HasValue) + { + query = query.Where(x => x.ApiKeyId == filter.ApiKeyId.Value); + } + + if (!string.IsNullOrWhiteSpace(filter.ToolName)) + { + query = query.Where(x => x.ToolName == filter.ToolName); + } + + if (filter.IncludeExpired == false) + { + query = query.Where(x => x.Status != PendingChangeStatus.Expired); + } + + // Get total count before pagination + var totalCount = query.Count(); + + // Apply pagination + var items = query + .OrderByDescending(x => x.CreatedAt) + .Skip((filter.Page - 1) * filter.PageSize) + .Take(filter.PageSize) + .Select(MapToDto) + .ToList(); + + _logger.LogInformation( + "Retrieved {Count}/{Total} PendingChanges for Tenant {TenantId} (Page {Page}/{PageSize})", + items.Count, totalCount, tenantId, filter.Page, filter.PageSize); + + return (items, totalCount); + } + + public async Task ApproveAsync( + Guid pendingChangeId, + Guid approvedBy, + CancellationToken cancellationToken = default) + { + var tenantId = _tenantContext.GetCurrentTenantId(); + + var pendingChange = await _repository.GetByIdAsync(pendingChangeId, cancellationToken); + if (pendingChange == null) + { + throw new McpNotFoundException("PendingChange", pendingChangeId.ToString()); + } + + // Verify tenant isolation + if (pendingChange.TenantId != tenantId) + { + throw new McpForbiddenException( + $"Cannot approve PendingChange from different tenant"); + } + + _logger.LogInformation( + "Approving PendingChange {Id} by User {UserId}", + pendingChangeId, approvedBy); + + // Domain method validates business rules and raises PendingChangeApprovedEvent + pendingChange.Approve(approvedBy); + + await _repository.UpdateAsync(pendingChange, cancellationToken); + await _repository.SaveChangesAsync(cancellationToken); + + // Publish domain events (will trigger operation execution) + foreach (var domainEvent in pendingChange.DomainEvents) + { + await _publisher.Publish(domainEvent, cancellationToken); + } + pendingChange.ClearDomainEvents(); + + _logger.LogInformation( + "PendingChange {Id} approved successfully", + pendingChangeId); + } + + public async Task RejectAsync( + Guid pendingChangeId, + Guid rejectedBy, + string reason, + CancellationToken cancellationToken = default) + { + var tenantId = _tenantContext.GetCurrentTenantId(); + + var pendingChange = await _repository.GetByIdAsync(pendingChangeId, cancellationToken); + if (pendingChange == null) + { + throw new McpNotFoundException("PendingChange", pendingChangeId.ToString()); + } + + // Verify tenant isolation + if (pendingChange.TenantId != tenantId) + { + throw new McpForbiddenException( + $"Cannot reject PendingChange from different tenant"); + } + + _logger.LogInformation( + "Rejecting PendingChange {Id} by User {UserId} - Reason: {Reason}", + pendingChangeId, rejectedBy, reason); + + // Domain method validates business rules and raises PendingChangeRejectedEvent + pendingChange.Reject(rejectedBy, reason); + + await _repository.UpdateAsync(pendingChange, cancellationToken); + await _repository.SaveChangesAsync(cancellationToken); + + // Publish domain events + foreach (var domainEvent in pendingChange.DomainEvents) + { + await _publisher.Publish(domainEvent, cancellationToken); + } + pendingChange.ClearDomainEvents(); + + _logger.LogInformation( + "PendingChange {Id} rejected successfully", + pendingChangeId); + } + + public async Task MarkAsAppliedAsync( + Guid pendingChangeId, + string result, + CancellationToken cancellationToken = default) + { + var pendingChange = await _repository.GetByIdAsync(pendingChangeId, cancellationToken); + if (pendingChange == null) + { + throw new McpNotFoundException("PendingChange", pendingChangeId.ToString()); + } + + _logger.LogInformation( + "Marking PendingChange {Id} as Applied - Result: {Result}", + pendingChangeId, result); + + // Domain method validates business rules and raises PendingChangeAppliedEvent + pendingChange.MarkAsApplied(result); + + await _repository.UpdateAsync(pendingChange, cancellationToken); + await _repository.SaveChangesAsync(cancellationToken); + + // Publish domain events + foreach (var domainEvent in pendingChange.DomainEvents) + { + await _publisher.Publish(domainEvent, cancellationToken); + } + pendingChange.ClearDomainEvents(); + + _logger.LogInformation( + "PendingChange {Id} marked as Applied", + pendingChangeId); + } + + public async Task ExpireOldChangesAsync(CancellationToken cancellationToken = default) + { + _logger.LogInformation("Starting expiration check for old PendingChanges"); + + var expiredChanges = await _repository.GetExpiredAsync(cancellationToken); + + var count = 0; + foreach (var change in expiredChanges) + { + try + { + change.Expire(); + await _repository.UpdateAsync(change, cancellationToken); + + // Publish domain events + foreach (var domainEvent in change.DomainEvents) + { + await _publisher.Publish(domainEvent, cancellationToken); + } + change.ClearDomainEvents(); + + count++; + + _logger.LogWarning( + "PendingChange expired: {Id} - {ToolName} {Operation} {EntityType}", + change.Id, change.ToolName, change.Diff.Operation, change.Diff.EntityType); + } + catch (Exception ex) + { + _logger.LogError(ex, + "Failed to expire PendingChange {Id}", + change.Id); + } + } + + if (count > 0) + { + await _repository.SaveChangesAsync(cancellationToken); + } + + _logger.LogInformation( + "Expired {Count} PendingChanges", + count); + + return count; + } + + public async Task DeleteAsync( + Guid pendingChangeId, + CancellationToken cancellationToken = default) + { + var tenantId = _tenantContext.GetCurrentTenantId(); + + var pendingChange = await _repository.GetByIdAsync(pendingChangeId, cancellationToken); + if (pendingChange == null) + { + throw new McpNotFoundException("PendingChange", pendingChangeId.ToString()); + } + + // Verify tenant isolation + if (pendingChange.TenantId != tenantId) + { + throw new McpForbiddenException( + $"Cannot delete PendingChange from different tenant"); + } + + // Only allow deletion of Expired or Rejected changes + if (pendingChange.Status != PendingChangeStatus.Expired && + pendingChange.Status != PendingChangeStatus.Rejected) + { + throw new McpValidationException( + $"Can only delete PendingChanges with Expired or Rejected status. Current status: {pendingChange.Status}"); + } + + _logger.LogInformation( + "Deleting PendingChange {Id} (Status: {Status})", + pendingChangeId, pendingChange.Status); + + await _repository.DeleteAsync(pendingChange, cancellationToken); + await _repository.SaveChangesAsync(cancellationToken); + + _logger.LogInformation( + "PendingChange {Id} deleted successfully", + pendingChangeId); + } + + private static PendingChangeDto MapToDto(PendingChange pendingChange) + { + return new PendingChangeDto + { + Id = pendingChange.Id, + TenantId = pendingChange.TenantId, + ApiKeyId = pendingChange.ApiKeyId, + ToolName = pendingChange.ToolName, + Diff = new DiffPreviewDto + { + Operation = pendingChange.Diff.Operation, + EntityType = pendingChange.Diff.EntityType, + EntityId = pendingChange.Diff.EntityId, + EntityKey = pendingChange.Diff.EntityKey, + BeforeData = pendingChange.Diff.BeforeData, + AfterData = pendingChange.Diff.AfterData, + ChangedFields = pendingChange.Diff.ChangedFields.Select(f => new DiffFieldDto + { + FieldName = f.FieldName, + DisplayName = f.DisplayName, + OldValue = f.OldValue, + NewValue = f.NewValue, + DiffHtml = f.DiffHtml + }).ToList() + }, + Status = pendingChange.Status.ToString(), + CreatedAt = pendingChange.CreatedAt, + ExpiresAt = pendingChange.ExpiresAt, + ApprovedBy = pendingChange.ApprovedBy, + ApprovedAt = pendingChange.ApprovedAt, + RejectedBy = pendingChange.RejectedBy, + RejectedAt = pendingChange.RejectedAt, + RejectionReason = pendingChange.RejectionReason, + AppliedAt = pendingChange.AppliedAt, + ApplicationResult = pendingChange.ApplicationResult, + IsExpired = pendingChange.IsExpired(), + CanBeApproved = pendingChange.CanBeApproved(), + CanBeRejected = pendingChange.CanBeRejected(), + Summary = pendingChange.GetSummary() + }; + } +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/BackgroundServices/PendingChangeExpirationBackgroundService.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/BackgroundServices/PendingChangeExpirationBackgroundService.cs new file mode 100644 index 0000000..55f8432 --- /dev/null +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/BackgroundServices/PendingChangeExpirationBackgroundService.cs @@ -0,0 +1,71 @@ +using ColaFlow.Modules.Mcp.Application.Services; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace ColaFlow.Modules.Mcp.Infrastructure.BackgroundServices; + +/// +/// Background service to periodically expire old PendingChanges +/// Runs every 5 minutes and marks expired changes +/// +public class PendingChangeExpirationBackgroundService : BackgroundService +{ + private readonly IServiceProvider _serviceProvider; + private readonly ILogger _logger; + private readonly TimeSpan _interval = TimeSpan.FromMinutes(5); + + public PendingChangeExpirationBackgroundService( + IServiceProvider serviceProvider, + ILogger logger) + { + _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("PendingChange Expiration Background Service started"); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + _logger.LogDebug("Running PendingChange expiration check..."); + + // Create a new scope for the scoped service + using var scope = _serviceProvider.CreateScope(); + var pendingChangeService = scope.ServiceProvider + .GetRequiredService(); + + var expiredCount = await pendingChangeService.ExpireOldChangesAsync(stoppingToken); + + if (expiredCount > 0) + { + _logger.LogInformation("Expired {Count} PendingChanges", expiredCount); + } + else + { + _logger.LogDebug("No expired PendingChanges found"); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error occurred while expiring PendingChanges"); + } + + // Wait for the next interval + try + { + await Task.Delay(_interval, stoppingToken); + } + catch (TaskCanceledException) + { + // Expected when cancellation is requested + break; + } + } + + _logger.LogInformation("PendingChange Expiration Background Service stopped"); + } +} diff --git a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/Extensions/McpServiceExtensions.cs b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/Extensions/McpServiceExtensions.cs index 158ff07..bb92a3d 100644 --- a/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/Extensions/McpServiceExtensions.cs +++ b/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Infrastructure/Extensions/McpServiceExtensions.cs @@ -3,6 +3,7 @@ using ColaFlow.Modules.Mcp.Application.Resources; using ColaFlow.Modules.Mcp.Application.Services; using ColaFlow.Modules.Mcp.Contracts.Resources; using ColaFlow.Modules.Mcp.Domain.Repositories; +using ColaFlow.Modules.Mcp.Infrastructure.BackgroundServices; using ColaFlow.Modules.Mcp.Infrastructure.Middleware; using ColaFlow.Modules.Mcp.Infrastructure.Persistence; using ColaFlow.Modules.Mcp.Infrastructure.Persistence.Repositories; @@ -41,6 +42,10 @@ public static class McpServiceExtensions services.AddScoped(); services.AddScoped(); services.AddScoped(); + services.AddScoped(); + + // Register background services + services.AddHostedService(); // Register resource registry (Singleton - shared across all requests) services.AddSingleton();