feat(backend): Implement PendingChange Management (Story 5.10)

Implemented complete Human-in-the-Loop approval workflow for AI-proposed changes:

Changes:
- Created PendingChange DTOs (PendingChangeDto, CreatePendingChangeRequest, ApproveChangeRequest, RejectChangeRequest, PendingChangeFilterDto)
- Implemented IPendingChangeService interface with CRUD, approval/rejection, expiration, and deletion operations
- Implemented PendingChangeService with full workflow support and tenant isolation
- Created McpPendingChangesController REST API with endpoints for listing, approving, rejecting, and deleting pending changes
- Implemented PendingChangeApprovedEventHandler to execute approved changes via MediatR commands (Project, Epic, Story, Task CRUD operations)
- Created PendingChangeExpirationBackgroundService for auto-expiration of changes after 24 hours
- Registered all services and background service in DI container

Technical Details:
- Status flow: PendingApproval → Approved → Applied (or Rejected/Expired)
- Tenant isolation enforced in all operations
- Domain events published for audit trail
- Event-driven execution using MediatR
- Background service runs every 5 minutes to expire old changes
- JWT authentication required for all endpoints

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Yaojia Wang
2025-11-09 17:58:12 +01:00
parent debfb95780
commit 2fec2df004
11 changed files with 1182 additions and 0 deletions

View File

@@ -0,0 +1,229 @@
using ColaFlow.Modules.Mcp.Application.DTOs;
using ColaFlow.Modules.Mcp.Application.Services;
using ColaFlow.Modules.Mcp.Domain.ValueObjects;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using System.Security.Claims;
namespace ColaFlow.API.Controllers;
/// <summary>
/// Controller for managing PendingChanges (AI-proposed changes awaiting approval)
/// Requires JWT authentication
/// </summary>
[ApiController]
[Route("api/mcp/pending-changes")]
[Authorize] // Requires JWT authentication
public class McpPendingChangesController : ControllerBase
{
private readonly IPendingChangeService _pendingChangeService;
private readonly ILogger<McpPendingChangesController> _logger;
public McpPendingChangesController(
IPendingChangeService pendingChangeService,
ILogger<McpPendingChangesController> logger)
{
_pendingChangeService = pendingChangeService ?? throw new ArgumentNullException(nameof(pendingChangeService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <summary>
/// Get list of pending changes with filtering and pagination
/// </summary>
[HttpGet]
[ProducesResponseType(typeof(PendingChangeListResponse), 200)]
[ProducesResponseType(401)]
public async Task<IActionResult> GetPendingChanges(
[FromQuery] string? status = null,
[FromQuery] string? entityType = null,
[FromQuery] Guid? entityId = null,
[FromQuery] Guid? apiKeyId = null,
[FromQuery] string? toolName = null,
[FromQuery] bool? includeExpired = null,
[FromQuery] int page = 1,
[FromQuery] int pageSize = 20)
{
try
{
var filter = new PendingChangeFilterDto
{
Status = string.IsNullOrWhiteSpace(status) ? null : Enum.Parse<PendingChangeStatus>(status, true),
EntityType = entityType,
EntityId = entityId,
ApiKeyId = apiKeyId,
ToolName = toolName,
IncludeExpired = includeExpired,
Page = page,
PageSize = Math.Min(pageSize, 100) // Max 100 items per page
};
var (items, totalCount) = await _pendingChangeService.GetPendingChangesAsync(filter, HttpContext.RequestAborted);
var response = new PendingChangeListResponse
{
Items = items,
TotalCount = totalCount,
Page = page,
PageSize = pageSize,
TotalPages = (int)Math.Ceiling((double)totalCount / pageSize)
};
return Ok(response);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to get pending changes");
return StatusCode(500, new { message = "Failed to retrieve pending changes" });
}
}
/// <summary>
/// Get a specific pending change by ID
/// </summary>
[HttpGet("{id}")]
[ProducesResponseType(typeof(PendingChangeDto), 200)]
[ProducesResponseType(404)]
[ProducesResponseType(401)]
public async Task<IActionResult> GetPendingChange(Guid id)
{
try
{
var pendingChange = await _pendingChangeService.GetByIdAsync(id, HttpContext.RequestAborted);
if (pendingChange == null)
{
return NotFound(new { message = "PendingChange not found" });
}
return Ok(pendingChange);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to get pending change {Id}", id);
return StatusCode(500, new { message = "Failed to retrieve pending change" });
}
}
/// <summary>
/// Approve a pending change (will trigger execution)
/// </summary>
[HttpPost("{id}/approve")]
[ProducesResponseType(200)]
[ProducesResponseType(400)]
[ProducesResponseType(404)]
[ProducesResponseType(401)]
public async Task<IActionResult> ApprovePendingChange(Guid id)
{
try
{
// Extract user ID from JWT claims
var userId = GetUserIdFromClaims();
await _pendingChangeService.ApproveAsync(id, userId, HttpContext.RequestAborted);
_logger.LogInformation("PendingChange {Id} approved by User {UserId}", id, userId);
return Ok(new { message = "PendingChange approved successfully. Execution in progress." });
}
catch (InvalidOperationException ex)
{
_logger.LogWarning(ex, "Cannot approve PendingChange {Id}", id);
return BadRequest(new { message = ex.Message });
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to approve PendingChange {Id}", id);
return StatusCode(500, new { message = "Failed to approve pending change" });
}
}
/// <summary>
/// Reject a pending change
/// </summary>
[HttpPost("{id}/reject")]
[ProducesResponseType(200)]
[ProducesResponseType(400)]
[ProducesResponseType(404)]
[ProducesResponseType(401)]
public async Task<IActionResult> RejectPendingChange(Guid id, [FromBody] RejectChangeRequest request)
{
try
{
if (string.IsNullOrWhiteSpace(request.Reason))
{
return BadRequest(new { message = "Rejection reason is required" });
}
// Extract user ID from JWT claims
var userId = GetUserIdFromClaims();
await _pendingChangeService.RejectAsync(id, userId, request.Reason, HttpContext.RequestAborted);
_logger.LogInformation("PendingChange {Id} rejected by User {UserId}", id, userId);
return Ok(new { message = "PendingChange rejected successfully" });
}
catch (InvalidOperationException ex)
{
_logger.LogWarning(ex, "Cannot reject PendingChange {Id}", id);
return BadRequest(new { message = ex.Message });
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to reject PendingChange {Id}", id);
return StatusCode(500, new { message = "Failed to reject pending change" });
}
}
/// <summary>
/// Delete a pending change (only allowed for Expired or Rejected status)
/// </summary>
[HttpDelete("{id}")]
[ProducesResponseType(204)]
[ProducesResponseType(400)]
[ProducesResponseType(404)]
[ProducesResponseType(401)]
public async Task<IActionResult> DeletePendingChange(Guid id)
{
try
{
await _pendingChangeService.DeleteAsync(id, HttpContext.RequestAborted);
_logger.LogInformation("PendingChange {Id} deleted", id);
return NoContent();
}
catch (InvalidOperationException ex)
{
_logger.LogWarning(ex, "Cannot delete PendingChange {Id}", id);
return BadRequest(new { message = ex.Message });
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to delete PendingChange {Id}", id);
return StatusCode(500, new { message = "Failed to delete pending change" });
}
}
// Helper method to extract user ID from claims
private Guid GetUserIdFromClaims()
{
var userIdClaim = User.FindFirstValue("user_id")
?? User.FindFirstValue(ClaimTypes.NameIdentifier)
?? User.FindFirstValue("sub")
?? throw new UnauthorizedAccessException("User ID not found in token");
return Guid.Parse(userIdClaim);
}
}
/// <summary>
/// Response for paginated list of pending changes
/// </summary>
public class PendingChangeListResponse
{
public List<PendingChangeDto> Items { get; set; } = new();
public int TotalCount { get; set; }
public int Page { get; set; }
public int PageSize { get; set; }
public int TotalPages { get; set; }
}

View File

@@ -0,0 +1,12 @@
namespace ColaFlow.Modules.Mcp.Application.DTOs;
/// <summary>
/// Request to approve a PendingChange
/// </summary>
public class ApproveChangeRequest
{
// Currently empty, but we may add fields later like:
// - ApprovalComments
// - AutoApply flag
// - ScheduledExecutionTime
}

View File

@@ -0,0 +1,13 @@
using ColaFlow.Modules.Mcp.Domain.ValueObjects;
namespace ColaFlow.Modules.Mcp.Application.DTOs;
/// <summary>
/// Request to create a new PendingChange
/// </summary>
public class CreatePendingChangeRequest
{
public string ToolName { get; set; } = null!;
public DiffPreview Diff { get; set; } = null!;
public int ExpirationHours { get; set; } = 24;
}

View File

@@ -0,0 +1,29 @@
using ColaFlow.Modules.Mcp.Domain.ValueObjects;
namespace ColaFlow.Modules.Mcp.Application.DTOs;
/// <summary>
/// DTO for PendingChange response
/// </summary>
public class PendingChangeDto
{
public Guid Id { get; set; }
public Guid TenantId { get; set; }
public Guid ApiKeyId { get; set; }
public string ToolName { get; set; } = null!;
public DiffPreviewDto Diff { get; set; } = null!;
public string Status { get; set; } = null!;
public DateTime CreatedAt { get; set; }
public DateTime ExpiresAt { get; set; }
public Guid? ApprovedBy { get; set; }
public DateTime? ApprovedAt { get; set; }
public Guid? RejectedBy { get; set; }
public DateTime? RejectedAt { get; set; }
public string? RejectionReason { get; set; }
public DateTime? AppliedAt { get; set; }
public string? ApplicationResult { get; set; }
public bool IsExpired { get; set; }
public bool CanBeApproved { get; set; }
public bool CanBeRejected { get; set; }
public string Summary { get; set; } = null!;
}

View File

@@ -0,0 +1,18 @@
using ColaFlow.Modules.Mcp.Domain.ValueObjects;
namespace ColaFlow.Modules.Mcp.Application.DTOs;
/// <summary>
/// Filter options for querying PendingChanges
/// </summary>
public class PendingChangeFilterDto
{
public PendingChangeStatus? Status { get; set; }
public string? EntityType { get; set; }
public Guid? EntityId { get; set; }
public Guid? ApiKeyId { get; set; }
public string? ToolName { get; set; }
public bool? IncludeExpired { get; set; }
public int Page { get; set; } = 1;
public int PageSize { get; set; } = 20;
}

View File

@@ -0,0 +1,9 @@
namespace ColaFlow.Modules.Mcp.Application.DTOs;
/// <summary>
/// Request to reject a PendingChange
/// </summary>
public class RejectChangeRequest
{
public string Reason { get; set; } = null!;
}

View File

@@ -0,0 +1,318 @@
using ColaFlow.Modules.Mcp.Application.Services;
using ColaFlow.Modules.Mcp.Domain.Events;
using ColaFlow.Modules.Mcp.Domain.ValueObjects;
using ColaFlow.Modules.ProjectManagement.Application.Commands.CreateEpic;
using ColaFlow.Modules.ProjectManagement.Application.Commands.CreateProject;
using ColaFlow.Modules.ProjectManagement.Application.Commands.CreateStory;
using ColaFlow.Modules.ProjectManagement.Application.Commands.CreateTask;
using ColaFlow.Modules.ProjectManagement.Application.Commands.UpdateEpic;
using ColaFlow.Modules.ProjectManagement.Application.Commands.UpdateProject;
using ColaFlow.Modules.ProjectManagement.Application.Commands.UpdateStory;
using ColaFlow.Modules.ProjectManagement.Application.Commands.UpdateTask;
using MediatR;
using Microsoft.Extensions.Logging;
using System.Text.Json;
namespace ColaFlow.Modules.Mcp.Application.EventHandlers;
/// <summary>
/// Event handler for PendingChangeApprovedEvent
/// Executes the approved change by dispatching appropriate commands
/// </summary>
public class PendingChangeApprovedEventHandler : INotificationHandler<PendingChangeApprovedEvent>
{
private readonly IMediator _mediator;
private readonly IPendingChangeService _pendingChangeService;
private readonly ILogger<PendingChangeApprovedEventHandler> _logger;
public PendingChangeApprovedEventHandler(
IMediator mediator,
IPendingChangeService pendingChangeService,
ILogger<PendingChangeApprovedEventHandler> logger)
{
_mediator = mediator ?? throw new ArgumentNullException(nameof(mediator));
_pendingChangeService = pendingChangeService ?? throw new ArgumentNullException(nameof(pendingChangeService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task Handle(PendingChangeApprovedEvent notification, CancellationToken cancellationToken)
{
_logger.LogInformation(
"Handling PendingChangeApprovedEvent - PendingChangeId={PendingChangeId}, EntityType={EntityType}, Operation={Operation}",
notification.PendingChangeId, notification.Diff.EntityType, notification.Diff.Operation);
try
{
// Execute the change based on entity type and operation
var result = await ExecuteChangeAsync(notification.Diff, cancellationToken);
// Mark as applied
await _pendingChangeService.MarkAsAppliedAsync(
notification.PendingChangeId,
result,
cancellationToken);
_logger.LogInformation(
"PendingChange executed successfully - PendingChangeId={PendingChangeId}, Result={Result}",
notification.PendingChangeId, result);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to execute PendingChange - PendingChangeId={PendingChangeId}",
notification.PendingChangeId);
// Mark as failed (store error in ApplicationResult)
await _pendingChangeService.MarkAsAppliedAsync(
notification.PendingChangeId,
$"Failed: {ex.Message}",
cancellationToken);
}
}
private async Task<string> ExecuteChangeAsync(DiffPreview diff, CancellationToken cancellationToken)
{
var operation = diff.Operation.ToLowerInvariant();
var entityType = diff.EntityType.ToLowerInvariant();
_logger.LogDebug(
"Executing {Operation} on {EntityType}",
operation, entityType);
return (operation, entityType) switch
{
("create", "project") => await ExecuteCreateProjectAsync(diff, cancellationToken),
("update", "project") => await ExecuteUpdateProjectAsync(diff, cancellationToken),
("create", "epic") => await ExecuteCreateEpicAsync(diff, cancellationToken),
("update", "epic") => await ExecuteUpdateEpicAsync(diff, cancellationToken),
("create", "story") => await ExecuteCreateStoryAsync(diff, cancellationToken),
("update", "story") => await ExecuteUpdateStoryAsync(diff, cancellationToken),
("create", "task") => await ExecuteCreateTaskAsync(diff, cancellationToken),
("update", "task") => await ExecuteUpdateTaskAsync(diff, cancellationToken),
_ => throw new NotSupportedException($"Operation '{operation}' on entity type '{entityType}' is not supported")
};
}
#region Project Operations
private async Task<string> ExecuteCreateProjectAsync(DiffPreview diff, CancellationToken cancellationToken)
{
var data = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(diff.AfterData ?? "{}");
if (data == null) throw new InvalidOperationException("Invalid AfterData for CreateProject");
var command = new CreateProjectCommand
{
Name = GetStringValue(data, "name", "New Project"),
Description = GetStringValue(data, "description", ""),
Key = GetStringValue(data, "key", "PROJ"),
OwnerId = GetGuidValue(data, "ownerId")
};
var result = await _mediator.Send(command, cancellationToken);
return $"Project created: {result.Id} - {result.Name}";
}
private async Task<string> ExecuteUpdateProjectAsync(DiffPreview diff, CancellationToken cancellationToken)
{
var data = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(diff.AfterData ?? "{}");
if (data == null) throw new InvalidOperationException("Invalid AfterData for UpdateProject");
var command = new UpdateProjectCommand
{
ProjectId = diff.EntityId ?? throw new InvalidOperationException("EntityId is required for Update"),
Name = GetStringValue(data, "name"),
Description = GetStringValue(data, "description"),
};
var result = await _mediator.Send(command, cancellationToken);
return $"Project updated: {result.Id} - {result.Name}";
}
#endregion
#region Epic Operations
private async Task<string> ExecuteCreateEpicAsync(DiffPreview diff, CancellationToken cancellationToken)
{
var data = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(diff.AfterData ?? "{}");
if (data == null) throw new InvalidOperationException("Invalid AfterData for CreateEpic");
var command = new CreateEpicCommand
{
ProjectId = GetGuidValue(data, "projectId"),
Name = GetStringValue(data, "name", "New Epic"),
Description = GetStringValue(data, "description", ""),
CreatedBy = GetGuidValue(data, "createdBy")
};
var result = await _mediator.Send(command, cancellationToken);
return $"Epic created: {result.Id} - {result.Name}";
}
private async Task<string> ExecuteUpdateEpicAsync(DiffPreview diff, CancellationToken cancellationToken)
{
var data = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(diff.AfterData ?? "{}");
if (data == null) throw new InvalidOperationException("Invalid AfterData for UpdateEpic");
var command = new UpdateEpicCommand
{
EpicId = diff.EntityId ?? throw new InvalidOperationException("EntityId is required for Update"),
Name = GetStringValue(data, "name"),
Description = GetStringValue(data, "description")
};
var result = await _mediator.Send(command, cancellationToken);
return $"Epic updated: {result.Id} - {result.Name}";
}
#endregion
#region Story Operations
private async Task<string> ExecuteCreateStoryAsync(DiffPreview diff, CancellationToken cancellationToken)
{
var data = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(diff.AfterData ?? "{}");
if (data == null) throw new InvalidOperationException("Invalid AfterData for CreateStory");
var command = new CreateStoryCommand
{
EpicId = GetGuidValue(data, "epicId"),
Title = GetStringValue(data, "title", "New Story"),
Description = GetStringValue(data, "description", ""),
Priority = GetStringValue(data, "priority", "Medium"),
AssigneeId = GetNullableGuidValue(data, "assigneeId"),
EstimatedHours = GetNullableDecimalValue(data, "estimatedHours"),
CreatedBy = GetGuidValue(data, "createdBy")
};
var result = await _mediator.Send(command, cancellationToken);
return $"Story created: {result.Id} - {result.Title}";
}
private async Task<string> ExecuteUpdateStoryAsync(DiffPreview diff, CancellationToken cancellationToken)
{
var data = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(diff.AfterData ?? "{}");
if (data == null) throw new InvalidOperationException("Invalid AfterData for UpdateStory");
var command = new UpdateStoryCommand
{
StoryId = diff.EntityId ?? throw new InvalidOperationException("EntityId is required for Update"),
Title = GetStringValue(data, "title"),
Description = GetStringValue(data, "description"),
Status = GetStringValue(data, "status"),
Priority = GetStringValue(data, "priority")
};
var result = await _mediator.Send(command, cancellationToken);
return $"Story updated: {result.Id} - {result.Title}";
}
#endregion
#region Task Operations
private async Task<string> ExecuteCreateTaskAsync(DiffPreview diff, CancellationToken cancellationToken)
{
var data = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(diff.AfterData ?? "{}");
if (data == null) throw new InvalidOperationException("Invalid AfterData for CreateTask");
var command = new CreateTaskCommand
{
StoryId = GetGuidValue(data, "storyId"),
Title = GetStringValue(data, "title", "New Task"),
Description = GetStringValue(data, "description", ""),
Priority = GetStringValue(data, "priority", "Medium"),
EstimatedHours = GetNullableDecimalValue(data, "estimatedHours"),
AssigneeId = GetNullableGuidValue(data, "assigneeId"),
CreatedBy = GetGuidValue(data, "createdBy")
};
var result = await _mediator.Send(command, cancellationToken);
return $"Task created: {result.Id} - {result.Title}";
}
private async Task<string> ExecuteUpdateTaskAsync(DiffPreview diff, CancellationToken cancellationToken)
{
var data = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(diff.AfterData ?? "{}");
if (data == null) throw new InvalidOperationException("Invalid AfterData for UpdateTask");
var command = new UpdateTaskCommand
{
TaskId = diff.EntityId ?? throw new InvalidOperationException("EntityId is required for Update"),
Title = GetStringValue(data, "title"),
Description = GetStringValue(data, "description"),
Status = GetStringValue(data, "status"),
Priority = GetStringValue(data, "priority")
};
var result = await _mediator.Send(command, cancellationToken);
return $"Task updated: {result.Id} - {result.Title}";
}
#endregion
#region Helper Methods
private static string GetStringValue(Dictionary<string, JsonElement> data, string key, string? defaultValue = null)
{
if (data.TryGetValue(key, out var element) && element.ValueKind == JsonValueKind.String)
{
return element.GetString() ?? defaultValue ?? string.Empty;
}
return defaultValue ?? string.Empty;
}
private static Guid GetGuidValue(Dictionary<string, JsonElement> data, string key)
{
if (data.TryGetValue(key, out var element))
{
if (element.ValueKind == JsonValueKind.String)
{
var stringValue = element.GetString();
if (Guid.TryParse(stringValue, out var guid))
{
return guid;
}
}
}
throw new InvalidOperationException($"Required Guid field '{key}' is missing or invalid");
}
private static Guid? GetNullableGuidValue(Dictionary<string, JsonElement> data, string key)
{
if (data.TryGetValue(key, out var element))
{
if (element.ValueKind == JsonValueKind.String)
{
var stringValue = element.GetString();
if (!string.IsNullOrWhiteSpace(stringValue) && Guid.TryParse(stringValue, out var guid))
{
return guid;
}
}
}
return null;
}
private static decimal? GetNullableDecimalValue(Dictionary<string, JsonElement> data, string key)
{
if (data.TryGetValue(key, out var element))
{
if (element.ValueKind == JsonValueKind.Number)
{
return element.GetDecimal();
}
if (element.ValueKind == JsonValueKind.String)
{
var stringValue = element.GetString();
if (decimal.TryParse(stringValue, out var decimalValue))
{
return decimalValue;
}
}
}
return null;
}
#endregion
}

View File

@@ -0,0 +1,68 @@
using ColaFlow.Modules.Mcp.Application.DTOs;
using ColaFlow.Modules.Mcp.Domain.ValueObjects;
namespace ColaFlow.Modules.Mcp.Application.Services;
/// <summary>
/// Service interface for PendingChange management
/// </summary>
public interface IPendingChangeService
{
/// <summary>
/// Create a new PendingChange
/// </summary>
Task<PendingChangeDto> CreateAsync(
CreatePendingChangeRequest request,
CancellationToken cancellationToken = default);
/// <summary>
/// Get a PendingChange by ID
/// </summary>
Task<PendingChangeDto?> GetByIdAsync(
Guid id,
CancellationToken cancellationToken = default);
/// <summary>
/// Get list of PendingChanges with filtering and pagination
/// </summary>
Task<(List<PendingChangeDto> Items, int TotalCount)> GetPendingChangesAsync(
PendingChangeFilterDto filter,
CancellationToken cancellationToken = default);
/// <summary>
/// Approve a PendingChange (triggers execution)
/// </summary>
Task ApproveAsync(
Guid pendingChangeId,
Guid approvedBy,
CancellationToken cancellationToken = default);
/// <summary>
/// Reject a PendingChange
/// </summary>
Task RejectAsync(
Guid pendingChangeId,
Guid rejectedBy,
string reason,
CancellationToken cancellationToken = default);
/// <summary>
/// Mark a PendingChange as applied (called after successful execution)
/// </summary>
Task MarkAsAppliedAsync(
Guid pendingChangeId,
string result,
CancellationToken cancellationToken = default);
/// <summary>
/// Expire old PendingChanges (called by background job)
/// </summary>
Task<int> ExpireOldChangesAsync(CancellationToken cancellationToken = default);
/// <summary>
/// Delete a PendingChange (only allowed for Expired status)
/// </summary>
Task DeleteAsync(
Guid pendingChangeId,
CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,410 @@
using ColaFlow.Modules.Mcp.Application.DTOs;
using ColaFlow.Modules.Mcp.Domain.Entities;
using ColaFlow.Modules.Mcp.Domain.Exceptions;
using ColaFlow.Modules.Mcp.Domain.Repositories;
using ColaFlow.Modules.Mcp.Domain.ValueObjects;
using ColaFlow.Modules.ProjectManagement.Application.Common.Interfaces;
using MediatR;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
namespace ColaFlow.Modules.Mcp.Application.Services;
/// <summary>
/// Service implementation for PendingChange management
/// </summary>
public class PendingChangeService : IPendingChangeService
{
private readonly IPendingChangeRepository _repository;
private readonly ITenantContext _tenantContext;
private readonly IHttpContextAccessor _httpContextAccessor;
private readonly IPublisher _publisher;
private readonly ILogger<PendingChangeService> _logger;
public PendingChangeService(
IPendingChangeRepository repository,
ITenantContext tenantContext,
IHttpContextAccessor httpContextAccessor,
IPublisher publisher,
ILogger<PendingChangeService> logger)
{
_repository = repository ?? throw new ArgumentNullException(nameof(repository));
_tenantContext = tenantContext ?? throw new ArgumentNullException(nameof(tenantContext));
_httpContextAccessor = httpContextAccessor ?? throw new ArgumentNullException(nameof(httpContextAccessor));
_publisher = publisher ?? throw new ArgumentNullException(nameof(publisher));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<PendingChangeDto> CreateAsync(
CreatePendingChangeRequest request,
CancellationToken cancellationToken = default)
{
var tenantId = _tenantContext.GetCurrentTenantId();
// Get API Key ID from HttpContext (set by MCP authentication middleware)
var apiKeyIdNullable = _httpContextAccessor.HttpContext?.Items["ApiKeyId"] as Guid?;
if (!apiKeyIdNullable.HasValue)
{
throw new McpUnauthorizedException("API Key not found in request context");
}
var apiKeyId = apiKeyIdNullable.Value;
_logger.LogInformation(
"Creating PendingChange: Tool={ToolName}, Operation={Operation}, EntityType={EntityType}, Tenant={TenantId}",
request.ToolName, request.Diff.Operation, request.Diff.EntityType, tenantId);
// Create PendingChange entity
var pendingChange = PendingChange.Create(
request.ToolName,
request.Diff,
tenantId,
apiKeyId,
request.ExpirationHours);
// Save to database
await _repository.AddAsync(pendingChange, cancellationToken);
await _repository.SaveChangesAsync(cancellationToken);
// Publish domain events
foreach (var domainEvent in pendingChange.DomainEvents)
{
await _publisher.Publish(domainEvent, cancellationToken);
}
pendingChange.ClearDomainEvents();
_logger.LogInformation(
"PendingChange created: {Id}, ExpiresAt={ExpiresAt}",
pendingChange.Id, pendingChange.ExpiresAt);
return MapToDto(pendingChange);
}
public async Task<PendingChangeDto?> GetByIdAsync(
Guid id,
CancellationToken cancellationToken = default)
{
var tenantId = _tenantContext.GetCurrentTenantId();
var pendingChange = await _repository.GetByIdAsync(id, cancellationToken);
if (pendingChange == null)
{
return null;
}
// Verify tenant isolation
if (pendingChange.TenantId != tenantId)
{
_logger.LogWarning(
"Attempted cross-tenant access: PendingChange {Id} belongs to Tenant {OwnerId}, but requested by Tenant {RequesterId}",
id, pendingChange.TenantId, tenantId);
return null;
}
return MapToDto(pendingChange);
}
public async Task<(List<PendingChangeDto> Items, int TotalCount)> GetPendingChangesAsync(
PendingChangeFilterDto filter,
CancellationToken cancellationToken = default)
{
var tenantId = _tenantContext.GetCurrentTenantId();
// Build query
var query = (await _repository.GetByTenantAsync(tenantId, cancellationToken))
.AsEnumerable();
// Apply filters
if (filter.Status.HasValue)
{
query = query.Where(x => x.Status == filter.Status.Value);
}
if (!string.IsNullOrWhiteSpace(filter.EntityType))
{
query = query.Where(x => x.Diff.EntityType == filter.EntityType);
}
if (filter.EntityId.HasValue)
{
query = query.Where(x => x.Diff.EntityId == filter.EntityId.Value);
}
if (filter.ApiKeyId.HasValue)
{
query = query.Where(x => x.ApiKeyId == filter.ApiKeyId.Value);
}
if (!string.IsNullOrWhiteSpace(filter.ToolName))
{
query = query.Where(x => x.ToolName == filter.ToolName);
}
if (filter.IncludeExpired == false)
{
query = query.Where(x => x.Status != PendingChangeStatus.Expired);
}
// Get total count before pagination
var totalCount = query.Count();
// Apply pagination
var items = query
.OrderByDescending(x => x.CreatedAt)
.Skip((filter.Page - 1) * filter.PageSize)
.Take(filter.PageSize)
.Select(MapToDto)
.ToList();
_logger.LogInformation(
"Retrieved {Count}/{Total} PendingChanges for Tenant {TenantId} (Page {Page}/{PageSize})",
items.Count, totalCount, tenantId, filter.Page, filter.PageSize);
return (items, totalCount);
}
public async Task ApproveAsync(
Guid pendingChangeId,
Guid approvedBy,
CancellationToken cancellationToken = default)
{
var tenantId = _tenantContext.GetCurrentTenantId();
var pendingChange = await _repository.GetByIdAsync(pendingChangeId, cancellationToken);
if (pendingChange == null)
{
throw new McpNotFoundException("PendingChange", pendingChangeId.ToString());
}
// Verify tenant isolation
if (pendingChange.TenantId != tenantId)
{
throw new McpForbiddenException(
$"Cannot approve PendingChange from different tenant");
}
_logger.LogInformation(
"Approving PendingChange {Id} by User {UserId}",
pendingChangeId, approvedBy);
// Domain method validates business rules and raises PendingChangeApprovedEvent
pendingChange.Approve(approvedBy);
await _repository.UpdateAsync(pendingChange, cancellationToken);
await _repository.SaveChangesAsync(cancellationToken);
// Publish domain events (will trigger operation execution)
foreach (var domainEvent in pendingChange.DomainEvents)
{
await _publisher.Publish(domainEvent, cancellationToken);
}
pendingChange.ClearDomainEvents();
_logger.LogInformation(
"PendingChange {Id} approved successfully",
pendingChangeId);
}
public async Task RejectAsync(
Guid pendingChangeId,
Guid rejectedBy,
string reason,
CancellationToken cancellationToken = default)
{
var tenantId = _tenantContext.GetCurrentTenantId();
var pendingChange = await _repository.GetByIdAsync(pendingChangeId, cancellationToken);
if (pendingChange == null)
{
throw new McpNotFoundException("PendingChange", pendingChangeId.ToString());
}
// Verify tenant isolation
if (pendingChange.TenantId != tenantId)
{
throw new McpForbiddenException(
$"Cannot reject PendingChange from different tenant");
}
_logger.LogInformation(
"Rejecting PendingChange {Id} by User {UserId} - Reason: {Reason}",
pendingChangeId, rejectedBy, reason);
// Domain method validates business rules and raises PendingChangeRejectedEvent
pendingChange.Reject(rejectedBy, reason);
await _repository.UpdateAsync(pendingChange, cancellationToken);
await _repository.SaveChangesAsync(cancellationToken);
// Publish domain events
foreach (var domainEvent in pendingChange.DomainEvents)
{
await _publisher.Publish(domainEvent, cancellationToken);
}
pendingChange.ClearDomainEvents();
_logger.LogInformation(
"PendingChange {Id} rejected successfully",
pendingChangeId);
}
public async Task MarkAsAppliedAsync(
Guid pendingChangeId,
string result,
CancellationToken cancellationToken = default)
{
var pendingChange = await _repository.GetByIdAsync(pendingChangeId, cancellationToken);
if (pendingChange == null)
{
throw new McpNotFoundException("PendingChange", pendingChangeId.ToString());
}
_logger.LogInformation(
"Marking PendingChange {Id} as Applied - Result: {Result}",
pendingChangeId, result);
// Domain method validates business rules and raises PendingChangeAppliedEvent
pendingChange.MarkAsApplied(result);
await _repository.UpdateAsync(pendingChange, cancellationToken);
await _repository.SaveChangesAsync(cancellationToken);
// Publish domain events
foreach (var domainEvent in pendingChange.DomainEvents)
{
await _publisher.Publish(domainEvent, cancellationToken);
}
pendingChange.ClearDomainEvents();
_logger.LogInformation(
"PendingChange {Id} marked as Applied",
pendingChangeId);
}
public async Task<int> ExpireOldChangesAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("Starting expiration check for old PendingChanges");
var expiredChanges = await _repository.GetExpiredAsync(cancellationToken);
var count = 0;
foreach (var change in expiredChanges)
{
try
{
change.Expire();
await _repository.UpdateAsync(change, cancellationToken);
// Publish domain events
foreach (var domainEvent in change.DomainEvents)
{
await _publisher.Publish(domainEvent, cancellationToken);
}
change.ClearDomainEvents();
count++;
_logger.LogWarning(
"PendingChange expired: {Id} - {ToolName} {Operation} {EntityType}",
change.Id, change.ToolName, change.Diff.Operation, change.Diff.EntityType);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to expire PendingChange {Id}",
change.Id);
}
}
if (count > 0)
{
await _repository.SaveChangesAsync(cancellationToken);
}
_logger.LogInformation(
"Expired {Count} PendingChanges",
count);
return count;
}
public async Task DeleteAsync(
Guid pendingChangeId,
CancellationToken cancellationToken = default)
{
var tenantId = _tenantContext.GetCurrentTenantId();
var pendingChange = await _repository.GetByIdAsync(pendingChangeId, cancellationToken);
if (pendingChange == null)
{
throw new McpNotFoundException("PendingChange", pendingChangeId.ToString());
}
// Verify tenant isolation
if (pendingChange.TenantId != tenantId)
{
throw new McpForbiddenException(
$"Cannot delete PendingChange from different tenant");
}
// Only allow deletion of Expired or Rejected changes
if (pendingChange.Status != PendingChangeStatus.Expired &&
pendingChange.Status != PendingChangeStatus.Rejected)
{
throw new McpValidationException(
$"Can only delete PendingChanges with Expired or Rejected status. Current status: {pendingChange.Status}");
}
_logger.LogInformation(
"Deleting PendingChange {Id} (Status: {Status})",
pendingChangeId, pendingChange.Status);
await _repository.DeleteAsync(pendingChange, cancellationToken);
await _repository.SaveChangesAsync(cancellationToken);
_logger.LogInformation(
"PendingChange {Id} deleted successfully",
pendingChangeId);
}
private static PendingChangeDto MapToDto(PendingChange pendingChange)
{
return new PendingChangeDto
{
Id = pendingChange.Id,
TenantId = pendingChange.TenantId,
ApiKeyId = pendingChange.ApiKeyId,
ToolName = pendingChange.ToolName,
Diff = new DiffPreviewDto
{
Operation = pendingChange.Diff.Operation,
EntityType = pendingChange.Diff.EntityType,
EntityId = pendingChange.Diff.EntityId,
EntityKey = pendingChange.Diff.EntityKey,
BeforeData = pendingChange.Diff.BeforeData,
AfterData = pendingChange.Diff.AfterData,
ChangedFields = pendingChange.Diff.ChangedFields.Select(f => new DiffFieldDto
{
FieldName = f.FieldName,
DisplayName = f.DisplayName,
OldValue = f.OldValue,
NewValue = f.NewValue,
DiffHtml = f.DiffHtml
}).ToList()
},
Status = pendingChange.Status.ToString(),
CreatedAt = pendingChange.CreatedAt,
ExpiresAt = pendingChange.ExpiresAt,
ApprovedBy = pendingChange.ApprovedBy,
ApprovedAt = pendingChange.ApprovedAt,
RejectedBy = pendingChange.RejectedBy,
RejectedAt = pendingChange.RejectedAt,
RejectionReason = pendingChange.RejectionReason,
AppliedAt = pendingChange.AppliedAt,
ApplicationResult = pendingChange.ApplicationResult,
IsExpired = pendingChange.IsExpired(),
CanBeApproved = pendingChange.CanBeApproved(),
CanBeRejected = pendingChange.CanBeRejected(),
Summary = pendingChange.GetSummary()
};
}
}

View File

@@ -0,0 +1,71 @@
using ColaFlow.Modules.Mcp.Application.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace ColaFlow.Modules.Mcp.Infrastructure.BackgroundServices;
/// <summary>
/// Background service to periodically expire old PendingChanges
/// Runs every 5 minutes and marks expired changes
/// </summary>
public class PendingChangeExpirationBackgroundService : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<PendingChangeExpirationBackgroundService> _logger;
private readonly TimeSpan _interval = TimeSpan.FromMinutes(5);
public PendingChangeExpirationBackgroundService(
IServiceProvider serviceProvider,
ILogger<PendingChangeExpirationBackgroundService> logger)
{
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("PendingChange Expiration Background Service started");
while (!stoppingToken.IsCancellationRequested)
{
try
{
_logger.LogDebug("Running PendingChange expiration check...");
// Create a new scope for the scoped service
using var scope = _serviceProvider.CreateScope();
var pendingChangeService = scope.ServiceProvider
.GetRequiredService<IPendingChangeService>();
var expiredCount = await pendingChangeService.ExpireOldChangesAsync(stoppingToken);
if (expiredCount > 0)
{
_logger.LogInformation("Expired {Count} PendingChanges", expiredCount);
}
else
{
_logger.LogDebug("No expired PendingChanges found");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred while expiring PendingChanges");
}
// Wait for the next interval
try
{
await Task.Delay(_interval, stoppingToken);
}
catch (TaskCanceledException)
{
// Expected when cancellation is requested
break;
}
}
_logger.LogInformation("PendingChange Expiration Background Service stopped");
}
}

View File

@@ -3,6 +3,7 @@ using ColaFlow.Modules.Mcp.Application.Resources;
using ColaFlow.Modules.Mcp.Application.Services;
using ColaFlow.Modules.Mcp.Contracts.Resources;
using ColaFlow.Modules.Mcp.Domain.Repositories;
using ColaFlow.Modules.Mcp.Infrastructure.BackgroundServices;
using ColaFlow.Modules.Mcp.Infrastructure.Middleware;
using ColaFlow.Modules.Mcp.Infrastructure.Persistence;
using ColaFlow.Modules.Mcp.Infrastructure.Persistence.Repositories;
@@ -41,6 +42,10 @@ public static class McpServiceExtensions
services.AddScoped<ColaFlow.Modules.Mcp.Domain.Services.DiffPreviewService>();
services.AddScoped<ColaFlow.Modules.Mcp.Domain.Services.TaskLockService>();
services.AddScoped<IMcpApiKeyService, McpApiKeyService>();
services.AddScoped<IPendingChangeService, PendingChangeService>();
// Register background services
services.AddHostedService<PendingChangeExpirationBackgroundService>();
// Register resource registry (Singleton - shared across all requests)
services.AddSingleton<IMcpResourceRegistry, McpResourceRegistry>();