ColaFlow M2 MCP Server Architecture Design
Version: 2.0 (Enhanced)
Date: 2025-11-04
Milestone: M2 - MCP Server Integration (March-April)
Duration: 8 weeks
Status: Ready for Implementation
Executive Summary
This document defines the complete technical architecture for ColaFlow M2 MCP Server, enabling AI tools (ChatGPT, Claude, Gemini) to safely interact with ColaFlow via the Model Context Protocol (MCP).
Key Design Decisions
| Decision | Technology | Rationale |
|---|---|---|
| Architecture Pattern | Modular Monolith + Clean Architecture | Builds on M1 foundation, easy to extract later |
| MCP Implementation | Custom .NET 9 Implementation | Native integration, no Node.js dependency |
| Communication | JSON-RPC 2.0 over HTTP/SSE | Standard MCP protocol, wide compatibility |
| Security Model | API Key + Diff Preview + Human Approval | Safety-first approach |
| Agent Management | Agent Registration + Heartbeat | Inspired by headless-pm |
| Task Locking | Optimistic Concurrency + Redis | Prevent concurrent AI modifications |
| Database | PostgreSQL JSONB + Existing DB | Reuse existing infrastructure |
Architecture Overview
┌──────────────────────────────────────────────────────────────┐
│ AI Clients Layer │
│ ChatGPT | Claude | Gemini | Custom AI Agents │
└────────────────────────┬─────────────────────────────────────┘
│ MCP Protocol (JSON-RPC)
┌────────────────────────┴─────────────────────────────────────┐
│ ColaFlow MCP Server (NEW Module) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ MCP Protocol Layer (JSON-RPC Handler) │ │
│ │ - resources/list, resources/read │ │
│ │ - tools/list, tools/call │ │
│ │ - Agent registration, heartbeat │ │
│ └──────────────────────┬──────────────────────────────────┘ │
│ ┌──────────────────────┴──────────────────────────────────┐ │
│ │ MCP Application Services │ │
│ │ - ResourceService (read operations) │ │
│ │ - ToolInvocationService (write with preview) │ │
│ │ - DiffPreviewService (generate, approve, reject) │ │
│ │ - AgentCoordinationService (register, heartbeat, lock) │ │
│ └──────────────────────┬──────────────────────────────────┘ │
│ ┌──────────────────────┴──────────────────────────────────┐ │
│ │ Security & Permission Layer │ │
│ │ - API Key Authentication │ │
│ │ - Field-level permission filtering │ │
│ │ - Rate limiting (Redis) │ │
│ └──────────────────────┬──────────────────────────────────┘ │
└─────────────────────────┼────────────────────────────────────┘
│
┌─────────────────────────┴────────────────────────────────────┐
│ Existing ColaFlow Modules (M1) │
│ - Issue Management (Projects, Issues, Kanban) │
│ - Identity Module (Users, Tenants, Auth) │
│ - Audit Log System │
└─────────────────────────┬────────────────────────────────────┘
│
┌─────────────────────────┴────────────────────────────────────┐
│ Data Layer │
│ PostgreSQL (Shared DB) + Redis (Caching, Rate Limit, Lock) │
└──────────────────────────────────────────────────────────────┘
1. Background & Requirements
1.1 M1 Foundation (Completed)
What's Already Built:
- ✅ Issue Management Module (Domain, Application, Infrastructure, API)
- ✅ Identity Module (User, Tenant, Multi-tenancy, JWT Authentication)
- ✅ Clean Architecture (.NET 9, PostgreSQL, EF Core)
- ✅ CQRS + DDD patterns (MediatR, Aggregates, Domain Events)
- ✅ Audit Log System (Technical design complete)
- ✅ Multi-tenant data isolation (TenantContext service)
- ✅ Performance optimization (5 indexes, < 5ms queries)
Current Tech Stack:
- Backend: .NET 9, ASP.NET Core, EF Core 9
- Database: PostgreSQL 16 + Redis 7
- Authentication: JWT Bearer
- Architecture: Modular Monolith + Clean Architecture
1.2 M2 Goals
Business Objectives:
- Enable AI tools to read ColaFlow data (projects, issues, sprints)
- Enable AI tools to write ColaFlow data with human approval
- Implement safety mechanisms (diff preview, rollback)
- Support multiple AI agents with permission control
- Provide audit trail for all AI operations
Technical Objectives:
- Implement MCP Server protocol in .NET 9
- Integrate with existing Issue Management module
- Design Agent registration and coordination system
- Implement diff preview and approval workflow
- Ensure multi-tenant isolation for AI operations
- Provide comprehensive audit logs
1.3 Reference: headless-pm
Key Patterns from headless-pm to Adopt:
# headless-pm/agent.py
import uuid
from datetime import datetime
from typing import List


class Agent:
    """AI Agent with registration and heartbeat"""

    def __init__(self, name: str, capabilities: List[str]):
        self.id = str(uuid.uuid4())
        self.name = name
        self.capabilities = capabilities
        self.last_heartbeat = datetime.utcnow()
        self.status = AgentStatus.ACTIVE  # AgentStatus enum is not shown in this excerpt

    def heartbeat(self):
        """Update last seen timestamp"""
        self.last_heartbeat = datetime.utcnow()
        self.status = AgentStatus.ACTIVE

    def is_alive(self, timeout_seconds: int = 300) -> bool:
        """Check if agent is still alive (5 min timeout)"""
        return (datetime.utcnow() - self.last_heartbeat).total_seconds() < timeout_seconds


# headless-pm/task_lock.py
from datetime import datetime, timedelta


class TaskLock:
    """Prevent concurrent modifications by multiple agents"""

    def __init__(self, task_id: str, agent_id: str):
        self.task_id = task_id
        self.agent_id = agent_id
        self.acquired_at = datetime.utcnow()
        self.expires_at = datetime.utcnow() + timedelta(minutes=15)

    def is_valid(self) -> bool:
        return datetime.utcnow() < self.expires_at
Adaptation for ColaFlow:
- Replace Python with C# + .NET 9
- Use EF Core instead of SQLModel
- Use Redis for distributed locks
- Add diff preview workflow (headless-pm doesn't have this)
- Add field-level permissions
2. MCP Protocol Design
2.1 MCP Protocol Overview
MCP (Model Context Protocol) is an open protocol, introduced by Anthropic, that standardizes how AI applications connect to external tools and data sources.
Key Concepts:
- Resources - Read-only data exposures (e.g., project://123)
- Tools - AI-invokable functions (e.g., create_issue)
- Prompts - Reusable prompt templates
- Sampling - AI model invocation (future phase)
Transport Layer:
- JSON-RPC 2.0 over HTTP POST (request/response)
- JSON-RPC 2.0 over SSE (Server-Sent Events for real-time)
- Future: stdio for local processes
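The JSON-RPC 2.0 envelope that carries MCP calls can be sketched as follows, in Python like the headless-pm snippets above. The `dispatch` helper and the `HANDLERS` table are hypothetical illustrations, not ColaFlow code. Only the envelope fields (`jsonrpc`, `id`, `method`, `params`, `result`, `error`) and the error code -32601 come from the JSON-RPC 2.0 specification.

```python
import json

# Hypothetical handler table; a real server would route to application services.
HANDLERS = {
    "tools/list": lambda params: {"tools": [{"name": "create_issue"}]},
}


def dispatch(raw_request: str) -> str:
    """Parse one JSON-RPC 2.0 request and return the serialized response."""
    request = json.loads(raw_request)
    handler = HANDLERS.get(request.get("method"))
    if handler is None:
        # -32601 is the JSON-RPC 2.0 "Method not found" error code.
        response = {
            "jsonrpc": "2.0",
            "id": request.get("id"),
            "error": {"code": -32601, "message": "Method not found"},
        }
    else:
        response = {
            "jsonrpc": "2.0",
            "id": request.get("id"),
            "result": handler(request.get("params", {})),
        }
    return json.dumps(response)
```

The response always echoes the request `id`, which is how a client matches replies to calls when several requests are in flight.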
2.2 Resource Definitions
2.2.1 Resource URI Scheme
colaflow://projects # List all projects
colaflow://projects/{projectId} # Get project by ID
colaflow://projects/{projectId}/issues # List project issues
colaflow://issues/{issueId} # Get issue by ID
colaflow://issues/search?query={text} # Search issues
colaflow://sprints # List sprints
colaflow://sprints/{sprintId} # Get sprint by ID
colaflow://reports/daily # Get daily report
colaflow://docs/drafts # List document drafts
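One way to break these URIs into routable parts is sketched below in Python. The function name `parse_resource_uri` and its return shape are assumptions made for illustration; the document does not define how the server parses URIs.

```python
from urllib.parse import parse_qs, urlsplit


def parse_resource_uri(uri: str):
    """Return (path segments, query parameters) for a colaflow:// resource URI."""
    parts = urlsplit(uri)
    if parts.scheme != "colaflow":
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    # urlsplit treats the first segment after "//" as the network location,
    # so "colaflow://projects/abc/issues" yields netloc "projects", path "/abc/issues".
    segments = [parts.netloc] + [s for s in parts.path.split("/") if s]
    # Keep only the first value of each query parameter.
    query = {key: values[0] for key, values in parse_qs(parts.query).items()}
    return segments, query
```

A router can then branch on the first segment (the collection) and treat the second as an ID or a keyword such as `search`.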
2.2.2 Resource Response Format
{
"uri": "colaflow://projects/abc-123",
"name": "ColaFlow MVP",
"description": "Project details for ColaFlow MVP",
"mimeType": "application/json",
"content": {
"id": "abc-123",
"name": "ColaFlow MVP",
"description": "Build initial MVP version",
"status": "Active",
"owner": {
"id": "user-456",
"name": "John Doe",
"email": "john@example.com"
},
"issueCount": 45,
"completedIssueCount": 12,
"createdAt": "2025-11-01T00:00:00Z"
}
}
2.3 Tool Definitions
2.3.1 Tool: create_issue
{
"name": "create_issue",
"description": "Create a new issue in a project",
"inputSchema": {
"type": "object",
"properties": {
"projectId": {
"type": "string",
"description": "Project ID (UUID)",
"format": "uuid"
},
"title": {
"type": "string",
"description": "Issue title (required)",
"minLength": 1,
"maxLength": 200
},
"type": {
"type": "string",
"enum": ["Story", "Task", "Bug", "Epic"],
"description": "Issue type"
},
"priority": {
"type": "string",
"enum": ["Low", "Medium", "High", "Critical"],
"default": "Medium"
},
"description": {
"type": "string",
"description": "Detailed description"
},
"assigneeId": {
"type": "string",
"format": "uuid",
"description": "Assign to user (optional)"
}
},
"required": ["projectId", "title", "type"]
}
}
Tool Call Flow:
1. AI Client → MCP Server: tools/call { name: "create_issue", arguments: {...} }
2. MCP Server → DiffPreviewService: Generate diff preview
3. MCP Server → AI Client: { requiresApproval: true, previewId: "123", diffPreview: {...} }
4. Human → MCP Server: POST /api/mcp/diffs/{previewId}/approve
5. MCP Server → Issue Management: Execute CreateIssueCommand
6. MCP Server → AI Client: { success: true, issueId: "456" }
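The six steps above can be simulated in a few lines of Python. This is a sketch under stated assumptions: the in-memory dictionaries stand in for the database, and `call_tool` and `approve` are hypothetical names rather than ColaFlow APIs. It shows the key property of the flow, which is that nothing is written until a human approves.

```python
import uuid

PENDING_PREVIEWS = {}  # previewId -> proposed tool call (stand-in for the preview store)
ISSUES = {}            # issueId -> issue fields (stand-in for Issue Management)


def call_tool(name: str, arguments: dict) -> dict:
    """Steps 1-3: record the proposed change and return a preview without writing."""
    preview_id = str(uuid.uuid4())
    PENDING_PREVIEWS[preview_id] = {"tool": name, "arguments": arguments}
    return {
        "requiresApproval": True,
        "previewId": preview_id,
        "diffPreview": {"operation": "Create", "after": arguments},
    }


def approve(preview_id: str) -> dict:
    """Steps 4-6: a human approves, and only then is the issue created."""
    # pop() raises KeyError for an unknown or already-consumed preview.
    proposal = PENDING_PREVIEWS.pop(preview_id)
    issue_id = str(uuid.uuid4())
    ISSUES[issue_id] = proposal["arguments"]
    return {"success": True, "issueId": issue_id}
```

Removing the preview on approval means a second approval of the same preview fails instead of creating a duplicate issue.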
2.3.2 Tool: update_issue_status
{
"name": "update_issue_status",
"description": "Update issue status",
"inputSchema": {
"type": "object",
"properties": {
"issueId": {
"type": "string",
"format": "uuid"
},
"status": {
"type": "string",
"enum": ["Backlog", "Todo", "InProgress", "Done"]
},
"comment": {
"type": "string",
"description": "Optional reason for status change"
}
},
"required": ["issueId", "status"]
}
}
2.3.3 Tool: assign_issue
{
"name": "assign_issue",
"description": "Assign issue to a user",
"inputSchema": {
"type": "object",
"properties": {
"issueId": { "type": "string", "format": "uuid" },
"assigneeId": { "type": "string", "format": "uuid" },
"notifyAssignee": { "type": "boolean", "default": true }
},
"required": ["issueId", "assigneeId"]
}
}
2.3.4 Tool: log_decision
{
"name": "log_decision",
"description": "Log an architectural or product decision",
"inputSchema": {
"type": "object",
"properties": {
"projectId": { "type": "string", "format": "uuid" },
"title": { "type": "string" },
"decision": { "type": "string" },
"rationale": { "type": "string" },
"alternatives": {
"type": "array",
"items": { "type": "string" }
}
},
"required": ["projectId", "title", "decision"]
}
}
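Tool arguments should be checked against the `inputSchema` before a diff preview is generated. The Python sketch below hand-rolls a small subset of JSON Schema (`required`, `type: string`, `enum`, `minLength`, `maxLength`) to show the idea. `validate_arguments` is a hypothetical helper, and the schema is a trimmed copy of `create_issue`; a real server would more likely use a full JSON Schema validator.

```python
CREATE_ISSUE_SCHEMA = {
    "properties": {
        "projectId": {"type": "string"},
        "title": {"type": "string", "minLength": 1, "maxLength": 200},
        "type": {"type": "string", "enum": ["Story", "Task", "Bug", "Epic"]},
    },
    "required": ["projectId", "title", "type"],
}


def validate_arguments(schema: dict, arguments: dict) -> list:
    """Return a list of violations; an empty list means the arguments are valid."""
    errors = []
    for name in schema.get("required", []):
        if name not in arguments:
            errors.append(f"missing required field: {name}")
    for name, value in arguments.items():
        rules = schema["properties"].get(name)
        if rules is None:
            continue  # JSON Schema allows additional properties by default
        if rules.get("type") == "string" and not isinstance(value, str):
            errors.append(f"{name} must be a string")
            continue
        if "enum" in rules and value not in rules["enum"]:
            errors.append(f"{name} must be one of {rules['enum']}")
        if len(value) < rules.get("minLength", 0):
            errors.append(f"{name} is too short")
        if "maxLength" in rules and len(value) > rules["maxLength"]:
            errors.append(f"{name} is too long")
    return errors
```

Returning every violation at once, rather than failing on the first, gives the AI client enough information to correct its call in a single retry.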
2.4 Prompt Templates
{
"prompts": [
{
"name": "daily_standup",
"description": "Generate daily standup report",
"arguments": [
{
"name": "date",
"description": "Report date (YYYY-MM-DD)",
"required": false
}
],
"template": "Generate a daily standup report for {{date}}. Include:\n1. Completed issues\n2. In-progress issues\n3. Blockers\n4. Upcoming priorities"
},
{
"name": "sprint_planning",
"description": "Generate sprint planning summary",
"template": "Analyze the backlog and generate sprint planning recommendations:\n1. Suggested issues for next sprint\n2. Estimated story points\n3. Team capacity analysis\n4. Risk assessment"
},
{
"name": "detect_risks",
"description": "Detect project risks",
"template": "Analyze the project and identify potential risks:\n1. Schedule risks\n2. Resource risks\n3. Technical risks\n4. Mitigation suggestions"
}
]
}
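The `{{date}}` placeholder in these templates implies a substitution step before the prompt is returned to the client. A minimal Python sketch of that step follows. `render_prompt` is a hypothetical helper, and raising on a missing argument is an assumption, since the document does not say how optional arguments such as `date` are defaulted.

```python
import re


def render_prompt(template: str, arguments: dict) -> str:
    """Replace each {{name}} placeholder with the matching argument value."""

    def substitute(match):
        name = match.group(1)
        if name not in arguments:
            raise KeyError(f"missing prompt argument: {name}")
        return str(arguments[name])

    return re.sub(r"\{\{(\w+)\}\}", substitute, template)
```

Templates without placeholders, such as `sprint_planning`, pass through unchanged.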
3. Module Architecture Design
3.1 Module Structure (Modular Monolith)
ColaFlow.Modules.Mcp/
├── ColaFlow.Modules.Mcp.Domain/
│ ├── Aggregates/
│ │ ├── McpAgents/
│ │ │ ├── McpAgent.cs # Agent aggregate root
│ │ │ ├── AgentHeartbeat.cs # Value object
│ │ │ └── AgentCapability.cs # Value object
│ │ ├── DiffPreviews/
│ │ │ ├── DiffPreview.cs # Diff preview aggregate
│ │ │ ├── DiffOperation.cs # Entity
│ │ │ └── RiskAssessment.cs # Value object
│ │ └── TaskLocks/
│ │ └── TaskLock.cs # Task lock aggregate
│ ├── Events/
│ │ ├── AgentRegisteredEvent.cs
│ │ ├── DiffPreviewCreatedEvent.cs
│ │ ├── DiffApprovedEvent.cs
│ │ └── TaskLockedEvent.cs
│ ├── ValueObjects/
│ │ ├── McpAgentId.cs
│ │ ├── ApiKey.cs
│ │ ├── ResourceUri.cs
│ │ └── ToolName.cs
│ ├── Enums/
│ │ ├── AgentStatus.cs
│ │ ├── DiffPreviewStatus.cs
│ │ └── RiskLevel.cs
│ └── Contracts/
│ ├── IMcpAgentRepository.cs
│ ├── IDiffPreviewRepository.cs
│ └── ITaskLockRepository.cs
│
├── ColaFlow.Modules.Mcp.Application/
│ ├── Commands/
│ │ ├── RegisterAgent/
│ │ │ ├── RegisterAgentCommand.cs
│ │ │ ├── RegisterAgentCommandHandler.cs
│ │ │ └── RegisterAgentCommandValidator.cs
│ │ ├── RecordHeartbeat/
│ │ ├── ApproveDiff/
│ │ ├── RejectDiff/
│ │ └── InvokeTool/
│ ├── Queries/
│ │ ├── ListResources/
│ │ ├── ReadResource/
│ │ ├── ListTools/
│ │ ├── GetDiffPreview/
│ │ └── ListPendingDiffs/
│ ├── Services/
│ │ ├── IResourceService.cs # Resource access
│ │ ├── IToolInvocationService.cs # Tool execution
│ │ ├── IDiffPreviewService.cs # Diff generation
│ │ ├── IAgentCoordinationService.cs # Agent management
│ │ └── ITaskLockService.cs # Concurrency control
│ └── DTOs/
│ ├── ResourceDto.cs
│ ├── ToolDto.cs
│ ├── DiffPreviewDto.cs
│ └── AgentDto.cs
│
├── ColaFlow.Modules.Mcp.Infrastructure/
│ ├── Persistence/
│ │ ├── McpDbContext.cs # NOT separate DB, use existing
│ │ ├── Configurations/
│ │ │ ├── McpAgentConfiguration.cs
│ │ │ ├── DiffPreviewConfiguration.cs
│ │ │ └── TaskLockConfiguration.cs
│ │ └── Repositories/
│ │ ├── McpAgentRepository.cs
│ │ ├── DiffPreviewRepository.cs
│ │ └── TaskLockRepository.cs
│ ├── Protocol/
│ │ ├── JsonRpcHandler.cs # JSON-RPC protocol
│ │ ├── JsonRpcRequest.cs
│ │ ├── JsonRpcResponse.cs
│ │ └── SseHandler.cs # SSE transport
│ ├── Services/
│ │ ├── ResourceService.cs
│ │ ├── ToolInvocationService.cs
│ │ ├── DiffPreviewService.cs
│ │ ├── AgentCoordinationService.cs
│ │ └── TaskLockService.cs
│ ├── Security/
│ │ ├── ApiKeyAuthenticationHandler.cs
│ │ ├── McpPermissionValidator.cs
│ │ └── FieldLevelFilter.cs
│ └── Caching/
│ └── McpRedisCacheService.cs
│
└── ColaFlow.Modules.Mcp.API/
├── Controllers/
│ ├── McpProtocolController.cs # JSON-RPC endpoint
│ ├── McpAgentsController.cs # Agent management
│ └── McpDiffPreviewsController.cs # Human approval UI
├── Middleware/
│ ├── McpAuthenticationMiddleware.cs
│ ├── McpAuditMiddleware.cs
│ └── McpRateLimitMiddleware.cs
└── Extensions/
└── McpModuleExtensions.cs # DI registration
3.2 Integration with M1 Modules
// The MCP module calls the Issue Management module via MediatR
public class ToolInvocationService : IToolInvocationService
{
    // Dependencies are supplied through constructor injection (constructor omitted for brevity)
    private readonly IMediator _mediator;
    private readonly IDiffPreviewService _diffPreviewService;
    private readonly IDiffPreviewRepository _diffPreviewRepository;

    public async Task<ToolInvocationResult> InvokeToolAsync(
        string toolName,
        Dictionary<string, object> arguments,
        TenantId tenantId,
        McpAgentId agentId,
        CancellationToken cancellationToken = default)
    {
        if (toolName == "create_issue")
        {
            // 1. Generate diff preview
            var diffPreview = await _diffPreviewService.GenerateDiffAsync(
                toolName, arguments, agentId, tenantId, cancellationToken);

            // 2. Return preview to AI client (requires human approval)
            return new ToolInvocationResult
            {
                RequiresApproval = true,
                DiffPreviewId = diffPreview.Id,
                DiffPreview = diffPreview
            };
        }

        // Other tools...
        throw new NotSupportedException($"Tool not supported: {toolName}");
    }

    public async Task<object> CommitDiffPreviewAsync(Guid previewId, Guid approvedBy)
    {
        var preview = await _diffPreviewRepository.GetByIdAsync(previewId);
        if (preview == null)
            throw new NotFoundException($"Diff preview not found: {previewId}");

        // MarkAsCommitted only accepts approved previews, so record the approval first
        if (preview.Status == DiffPreviewStatus.Pending)
            preview.Approve(approvedBy);

        if (preview.ToolName == "create_issue")
        {
            // Execute actual command in Issue Management module
            var command = new CreateIssueCommand
            {
                ProjectId = preview.ParsedArguments["projectId"],
                Title = preview.ParsedArguments["title"],
                Type = preview.ParsedArguments["type"],
                // ...
            };
            var result = await _mediator.Send(command);

            // Mark diff as committed
            preview.MarkAsCommitted(result.Id);
            await _diffPreviewRepository.UpdateAsync(preview);
            return result;
        }

        // Other tools...
        throw new NotSupportedException($"Tool not supported: {preview.ToolName}");
    }
}
4. Domain Model Design
4.1 McpAgent Aggregate
namespace ColaFlow.Modules.Mcp.Domain.Aggregates.McpAgents;
/// <summary>
/// Represents an AI Agent registered to access ColaFlow via MCP
/// Inspired by headless-pm Agent model
/// </summary>
public sealed class McpAgent : AggregateRoot
{
private McpAgent() { } // EF Core
public McpAgentId Id { get; private set; }
public TenantId TenantId { get; private set; }
// Identity
public string AgentName { get; private set; }
public string AgentType { get; private set; } // "Claude", "ChatGPT", "Gemini", "Custom"
public string Version { get; private set; } // Agent version
// Authentication
public ApiKey ApiKey { get; private set; }
public DateTime ApiKeyExpiresAt { get; private set; }
public AgentStatus Status { get; private set; }
// Heartbeat (inspired by headless-pm)
public DateTime LastHeartbeat { get; private set; }
public TimeSpan HeartbeatTimeout { get; private set; } = TimeSpan.FromMinutes(5);
// Permissions
public McpPermissionLevel PermissionLevel { get; private set; }
private readonly List<string> _allowedResources = new();
public IReadOnlyCollection<string> AllowedResources => _allowedResources.AsReadOnly();
private readonly List<string> _allowedTools = new();
public IReadOnlyCollection<string> AllowedTools => _allowedTools.AsReadOnly();
// Capabilities (inspired by headless-pm)
private readonly List<string> _capabilities = new();
public IReadOnlyCollection<string> Capabilities => _capabilities.AsReadOnly();
// Statistics
public int RequestCount { get; private set; }
public DateTime CreatedAt { get; private set; }
public Guid CreatedBy { get; private set; }
/// <summary>
/// Factory method - Register new AI agent
/// </summary>
public static McpAgent Register(
TenantId tenantId,
string agentName,
string agentType,
string version,
ApiKey apiKey,
DateTime apiKeyExpiresAt,
McpPermissionLevel permissionLevel,
List<string> capabilities,
Guid createdBy)
{
// Validation
if (string.IsNullOrWhiteSpace(agentName))
throw new DomainException("Agent name cannot be empty");
if (apiKeyExpiresAt <= DateTime.UtcNow)
throw new DomainException("API key expiration must be in the future");
var agent = new McpAgent
{
Id = McpAgentId.Create(),
TenantId = tenantId,
AgentName = agentName,
AgentType = agentType,
Version = version,
ApiKey = apiKey,
ApiKeyExpiresAt = apiKeyExpiresAt,
Status = AgentStatus.Active,
LastHeartbeat = DateTime.UtcNow,
PermissionLevel = permissionLevel,
CreatedAt = DateTime.UtcNow,
CreatedBy = createdBy,
RequestCount = 0
};
agent._capabilities.AddRange(capabilities);
// Default permissions based on level
agent.InitializeDefaultPermissions();
// Raise domain event
agent.AddDomainEvent(new AgentRegisteredEvent(
agent.Id, agent.AgentName, agent.AgentType, tenantId));
return agent;
}
/// <summary>
/// Record heartbeat (inspired by headless-pm)
/// </summary>
public void RecordHeartbeat()
{
LastHeartbeat = DateTime.UtcNow;
if (Status == AgentStatus.Inactive)
{
Status = AgentStatus.Active;
AddDomainEvent(new AgentActivatedEvent(Id));
}
}
/// <summary>
/// Check if agent is alive (inspired by headless-pm)
/// </summary>
public bool IsAlive()
{
return (DateTime.UtcNow - LastHeartbeat) < HeartbeatTimeout;
}
/// <summary>
/// Mark as inactive if no heartbeat
/// </summary>
public void MarkAsInactiveIfTimeout()
{
if (!IsAlive() && Status == AgentStatus.Active)
{
Status = AgentStatus.Inactive;
AddDomainEvent(new AgentInactiveEvent(Id, LastHeartbeat));
}
}
/// <summary>
/// Record API request
/// </summary>
public void RecordRequest()
{
RequestCount++;
LastHeartbeat = DateTime.UtcNow;
}
/// <summary>
/// Update permissions
/// </summary>
public void UpdatePermissions(
McpPermissionLevel level,
List<string> resources,
List<string> tools)
{
PermissionLevel = level;
_allowedResources.Clear();
_allowedResources.AddRange(resources);
_allowedTools.Clear();
_allowedTools.AddRange(tools);
AddDomainEvent(new AgentPermissionsUpdatedEvent(Id, level));
}
/// <summary>
/// Revoke agent access
/// </summary>
public void Revoke()
{
Status = AgentStatus.Revoked;
AddDomainEvent(new AgentRevokedEvent(Id));
}
/// <summary>
/// Regenerate API key
/// </summary>
public void RegenerateApiKey(ApiKey newApiKey, DateTime expiresAt)
{
if (expiresAt <= DateTime.UtcNow)
throw new DomainException("API key expiration must be in the future");
ApiKey = newApiKey;
ApiKeyExpiresAt = expiresAt;
AddDomainEvent(new AgentApiKeyRegeneratedEvent(Id));
}
private void InitializeDefaultPermissions()
{
switch (PermissionLevel)
{
case McpPermissionLevel.ReadOnly:
_allowedResources.AddRange(new[] { "projects.*", "issues.*", "sprints.*" });
break;
case McpPermissionLevel.WriteWithPreview:
_allowedResources.AddRange(new[] { "projects.*", "issues.*", "sprints.*" });
_allowedTools.AddRange(new[] { "create_issue", "update_issue_status", "assign_issue" });
break;
case McpPermissionLevel.DirectWrite:
_allowedResources.Add("*");
_allowedTools.Add("*");
break;
}
}
}
public enum AgentStatus
{
Active = 1,
Inactive = 2,
Revoked = 3
}
public enum McpPermissionLevel
{
ReadOnly = 1,
WriteWithPreview = 2,
DirectWrite = 3
}
4.2 DiffPreview Aggregate
namespace ColaFlow.Modules.Mcp.Domain.Aggregates.DiffPreviews;
/// <summary>
/// Represents a diff preview for AI-initiated write operations
/// Safety mechanism: AI proposes changes → Human approves → System commits
/// </summary>
public sealed class DiffPreview : AggregateRoot
{
private DiffPreview() { } // EF Core
public Guid Id { get; private set; }
public TenantId TenantId { get; private set; }
public McpAgentId AgentId { get; private set; }
// Operation details
public string ToolName { get; private set; }
public string InputParametersJson { get; private set; }
// Diff details
public DiffOperation Operation { get; private set; }
public string EntityType { get; private set; }
public Guid? EntityId { get; private set; }
public string BeforeStateJson { get; private set; }
public string AfterStateJson { get; private set; }
public string DiffJson { get; private set; }
// Risk assessment
public RiskLevel RiskLevel { get; private set; }
private readonly List<string> _riskReasons = new();
public IReadOnlyCollection<string> RiskReasons => _riskReasons.AsReadOnly();
// Approval workflow
public DiffPreviewStatus Status { get; private set; }
public Guid? ApprovedBy { get; private set; }
public DateTime? ApprovedAt { get; private set; }
public Guid? RejectedBy { get; private set; }
public DateTime? RejectedAt { get; private set; }
public string RejectionReason { get; private set; }
// Rollback
public bool IsCommitted { get; private set; }
public Guid? CommittedEntityId { get; private set; }
public DateTime? CommittedAt { get; private set; }
public string RollbackToken { get; private set; }
// Timestamps
public DateTime CreatedAt { get; private set; }
public DateTime ExpiresAt { get; private set; }
/// <summary>
/// Factory method - Create diff preview
/// </summary>
public static DiffPreview Create(
TenantId tenantId,
McpAgentId agentId,
string toolName,
string inputParametersJson,
DiffOperation operation,
string entityType,
Guid? entityId,
string beforeStateJson,
string afterStateJson,
string diffJson,
RiskLevel riskLevel,
List<string> riskReasons)
{
var preview = new DiffPreview
{
Id = Guid.NewGuid(),
TenantId = tenantId,
AgentId = agentId,
ToolName = toolName,
InputParametersJson = inputParametersJson,
Operation = operation,
EntityType = entityType,
EntityId = entityId,
BeforeStateJson = beforeStateJson,
AfterStateJson = afterStateJson,
DiffJson = diffJson,
RiskLevel = riskLevel,
Status = DiffPreviewStatus.Pending,
IsCommitted = false,
CreatedAt = DateTime.UtcNow,
ExpiresAt = DateTime.UtcNow.AddHours(24)
};
preview._riskReasons.AddRange(riskReasons);
preview.AddDomainEvent(new DiffPreviewCreatedEvent(
preview.Id, preview.AgentId, preview.ToolName, preview.RiskLevel));
return preview;
}
/// <summary>
/// Approve diff preview
/// </summary>
public void Approve(Guid approvedBy)
{
if (Status != DiffPreviewStatus.Pending)
throw new DomainException($"Cannot approve diff with status {Status}");
if (IsExpired())
throw new DomainException("Diff preview has expired");
Status = DiffPreviewStatus.Approved;
ApprovedBy = approvedBy;
ApprovedAt = DateTime.UtcNow;
AddDomainEvent(new DiffApprovedEvent(Id, approvedBy));
}
/// <summary>
/// Reject diff preview
/// </summary>
public void Reject(Guid rejectedBy, string reason)
{
if (Status != DiffPreviewStatus.Pending)
throw new DomainException($"Cannot reject diff with status {Status}");
Status = DiffPreviewStatus.Rejected;
RejectedBy = rejectedBy;
RejectedAt = DateTime.UtcNow;
RejectionReason = reason;
AddDomainEvent(new DiffRejectedEvent(Id, rejectedBy, reason));
}
/// <summary>
/// Mark as committed after successful execution
/// </summary>
public void MarkAsCommitted(Guid entityId)
{
if (Status != DiffPreviewStatus.Approved)
throw new DomainException("Can only commit approved diffs");
IsCommitted = true;
CommittedEntityId = entityId;
CommittedAt = DateTime.UtcNow;
Status = DiffPreviewStatus.Committed;
AddDomainEvent(new DiffCommittedEvent(Id, entityId));
}
/// <summary>
/// Check if expired
/// </summary>
public bool IsExpired()
{
return DateTime.UtcNow > ExpiresAt;
}
/// <summary>
/// Mark as expired (background job)
/// </summary>
public void MarkAsExpired()
{
if (Status == DiffPreviewStatus.Pending)
{
Status = DiffPreviewStatus.Expired;
AddDomainEvent(new DiffExpiredEvent(Id));
}
}
}
public enum DiffOperation
{
Create = 1,
Update = 2,
Delete = 3
}
public enum RiskLevel
{
Low = 1,
Medium = 2,
High = 3,
Critical = 4
}
public enum DiffPreviewStatus
{
Pending = 1,
Approved = 2,
Rejected = 3,
Expired = 4,
Committed = 5
}
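The guards in `Approve`, `Reject`, `MarkAsCommitted`, and `MarkAsExpired` amount to a small state machine over `DiffPreviewStatus`. The Python sketch below restates those rules as a transition table. The table is derived from the aggregate above, while the `transition` helper itself is a hypothetical illustration.

```python
# Allowed status changes, read off the guards in the DiffPreview aggregate.
ALLOWED_TRANSITIONS = {
    "Pending": {"Approved", "Rejected", "Expired"},
    "Approved": {"Committed"},
    "Rejected": set(),
    "Expired": set(),
    "Committed": set(),
}


def transition(current: str, target: str) -> str:
    """Return the new status, or raise if the aggregate would reject the change."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"cannot move diff preview from {current} to {target}")
    return target
```

Rejected, Expired, and Committed are terminal, so a preview can never be committed twice or revived after rejection.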
4.3 TaskLock Aggregate (Inspired by headless-pm)
namespace ColaFlow.Modules.Mcp.Domain.Aggregates.TaskLocks;
/// <summary>
/// Prevents concurrent modifications by multiple AI agents
/// Inspired by headless-pm task locking mechanism
/// </summary>
public sealed class TaskLock : AggregateRoot
{
private TaskLock() { } // EF Core
public Guid Id { get; private set; }
public TenantId TenantId { get; private set; }
public McpAgentId AgentId { get; private set; }
// Lock target
public string EntityType { get; private set; } // "Issue", "Project", "Sprint"
public Guid EntityId { get; private set; }
// Lock details
public DateTime AcquiredAt { get; private set; }
public DateTime ExpiresAt { get; private set; }
public TimeSpan LockDuration { get; private set; } = TimeSpan.FromMinutes(15);
public bool IsReleased { get; private set; }
public DateTime? ReleasedAt { get; private set; }
/// <summary>
/// Factory method - Acquire lock
/// </summary>
public static TaskLock Acquire(
TenantId tenantId,
McpAgentId agentId,
string entityType,
Guid entityId)
{
var lockEntity = new TaskLock
{
Id = Guid.NewGuid(),
TenantId = tenantId,
AgentId = agentId,
EntityType = entityType,
EntityId = entityId,
AcquiredAt = DateTime.UtcNow,
ExpiresAt = DateTime.UtcNow.AddMinutes(15),
IsReleased = false
};
lockEntity.AddDomainEvent(new TaskLockedEvent(
lockEntity.Id, lockEntity.AgentId, lockEntity.EntityType, lockEntity.EntityId));
return lockEntity;
}
/// <summary>
/// Check if lock is valid
/// </summary>
public bool IsValid()
{
return !IsReleased && DateTime.UtcNow < ExpiresAt;
}
/// <summary>
/// Release lock
/// </summary>
public void Release()
{
if (IsReleased)
throw new DomainException("Lock already released");
IsReleased = true;
ReleasedAt = DateTime.UtcNow;
AddDomainEvent(new TaskUnlockedEvent(Id, AgentId, EntityId));
}
/// <summary>
/// Extend lock duration
/// </summary>
public void Extend(TimeSpan additionalDuration)
{
if (!IsValid())
throw new DomainException("Cannot extend expired or released lock");
ExpiresAt = ExpiresAt.Add(additionalDuration);
AddDomainEvent(new TaskLockExtendedEvent(Id, ExpiresAt));
}
}
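The locking behaviour can be sketched in Python with an in-memory table standing in for the lock store. `LockTable` and its method names are hypothetical, and two choices are assumptions not stated in the document: an expired lock can be taken over by any agent, and a repeat acquire by the current holder refreshes the expiry. The current time is passed in explicitly so the behaviour is deterministic.

```python
from datetime import datetime, timedelta, timezone


class LockTable:
    """In-memory stand-in for the lock store; one lock per (entity_type, entity_id)."""

    def __init__(self):
        self._locks = {}  # (entity_type, entity_id) -> (agent_id, expires_at)

    def try_acquire(self, entity_type, entity_id, agent_id, now, ttl=timedelta(minutes=15)):
        key = (entity_type, entity_id)
        holder = self._locks.get(key)
        if holder is not None and holder[0] != agent_id and holder[1] > now:
            return False  # held by another agent and not yet expired
        self._locks[key] = (agent_id, now + ttl)
        return True

    def release(self, entity_type, entity_id, agent_id):
        key = (entity_type, entity_id)
        holder = self._locks.get(key)
        # Only the holding agent may release the lock.
        if holder is not None and holder[0] == agent_id:
            del self._locks[key]
```

With Redis, the check and the write would have to happen in a single atomic operation, which this single-process sketch does not need.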
5. Application Services Design
5.1 Resource Service
namespace ColaFlow.Modules.Mcp.Application.Services;

public interface IResourceService
{
    /// <summary>
    /// List all available resources for the current AI Agent
    /// </summary>
    Task<List<ResourceDescriptor>> ListResourcesAsync(
        TenantId tenantId,
        McpAgentId agentId,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Read a specific resource
    /// </summary>
    Task<ResourceContent> ReadResourceAsync(
        string resourceUri,
        TenantId tenantId,
        McpAgentId agentId,
        CancellationToken cancellationToken = default);
}

public class ResourceService : IResourceService
{
    // Dependencies are supplied through constructor injection (constructor omitted for brevity)
    private readonly IMediator _mediator;
    private readonly IMcpAgentRepository _agentRepository;
    private readonly IFieldLevelFilter _fieldFilter;
    private readonly ILogger<ResourceService> _logger;

    public async Task<List<ResourceDescriptor>> ListResourcesAsync(
        TenantId tenantId,
        McpAgentId agentId,
        CancellationToken cancellationToken = default)
    {
        var agent = await _agentRepository.GetByIdAsync(agentId, cancellationToken);
        if (agent == null || agent.Status != AgentStatus.Active)
            throw new UnauthorizedException("Agent not found or inactive");

        // Filter resources based on agent permissions
        var allResources = GetAllResourceDescriptors();
        return allResources
            .Where(r => IsResourceAllowed(r.Uri, agent))
            .ToList();
    }

    public async Task<ResourceContent> ReadResourceAsync(
        string resourceUri,
        TenantId tenantId,
        McpAgentId agentId,
        CancellationToken cancellationToken = default)
    {
        var agent = await _agentRepository.GetByIdAsync(agentId, cancellationToken);

        // Same guard as ListResourcesAsync: reject unknown, inactive, or revoked agents
        if (agent == null || agent.Status != AgentStatus.Active)
            throw new UnauthorizedException("Agent not found or inactive");

        // Permission check
        if (!IsResourceAllowed(resourceUri, agent))
            throw new ForbiddenException($"Agent not allowed to access resource: {resourceUri}");

        // Parse URI and fetch data
        var (entityType, entityId) = ParseResourceUri(resourceUri);
        object content = entityType switch
        {
            "projects" when entityId == null => await FetchProjectsAsync(tenantId, cancellationToken),
            "projects" => await FetchProjectByIdAsync(entityId.Value, tenantId, cancellationToken),
            "issues" when entityId == null => await FetchIssuesAsync(tenantId, cancellationToken),
            "issues" => await FetchIssueByIdAsync(entityId.Value, tenantId, cancellationToken),
            _ => throw new NotFoundException($"Resource not found: {resourceUri}")
        };

        // Apply field-level filtering
        content = _fieldFilter.FilterSensitiveFields(content, agent.PermissionLevel);

        return new ResourceContent
        {
            Uri = resourceUri,
            Content = JsonSerializer.Serialize(content),
            MimeType = "application/json"
        };
    }

    private async Task<object> FetchProjectsAsync(TenantId tenantId, CancellationToken ct)
    {
        var query = new GetProjectsQuery(tenantId);
        return await _mediator.Send(query, ct);
    }

    private async Task<object> FetchProjectByIdAsync(Guid projectId, TenantId tenantId, CancellationToken ct)
    {
        var query = new GetProjectByIdQuery(projectId, tenantId);
        return await _mediator.Send(query, ct);
    }

    // FetchIssuesAsync, FetchIssueByIdAsync, ParseResourceUri, and MatchesPattern
    // are omitted here for brevity.

    private bool IsResourceAllowed(string resourceUri, McpAgent agent)
    {
        // Check wildcard permissions
        if (agent.AllowedResources.Contains("*"))
            return true;

        // Check pattern matching
        foreach (var pattern in agent.AllowedResources)
        {
            if (MatchesPattern(resourceUri, pattern))
                return true;
        }

        return false;
    }

    private List<ResourceDescriptor> GetAllResourceDescriptors()
    {
        return new List<ResourceDescriptor>
        {
            new("colaflow://projects", "All Projects", "List all projects", "application/json"),
            new("colaflow://projects/{id}", "Project Details", "Get project by ID", "application/json"),
            new("colaflow://issues", "All Issues", "List all issues", "application/json"),
            new("colaflow://issues/{id}", "Issue Details", "Get issue by ID", "application/json"),
            // ... more resources
        };
    }
}
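The service relies on a `MatchesPattern` helper that is not shown, and the permission patterns (`projects.*`) use a different shape from the resource URIs (`colaflow://projects/{id}`). The Python sketch below shows one possible interpretation, in which `<collection>.*` grants access to every URI under that collection. This reading is an assumption, and `is_resource_allowed` is a hypothetical name.

```python
URI_PREFIX = "colaflow://"


def is_resource_allowed(uri: str, allowed_patterns: list) -> bool:
    """Check a colaflow:// URI against patterns such as "projects.*" or "*"."""
    if not uri.startswith(URI_PREFIX):
        return False
    # The collection is the first segment after the scheme, without any query string.
    collection = uri[len(URI_PREFIX):].split("/", 1)[0].split("?", 1)[0]
    for pattern in allowed_patterns:
        if pattern == "*":
            return True
        if pattern.endswith(".*") and pattern[:-2] == collection:
            return True
    return False
```

Under this reading, an agent with the default `ReadOnly` patterns can read projects, issues, and sprints but not `colaflow://reports/daily`.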
5.2 Tool Invocation Service
namespace ColaFlow.Modules.Mcp.Application.Services;

public interface IToolInvocationService
{
    /// <summary>
    /// List all available tools for the current AI Agent
    /// </summary>
    Task<List<ToolDescriptor>> ListToolsAsync(
        TenantId tenantId,
        McpAgentId agentId,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Invoke a tool (generates diff preview for write operations)
    /// </summary>
    Task<ToolInvocationResult> InvokeToolAsync(
        string toolName,
        Dictionary<string, object> arguments,
        TenantId tenantId,
        McpAgentId agentId,
        CancellationToken cancellationToken = default);
}

public class ToolInvocationService : IToolInvocationService
{
    // Dependencies are supplied through constructor injection (constructor omitted for brevity)
    private readonly IMediator _mediator;
    private readonly IDiffPreviewService _diffPreviewService;
    private readonly ITaskLockService _taskLockService;
    private readonly IMcpAgentRepository _agentRepository;

    // ListToolsAsync and the helpers IsWriteOperation, TryGetEntityId, and
    // ExecuteReadOnlyToolAsync are omitted here for brevity.

    public async Task<ToolInvocationResult> InvokeToolAsync(
        string toolName,
        Dictionary<string, object> arguments,
        TenantId tenantId,
        McpAgentId agentId,
        CancellationToken cancellationToken = default)
    {
        var agent = await _agentRepository.GetByIdAsync(agentId, cancellationToken);

        // Reject unknown, inactive, or revoked agents before any permission check
        if (agent == null || agent.Status != AgentStatus.Active)
            throw new UnauthorizedException("Agent not found or inactive");

        // Permission check
        if (!IsToolAllowed(toolName, agent))
            throw new ForbiddenException($"Agent not allowed to use tool: {toolName}");

        // Check if write operation requires preview
        if (IsWriteOperation(toolName))
        {
            // Try to acquire lock on target entity
            if (TryGetEntityId(arguments, out var entityId))
            {
                var lockAcquired = await _taskLockService.TryAcquireLockAsync(
                    tenantId, agentId, "Issue", entityId, cancellationToken);
                if (!lockAcquired)
                    return ToolInvocationResult.Error("Entity is locked by another agent");
            }

            // Generate diff preview
            var diffPreview = await _diffPreviewService.GenerateDiffAsync(
                toolName, arguments, agentId, tenantId, cancellationToken);

            return new ToolInvocationResult
            {
                RequiresApproval = true,
                DiffPreviewId = diffPreview.Id,
                DiffPreview = diffPreview,
                IsSuccess = true
            };
        }

        // Read-only operations: execute directly
        var result = await ExecuteReadOnlyToolAsync(toolName, arguments, tenantId, cancellationToken);
        return new ToolInvocationResult
        {
            RequiresApproval = false,
            Result = result,
            IsSuccess = true
        };
    }

    private bool IsToolAllowed(string toolName, McpAgent agent)
    {
        if (agent.AllowedTools.Contains("*"))
            return true;

        return agent.AllowedTools.Contains(toolName);
    }
}
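`InvokeToolAsync` relies on two helpers, `IsWriteOperation` and `TryGetEntityId`, that are not shown above. The following is a minimal sketch of how they might look, assuming the write tools are exactly the three tool names handled by the diff preview service and that the target entity arrives as an `issueId` argument. The class name `ToolInvocationHelpers` is hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper class; in the service these would be private methods.
public static class ToolInvocationHelpers
{
    // Assumption: every tool that mutates data is listed here explicitly,
    // so an unknown tool is never treated as a write by accident.
    private static readonly HashSet<string> WriteTools = new(StringComparer.Ordinal)
    {
        "create_issue",
        "update_issue_status",
        "delete_issue"
    };

    public static bool IsWriteOperation(string toolName) => WriteTools.Contains(toolName);

    // Returns true only when an "issueId" argument is present and parses as a GUID.
    // Values deserialized by System.Text.Json arrive as JsonElement; ToString()
    // on a JSON string element yields the string value, so the same code path works.
    public static bool TryGetEntityId(
        IReadOnlyDictionary<string, object> arguments,
        out Guid entityId)
    {
        entityId = Guid.Empty;
        return arguments.TryGetValue("issueId", out var raw)
            && raw is not null
            && Guid.TryParse(raw.ToString(), out entityId);
    }
}
```

With this shape, `create_issue` is a write operation without an entity ID, so it skips lock acquisition and goes straight to diff generation.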
5.3 Diff Preview Service
namespace ColaFlow.Modules.Mcp.Application.Services;
public interface IDiffPreviewService
{
Task<DiffPreview> GenerateDiffAsync(
string toolName,
Dictionary<string, object> arguments,
McpAgentId agentId,
TenantId tenantId,
CancellationToken cancellationToken = default);
Task<object> ApproveAndCommitAsync(
Guid previewId,
Guid approvedBy,
TenantId tenantId,
CancellationToken cancellationToken = default);
Task RejectAsync(
Guid previewId,
Guid rejectedBy,
string reason,
TenantId tenantId,
CancellationToken cancellationToken = default);
}
public class DiffPreviewService : IDiffPreviewService
{
private readonly IMediator _mediator;
private readonly IDiffPreviewRepository _diffPreviewRepository;
private readonly IRiskCalculator _riskCalculator;
public async Task<DiffPreview> GenerateDiffAsync(
string toolName,
Dictionary<string, object> arguments,
McpAgentId agentId,
TenantId tenantId,
CancellationToken cancellationToken = default)
{
// 1. Determine operation type
var operation = toolName switch
{
"create_issue" => DiffOperation.Create,
"update_issue_status" => DiffOperation.Update,
"delete_issue" => DiffOperation.Delete,
_ => throw new NotSupportedException($"Tool not supported: {toolName}")
};
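// 2. Load current state (update/delete only)
string? beforeStateJson = null;
Guid? entityId = null;
if (operation != DiffOperation.Create)
{
// Validate the argument instead of letting a missing key or malformed GUID surface as an unhandled exception
if (!arguments.TryGetValue("issueId", out var rawIssueId)
|| !Guid.TryParse(rawIssueId?.ToString(), out var issueId))
throw new ArgumentException("Argument 'issueId' must be a valid GUID", nameof(arguments));
entityId = issueId;
var currentEntity = await LoadCurrentEntityAsync(issueId, tenantId, cancellationToken);
beforeStateJson = JsonSerializer.Serialize(currentEntity);
}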
// 3. Simulate operation (dry-run)
var afterState = await SimulateOperationAsync(toolName, arguments, tenantId, cancellationToken);
var afterStateJson = JsonSerializer.Serialize(afterState);
// 4. Generate JSON diff
var diffJson = GenerateJsonDiff(beforeStateJson, afterStateJson);
// 5. Calculate risk level
var (riskLevel, riskReasons) = _riskCalculator.CalculateRisk(
operation, "Issue", arguments, beforeStateJson, afterStateJson);
// 6. Create DiffPreview aggregate
var diffPreview = DiffPreview.Create(
tenantId,
agentId,
toolName,
JsonSerializer.Serialize(arguments),
operation,
"Issue",
entityId,
beforeStateJson,
afterStateJson,
diffJson,
riskLevel,
riskReasons);
// 7. Persist
await _diffPreviewRepository.AddAsync(diffPreview, cancellationToken);
return diffPreview;
}
public async Task<object> ApproveAndCommitAsync(
Guid previewId,
Guid approvedBy,
TenantId tenantId,
CancellationToken cancellationToken = default)
{
var preview = await _diffPreviewRepository.GetByIdAsync(previewId, cancellationToken);
if (preview == null)
throw new NotFoundException("Diff preview not found");
if (preview.TenantId != tenantId)
throw new ForbiddenException("Access denied");
// Approve in domain
preview.Approve(approvedBy);
await _diffPreviewRepository.UpdateAsync(preview, cancellationToken);
// Execute actual operation
var result = await ExecuteOperationAsync(preview, cancellationToken);
// Mark as committed
preview.MarkAsCommitted(result.EntityId);
await _diffPreviewRepository.UpdateAsync(preview, cancellationToken);
return result;
}
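// Typed result: ApproveAndCommitAsync reads result.EntityId, which does not compile
// against an anonymous object returned as `object`. EntityId is assumed to be a Guid.
private sealed record OperationResult(Guid EntityId, object Entity);
private async Task<OperationResult> ExecuteOperationAsync(
DiffPreview preview,
CancellationToken cancellationToken)
{
var arguments = JsonSerializer.Deserialize<Dictionary<string, object>>(
preview.InputParametersJson)
?? throw new InvalidOperationException("Diff preview has no input parameters");
// Every tool accepted by GenerateDiffAsync needs a branch here; otherwise an approved preview cannot be committed
return preview.ToolName switch
{
"create_issue" => await ExecuteCreateIssueAsync(arguments, preview.TenantId, cancellationToken),
"update_issue_status" => await ExecuteUpdateIssueStatusAsync(arguments, preview.TenantId, cancellationToken),
// Helper not shown, same shape as ExecuteUpdateIssueStatusAsync
"delete_issue" => await ExecuteDeleteIssueAsync(arguments, preview.TenantId, cancellationToken),
_ => throw new NotSupportedException($"Tool not supported: {preview.ToolName}")
};
}
private async Task<OperationResult> ExecuteCreateIssueAsync(
Dictionary<string, object> arguments,
TenantId tenantId,
CancellationToken cancellationToken)
{
var command = new CreateIssueCommand
{
TenantId = tenantId,
// Required arguments
ProjectId = Guid.Parse(arguments["projectId"].ToString()),
Title = arguments["title"].ToString(),
Type = Enum.Parse<IssueType>(arguments["type"].ToString()),
Priority = Enum.Parse<IssuePriority>(arguments["priority"].ToString()),
// Optional arguments
Description = arguments.TryGetValue("description", out var description) ? description?.ToString() : null,
AssigneeId = arguments.TryGetValue("assigneeId", out var assigneeId) && assigneeId != null
? Guid.Parse(assigneeId.ToString())
: (Guid?)null
};
var result = await _mediator.Send(command, cancellationToken);
return new OperationResult(result.Id, result);
}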
}
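Step 4 of `GenerateDiffAsync` calls `GenerateJsonDiff`, which is not shown. One possible shape, sketched here under simplifying assumptions: only top-level properties are compared, values are compared by their raw JSON text, and the result is a list of changed fields rather than a JSON Patch document. The names `FieldChange` and `JsonDiffSketch` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

// One changed field; Before is null for added fields, After is null for removed ones.
public sealed record FieldChange(string Field, string? Before, string? After);

public static class JsonDiffSketch
{
    // beforeJson may be null (create operations have no previous state).
    public static List<FieldChange> Diff(string? beforeJson, string afterJson)
    {
        var before = ToMap(beforeJson);
        var after = ToMap(afterJson);
        var changes = new List<FieldChange>();

        // Walk the union of field names in a stable order so the diff is deterministic.
        foreach (var name in before.Keys.Union(after.Keys).OrderBy(n => n, StringComparer.Ordinal))
        {
            before.TryGetValue(name, out var oldValue);
            after.TryGetValue(name, out var newValue);
            if (oldValue != newValue)
                changes.Add(new FieldChange(name, oldValue, newValue));
        }
        return changes;
    }

    // Maps each top-level property to its raw JSON text (strings keep their quotes).
    private static Dictionary<string, string> ToMap(string? json)
    {
        var map = new Dictionary<string, string>(StringComparer.Ordinal);
        if (string.IsNullOrWhiteSpace(json)) return map;

        using var document = JsonDocument.Parse(json);
        if (document.RootElement.ValueKind != JsonValueKind.Object) return map;

        foreach (var property in document.RootElement.EnumerateObject())
            map[property.Name] = property.Value.GetRawText();
        return map;
    }
}
```

For an `update_issue_status` call that moves an issue from `Todo` to `Done`, this produces a single `FieldChange` for `status`. Because nested objects are compared as raw text, a production diff would need recursive comparison to avoid flagging whitespace or key-order differences.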
5.4 Agent Coordination Service (Inspired by headless-pm)
namespace ColaFlow.Modules.Mcp.Application.Services;
public interface IAgentCoordinationService
{
Task<McpAgent> RegisterAgentAsync(
TenantId tenantId,
string agentName,
string agentType,
string version,
List<string> capabilities,
Guid createdBy,
CancellationToken cancellationToken = default);
Task RecordHeartbeatAsync(
McpAgentId agentId,
CancellationToken cancellationToken = default);
Task<List<McpAgent>> GetActiveAgentsAsync(
TenantId tenantId,
CancellationToken cancellationToken = default);
Task MarkInactiveAgentsAsync(
CancellationToken cancellationToken = default);
}
public class AgentCoordinationService : IAgentCoordinationService
{
private readonly IMcpAgentRepository _agentRepository;
private readonly IApiKeyGenerator _apiKeyGenerator;
public async Task<McpAgent> RegisterAgentAsync(
TenantId tenantId,
string agentName,
string agentType,
string version,
List<string> capabilities,
Guid createdBy,
CancellationToken cancellationToken = default)
{
// Generate API key
var apiKey = _apiKeyGenerator.Generate();
var apiKeyExpiresAt = DateTime.UtcNow.AddDays(90);
// Create agent aggregate
var agent = McpAgent.Register(
tenantId,
agentName,
agentType,
version,
apiKey,
apiKeyExpiresAt,
McpPermissionLevel.WriteWithPreview,
capabilities,
createdBy);
// Persist
await _agentRepository.AddAsync(agent, cancellationToken);
return agent;
}
public async Task RecordHeartbeatAsync(
McpAgentId agentId,
CancellationToken cancellationToken = default)
{
var agent = await _agentRepository.GetByIdAsync(agentId, cancellationToken);
if (agent == null)
throw new NotFoundException("Agent not found");
// Record heartbeat (domain method)
agent.RecordHeartbeat();
await _agentRepository.UpdateAsync(agent, cancellationToken);
}
public async Task MarkInactiveAgentsAsync(
CancellationToken cancellationToken = default)
{
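var agents = await _agentRepository.GetAllActiveAsync(cancellationToken);
foreach (var agent in agents)
{
agent.MarkAsInactiveIfTimeout();
// Persist only agents whose status actually changed, not every active agent on every sweep
if (agent.Status != AgentStatus.Active)
await _agentRepository.UpdateAsync(agent, cancellationToken);
}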
}
}
6. Security Architecture
6.1 API Key Authentication
namespace ColaFlow.Modules.Mcp.Infrastructure.Security;
public class ApiKeyAuthenticationHandler : AuthenticationHandler<ApiKeyAuthenticationOptions>
{
private readonly IMcpAgentRepository _agentRepository;
protected override async Task<AuthenticateResult> HandleAuthenticateAsync()
{
// 1. Extract API key from header
if (!Request.Headers.TryGetValue("X-MCP-API-Key", out var apiKeyHeaderValues))
return AuthenticateResult.Fail("Missing API Key");
var apiKeyString = apiKeyHeaderValues.FirstOrDefault();
if (string.IsNullOrWhiteSpace(apiKeyString))
return AuthenticateResult.Fail("Invalid API Key");
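// 2. Hash and look up in database
// NOTE: ApiKey.Hash calls BCrypt.HashPassword, which salts every hash with fresh random bytes.
// Hashing the presented key again therefore never reproduces the stored hash, and this
// equality lookup cannot match. The lookup needs a deterministic digest or a non-secret key
// identifier to find the agent, followed by ApiKey.Verify against the stored hash.
var hashedKey = ApiKey.Hash(apiKeyString);
var agent = await _agentRepository.GetByApiKeyHashAsync(hashedKey);
if (agent == null)
return AuthenticateResult.Fail("Invalid API Key");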
// 3. Check agent status
if (agent.Status != AgentStatus.Active)
return AuthenticateResult.Fail("Agent inactive or revoked");
// 4. Check expiration
if (agent.ApiKeyExpiresAt < DateTime.UtcNow)
return AuthenticateResult.Fail("API Key expired");
// 5. Check if alive (heartbeat timeout)
if (!agent.IsAlive())
return AuthenticateResult.Fail("Agent heartbeat timeout");
// 6. Create claims principal
var claims = new[]
{
new Claim("agent_id", agent.Id.Value.ToString()),
new Claim("tenant_id", agent.TenantId.Value.ToString()),
new Claim("agent_type", agent.AgentType),
new Claim("permission_level", agent.PermissionLevel.ToString()),
new Claim(ClaimTypes.Role, "AIAgent")
};
var identity = new ClaimsIdentity(claims, Scheme.Name);
var principal = new ClaimsPrincipal(identity);
var ticket = new AuthenticationTicket(principal, Scheme.Name);
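// 7. Record usage and persist it. Running agent.RecordRequest() in a detached Task.Run only
// changes the in-memory aggregate, so the updated request count would never reach the database.
agent.RecordRequest();
await _agentRepository.UpdateAsync(agent, Context.RequestAborted);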
return AuthenticateResult.Success(ticket);
}
}
/// <summary>
/// ApiKey value object with hashing
/// </summary>
public sealed class ApiKey : ValueObject
{
public string HashedValue { get; private set; }
private ApiKey(string hashedValue)
{
HashedValue = hashedValue;
}
public static ApiKey Create(string plainTextKey)
{
var hashedValue = Hash(plainTextKey);
return new ApiKey(hashedValue);
}
public static string Hash(string plainTextKey)
{
return BCrypt.Net.BCrypt.HashPassword(plainTextKey);
}
public bool Verify(string plainTextKey)
{
return BCrypt.Net.BCrypt.Verify(plainTextKey, HashedValue);
}
protected override IEnumerable<object> GetAtomicValues()
{
yield return HashedValue;
}
}
/// <summary>
/// API Key generator
/// </summary>
public class ApiKeyGenerator : IApiKeyGenerator
{
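public ApiKey Generate()
{
// 32 bytes (256 bits) from the cryptographic RNG
var randomBytes = RandomNumberGenerator.GetBytes(32);
// Drop the Base64 characters that are awkward in headers and keep 32 characters of key material
var plainTextKey = $"mcp_prod_{Convert.ToBase64String(randomBytes).Replace("/", "").Replace("+", "")[..32]}";
// NOTE: only the hashed ApiKey leaves this method. The register endpoint returns the API key
// to the caller, so the plaintext value must also be handed back once before it is discarded.
return ApiKey.Create(plainTextKey);
}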
}
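The authentication handler finds the agent by comparing a hash of the presented key with the stored hash, which requires the hash to be deterministic. The sketch below shows one way to get a deterministic lookup value. It swaps in SHA-256 for that purpose, which is not the BCrypt scheme this document specifies; the trade-off is acceptable only because the key is 256 bits of random data rather than a human-chosen password. The class and method names are hypothetical, and the hex key format is an assumption.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical sketch: deterministic lookup hash for high-entropy API keys.
public static class ApiKeyLookupSketch
{
    // 32 random bytes rendered as 64 lowercase hex characters:
    // fixed length and safe to send in an HTTP header.
    public static string GeneratePlainTextKey()
    {
        var bytes = RandomNumberGenerator.GetBytes(32);
        return "mcp_prod_" + Convert.ToHexString(bytes).ToLowerInvariant();
    }

    // Same input always yields the same digest, so the value can be indexed
    // and used in a WHERE api_key_hash = @hash query.
    public static string ComputeLookupHash(string plainTextKey)
    {
        var digest = SHA256.HashData(Encoding.UTF8.GetBytes(plainTextKey));
        return Convert.ToHexString(digest);
    }

    // Constant-time comparison for the final check after the row is loaded.
    public static bool Matches(string plainTextKey, string storedLookupHash)
    {
        var presented = Encoding.ASCII.GetBytes(ComputeLookupHash(plainTextKey));
        var stored = Encoding.ASCII.GetBytes(storedLookupHash);
        return CryptographicOperations.FixedTimeEquals(presented, stored);
    }
}
```

At registration the service would store `ComputeLookupHash(key)` and return the plaintext key to the caller exactly once. If BCrypt must be kept, an alternative is to look the agent up by a non-secret key prefix and then call `ApiKey.Verify`.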
6.2 Field-Level Permission Filter
namespace ColaFlow.Modules.Mcp.Infrastructure.Security;
public interface IFieldLevelFilter
{
object FilterSensitiveFields(object entity, McpPermissionLevel permissionLevel);
}
public class FieldLevelFilter : IFieldLevelFilter
{
// Case-insensitive set, so "PasswordHash" and "passwordHash" are both caught with an O(1) lookup
private static readonly HashSet<string> SensitiveFields = new(StringComparer.OrdinalIgnoreCase)
{
"passwordHash",
"apiKeyHash",
"ssn",
"creditCard",
"bankAccount",
"salary"
};
public object FilterSensitiveFields(object entity, McpPermissionLevel permissionLevel)
{
// Every permission level except DirectWrite gets sensitive fields stripped
if (permissionLevel != McpPermissionLevel.DirectWrite)
{
var json = JsonSerializer.Serialize(entity);
var document = JsonDocument.Parse(json);
var filteredJson = RemoveSensitiveFields(document.RootElement);
return JsonSerializer.Deserialize<object>(filteredJson);
}
return entity;
}
private JsonElement RemoveSensitiveFields(JsonElement element)
{
switch (element.ValueKind)
{
case JsonValueKind.Object:
var filteredObject = new Dictionary<string, JsonElement>();
foreach (var property in element.EnumerateObject())
{
// Skip sensitive fields
if (SensitiveFields.Contains(property.Name))
continue;
// Recursively filter nested values
filteredObject[property.Name] = RemoveSensitiveFields(property.Value);
}
return JsonSerializer.SerializeToElement(filteredObject);
case JsonValueKind.Array:
// Arrays can hold objects too (for example a list of users), so filter each item
var filteredItems = new List<JsonElement>();
foreach (var item in element.EnumerateArray())
filteredItems.Add(RemoveSensitiveFields(item));
return JsonSerializer.SerializeToElement(filteredItems);
default:
return element;
}
}
}
6.3 Rate Limiting
namespace ColaFlow.Modules.Mcp.API.Middleware;
public class McpRateLimitMiddleware
{
private readonly RequestDelegate _next;
private readonly IDistributedCache _cache; // Redis
public async Task InvokeAsync(HttpContext context)
{
var agentId = context.User.FindFirst("agent_id")?.Value;
if (agentId != null)
{
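var operation = ExtractOperation(context.Request.Path);
var (limit, window) = GetRateLimits(operation);
// Fixed window: bucket the key by window index so the counter expires together with its window.
// Re-setting a single key with a fresh TTL on every request would keep extending the window,
// and a slow but steady caller would eventually be throttled.
var windowIndex = DateTimeOffset.UtcNow.ToUnixTimeSeconds() / (long)window.TotalSeconds;
var rateLimitKey = $"ratelimit:agent:{agentId}:{operation}:{windowIndex}";
// NOTE: read-then-write is not atomic, so concurrent requests can undercount.
// IDistributedCache has no increment operation; an atomic Redis INCR closes that gap.
var currentCountStr = await _cache.GetStringAsync(rateLimitKey);
var currentCount = int.TryParse(currentCountStr, out var parsedCount) ? parsedCount : 0;
if (currentCount >= limit)
{
context.Response.StatusCode = StatusCodes.Status429TooManyRequests;
context.Response.Headers["Retry-After"] = ((int)window.TotalSeconds).ToString();
await context.Response.WriteAsJsonAsync(new
{
error = "Rate limit exceeded",
limit,
retryAfter = window.TotalSeconds
});
return;
}
// Increment counter
await _cache.SetStringAsync(
rateLimitKey,
(currentCount + 1).ToString(),
new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = window
});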
}
await _next(context);
}
private (int Limit, TimeSpan Window) GetRateLimits(string operation)
{
return operation switch
{
"resources/read" => (100, TimeSpan.FromMinutes(1)),
"tools/call" => (10, TimeSpan.FromMinutes(1)),
_ => (50, TimeSpan.FromMinutes(1))
};
}
}
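The middleware reads the counter and writes it back in two separate cache calls. The sketch below illustrates the same fixed-window rule with an atomic per-key update, using an in-memory `ConcurrentDictionary` so it runs without Redis. It is an illustration of the counting logic, not the production limiter: the class name `FixedWindowRateLimiter` and the injectable clock are assumptions. With Redis the equivalent is typically an `INCR` on a window-specific key plus an `EXPIRE` when the key is first created.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory fixed-window limiter (single process only).
public sealed class FixedWindowRateLimiter
{
    private readonly ConcurrentDictionary<string, (long WindowIndex, int Count)> _counters = new();
    private readonly int _limit;
    private readonly TimeSpan _window;
    private readonly Func<DateTimeOffset> _clock;

    public FixedWindowRateLimiter(int limit, TimeSpan window, Func<DateTimeOffset>? clock = null)
    {
        if (limit <= 0) throw new ArgumentOutOfRangeException(nameof(limit));
        if (window <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(window));
        _limit = limit;
        _window = window;
        _clock = clock ?? (() => DateTimeOffset.UtcNow);
    }

    // Returns true when the request fits in the current window for this key.
    public bool TryAcquire(string key)
    {
        // All timestamps inside the same window share one index.
        var windowIndex = _clock().UtcTicks / _window.Ticks;

        // AddOrUpdate applies the change atomically per key:
        // same window -> increment, new window -> restart at 1.
        var entry = _counters.AddOrUpdate(
            key,
            _ => (windowIndex, 1),
            (_, current) => current.WindowIndex == windowIndex
                ? (windowIndex, current.Count + 1)
                : (windowIndex, 1));

        return entry.Count <= _limit;
    }
}
```

A limiter created with `new FixedWindowRateLimiter(10, TimeSpan.FromMinutes(1))` and keyed by `"{agentId}:tools/call"` matches the `tools/call` limit configured in `GetRateLimits`.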
7. Database Design
7.1 Database Schema (PostgreSQL)
-- Schema: mcp
CREATE SCHEMA IF NOT EXISTS mcp;
-- Table: mcp_agents
CREATE TABLE mcp.mcp_agents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
-- Identity
agent_name VARCHAR(200) NOT NULL,
agent_type VARCHAR(100) NOT NULL,
version VARCHAR(50),
-- Authentication
api_key_hash VARCHAR(512) NOT NULL,
api_key_expires_at TIMESTAMP NOT NULL,
status VARCHAR(50) NOT NULL DEFAULT 'Active',
-- Heartbeat
last_heartbeat TIMESTAMP NOT NULL DEFAULT NOW(),
heartbeat_timeout_seconds INTEGER NOT NULL DEFAULT 300,
-- Permissions
permission_level VARCHAR(50) NOT NULL DEFAULT 'WriteWithPreview',
allowed_resources JSONB NOT NULL DEFAULT '[]',
allowed_tools JSONB NOT NULL DEFAULT '[]',
capabilities JSONB NOT NULL DEFAULT '[]',
-- Statistics
request_count INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
created_by UUID NOT NULL,
-- Constraints
CONSTRAINT fk_mcp_agents_tenant FOREIGN KEY (tenant_id)
REFERENCES identity.tenants(id) ON DELETE CASCADE,
CONSTRAINT fk_mcp_agents_created_by FOREIGN KEY (created_by)
REFERENCES identity.users(id)
);
-- Indexes
CREATE INDEX idx_mcp_agents_tenant ON mcp.mcp_agents(tenant_id, status);
CREATE INDEX idx_mcp_agents_api_key ON mcp.mcp_agents(api_key_hash)
WHERE status = 'Active';
CREATE INDEX idx_mcp_agents_heartbeat ON mcp.mcp_agents(last_heartbeat DESC)
WHERE status = 'Active';
-- Table: mcp_diff_previews
CREATE TABLE mcp.mcp_diff_previews (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
agent_id UUID NOT NULL,
-- Operation details
tool_name VARCHAR(200) NOT NULL,
input_parameters_json JSONB NOT NULL,
-- Diff details
operation VARCHAR(50) NOT NULL,
entity_type VARCHAR(100) NOT NULL,
entity_id UUID,
before_state_json JSONB,
after_state_json JSONB NOT NULL,
diff_json JSONB NOT NULL,
-- Risk assessment
risk_level VARCHAR(50) NOT NULL,
risk_reasons JSONB NOT NULL DEFAULT '[]',
-- Approval workflow
status VARCHAR(50) NOT NULL DEFAULT 'Pending',
approved_by UUID,
approved_at TIMESTAMP,
rejected_by UUID,
rejected_at TIMESTAMP,
rejection_reason TEXT,
-- Rollback
is_committed BOOLEAN NOT NULL DEFAULT FALSE,
committed_entity_id UUID,
committed_at TIMESTAMP,
rollback_token VARCHAR(500),
-- Timestamps
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
expires_at TIMESTAMP NOT NULL DEFAULT (NOW() + INTERVAL '24 hours'),
-- Constraints
CONSTRAINT fk_mcp_diff_previews_tenant FOREIGN KEY (tenant_id)
REFERENCES identity.tenants(id) ON DELETE CASCADE,
CONSTRAINT fk_mcp_diff_previews_agent FOREIGN KEY (agent_id)
REFERENCES mcp.mcp_agents(id) ON DELETE CASCADE,
CONSTRAINT fk_mcp_diff_previews_approved_by FOREIGN KEY (approved_by)
REFERENCES identity.users(id),
CONSTRAINT fk_mcp_diff_previews_rejected_by FOREIGN KEY (rejected_by)
REFERENCES identity.users(id)
);
-- Indexes
CREATE INDEX idx_mcp_diff_previews_tenant_status ON mcp.mcp_diff_previews(tenant_id, status, created_at DESC);
CREATE INDEX idx_mcp_diff_previews_agent ON mcp.mcp_diff_previews(agent_id, created_at DESC);
CREATE INDEX idx_mcp_diff_previews_expires ON mcp.mcp_diff_previews(expires_at)
WHERE status = 'Pending';
CREATE INDEX idx_mcp_diff_previews_entity ON mcp.mcp_diff_previews(entity_type, entity_id);
-- Table: mcp_task_locks
CREATE TABLE mcp.mcp_task_locks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
agent_id UUID NOT NULL,
-- Lock target
entity_type VARCHAR(100) NOT NULL,
entity_id UUID NOT NULL,
-- Lock details
acquired_at TIMESTAMP NOT NULL DEFAULT NOW(),
expires_at TIMESTAMP NOT NULL DEFAULT (NOW() + INTERVAL '15 minutes'),
lock_duration_seconds INTEGER NOT NULL DEFAULT 900,
is_released BOOLEAN NOT NULL DEFAULT FALSE,
released_at TIMESTAMP,
-- Constraints
CONSTRAINT fk_mcp_task_locks_tenant FOREIGN KEY (tenant_id)
REFERENCES identity.tenants(id) ON DELETE CASCADE,
CONSTRAINT fk_mcp_task_locks_agent FOREIGN KEY (agent_id)
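REFERENCES mcp.mcp_agents(id) ON DELETE CASCADE
);
-- At most one unreleased lock per entity. PostgreSQL does not accept a WHERE clause on a
-- table-level UNIQUE constraint, so the rule is enforced with a partial unique index.
CREATE UNIQUE INDEX uk_mcp_task_locks_entity ON mcp.mcp_task_locks(entity_type, entity_id)
WHERE is_released = FALSE;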
-- Indexes
CREATE INDEX idx_mcp_task_locks_agent ON mcp.mcp_task_locks(agent_id);
CREATE INDEX idx_mcp_task_locks_entity ON mcp.mcp_task_locks(entity_type, entity_id);
CREATE INDEX idx_mcp_task_locks_expires ON mcp.mcp_task_locks(expires_at)
WHERE is_released = FALSE;
-- Table: mcp_audit_logs
CREATE TABLE mcp.mcp_audit_logs (
id BIGSERIAL PRIMARY KEY,
tenant_id UUID NOT NULL,
agent_id UUID NOT NULL,
-- Request details
operation_type VARCHAR(100) NOT NULL,
resource_uri VARCHAR(500),
tool_name VARCHAR(200),
input_parameters_json JSONB,
-- Response details
is_success BOOLEAN NOT NULL,
error_message TEXT,
http_status_code INTEGER,
-- Diff preview (if applicable)
diff_preview_id UUID,
diff_status VARCHAR(50),
-- Performance
duration_ms INTEGER NOT NULL,
-- Context
client_ip_address VARCHAR(50),
user_agent TEXT,
timestamp TIMESTAMP NOT NULL DEFAULT NOW(),
-- Constraints
CONSTRAINT fk_mcp_audit_logs_tenant FOREIGN KEY (tenant_id)
REFERENCES identity.tenants(id) ON DELETE CASCADE,
CONSTRAINT fk_mcp_audit_logs_agent FOREIGN KEY (agent_id)
REFERENCES mcp.mcp_agents(id) ON DELETE CASCADE,
CONSTRAINT fk_mcp_audit_logs_diff_preview FOREIGN KEY (diff_preview_id)
REFERENCES mcp.mcp_diff_previews(id)
);
-- Indexes (optimized for time-series queries)
CREATE INDEX idx_mcp_audit_logs_tenant_timestamp ON mcp.mcp_audit_logs(tenant_id, timestamp DESC);
CREATE INDEX idx_mcp_audit_logs_agent_timestamp ON mcp.mcp_audit_logs(agent_id, timestamp DESC);
CREATE INDEX idx_mcp_audit_logs_operation_timestamp ON mcp.mcp_audit_logs(operation_type, timestamp DESC);
CREATE INDEX idx_mcp_audit_logs_diff_preview ON mcp.mcp_audit_logs(diff_preview_id)
WHERE diff_preview_id IS NOT NULL;
-- Automatic cleanup functions
CREATE OR REPLACE FUNCTION mcp.cleanup_expired_diff_previews()
RETURNS void AS $$
BEGIN
UPDATE mcp.mcp_diff_previews
SET status = 'Expired'
WHERE status = 'Pending'
AND expires_at < NOW();
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION mcp.cleanup_expired_task_locks()
RETURNS void AS $$
BEGIN
UPDATE mcp.mcp_task_locks
SET is_released = TRUE,
released_at = NOW()
WHERE is_released = FALSE
AND expires_at < NOW();
END;
$$ LANGUAGE plpgsql;
8. API Design
8.1 MCP Protocol Endpoints
POST /api/v1/mcp/jsonrpc
- JSON-RPC 2.0 endpoint for MCP protocol
- Methods: initialize, resources/list, resources/read, tools/list, tools/call
GET /api/v1/mcp/sse
- Server-Sent Events endpoint for real-time updates
8.2 Agent Management Endpoints
POST /api/v1/mcp/agents/register
- Register new AI agent
- Returns: Agent ID + API key
POST /api/v1/mcp/agents/{agentId}/heartbeat
- Record agent heartbeat
GET /api/v1/mcp/agents
- List all agents for current tenant
GET /api/v1/mcp/agents/{agentId}
- Get agent details
PUT /api/v1/mcp/agents/{agentId}
- Update agent permissions
POST /api/v1/mcp/agents/{agentId}/revoke
- Revoke agent access
POST /api/v1/mcp/agents/{agentId}/regenerate-key
- Regenerate API key
8.3 Diff Preview Endpoints
GET /api/v1/mcp/diffs
- List pending diff previews for current tenant
GET /api/v1/mcp/diffs/{previewId}
- Get diff preview details
POST /api/v1/mcp/diffs/{previewId}/approve
- Approve and commit diff preview
POST /api/v1/mcp/diffs/{previewId}/reject
- Reject diff preview
GET /api/v1/mcp/diffs/history
- Get diff preview history
9. Implementation Roadmap
Phase 1: Foundation (Weeks 1-2)
Goal: Basic MCP Server infrastructure
Tasks:
- Create MCP module structure (Domain, Application, Infrastructure, API)
- Implement McpAgent aggregate + repository
- Implement DiffPreview aggregate + repository
- Implement TaskLock aggregate + repository
- Create database migrations
- Implement API key authentication
- Implement basic audit logging
Deliverables:
- ✅ MCP.Domain module complete
- ✅ MCP.Infrastructure persistence layer
- ✅ API key authentication working
- ✅ Can register AI agents
Acceptance Criteria:
- Can register an AI agent with API key
- Can authenticate using API key
- All requests are logged to mcp_audit_logs
Phase 2: Resources Implementation (Weeks 3-4)
Goal: Expose read-only resources to AI clients
Tasks:
- Implement ResourceService
- Implement JSON-RPC protocol handler
- Implement field-level permission filtering
- Implement rate limiting
- Create MCP protocol controller
- Add resource URI routing
Deliverables:
- ✅ ResourceService complete
- ✅ AI clients can list resources
- ✅ AI clients can read project/issue data
- ✅ Sensitive fields are filtered
Acceptance Criteria:
- AI client can list available resources
- AI client can read project data
- AI client can read issue data
- Sensitive fields are filtered out
- Rate limiting works
Phase 3: Tools & Diff Preview (Weeks 5-6)
Goal: Implement write operations with diff preview
Tasks:
- Implement DiffPreviewService
- Implement ToolInvocationService
- Implement diff generation algorithm
- Implement risk calculation
- Create diff approval endpoints
- Integrate with Issue Management module
Deliverables:
- ✅ DiffPreviewService complete
- ✅ AI clients can call tools
- ✅ Diff preview generation works
- ✅ Human can approve/reject diffs
Acceptance Criteria:
- AI client can list available tools
- AI client can call create_issue (generates diff preview)
- Human can view diff preview in Admin UI
- Human can approve diff (commits to database)
- Human can reject diff (discards preview)
Phase 4: Agent Coordination (Weeks 7-8)
Goal: Implement agent management and task locking
Tasks:
- Implement AgentCoordinationService
- Implement TaskLockService
- Implement heartbeat monitoring
- Implement background jobs (cleanup expired diffs/locks)
- Implement concurrency control
- Add monitoring and metrics
Deliverables:
- ✅ AgentCoordinationService complete
- ✅ Task locking works
- ✅ Heartbeat monitoring works
- ✅ Background cleanup jobs running
Acceptance Criteria:
- Multiple agents can work simultaneously
- Task locking prevents concurrent modifications
- Inactive agents are detected
- Expired diffs are cleaned up
Total Timeline: 8 weeks (~2 months)
Milestones:
- Week 2: Basic MCP Server running
- Week 4: AI clients can read resources
- Week 6: AI clients can create issues with approval
- Week 8: Production-ready with all features
10. Risk Mitigation
10.1 Technical Risks
| Risk | Impact | Probability | Mitigation |
|---|---|---|---|
| MCP Protocol Changes | High | Medium | Version negotiation, abstract protocol layer |
| Diff Accuracy | High | Medium | Comprehensive unit tests, visual diff viewer |
| Performance at Scale | Medium | Low | Async audit logs, Redis caching, connection pooling |
| Security Vulnerabilities | Critical | Medium | BCrypt hashing, rate limiting, field-level filtering, security audits |
| Concurrent Modifications | Medium | Medium | Redis-based distributed locks, optimistic concurrency |
10.2 Integration Risks
| Risk | Impact | Mitigation |
|---|---|---|
| Issue Management Breaking Changes | High | Use MediatR for loose coupling, integration tests |
| Multi-tenant Isolation Failure | Critical | Reuse TenantContext service, add validation |
| Audit Log Overhead | Medium | Async fire-and-forget pattern, JSONB compression |
11. Success Metrics
M2 Completion Criteria:
- ✅ AI agents can register and authenticate
- ✅ AI agents can read ColaFlow data (projects, issues)
- ✅ AI agents can create issues with diff preview
- ✅ Human approval workflow works
- ✅ Multi-tenant isolation maintained
- ✅ Complete audit trail for AI operations
- ✅ Rate limiting prevents abuse
- ✅ Task locking prevents conflicts
- ✅ All tests passing (unit + integration)
- ✅ Documentation complete
Performance Metrics:
- API response time < 100ms (P95)
- Diff generation < 500ms
- Rate limiting: 100 read/min, 10 write/min
- Heartbeat timeout: 5 minutes
- Lock timeout: 15 minutes
12. Testing Strategy
12.1 Unit Tests
// Domain Tests
[Fact]
public void McpAgent_Register_ShouldCreateActiveAgent()
{
var agent = McpAgent.Register(
TenantId.Create(Guid.NewGuid()),
"Claude AI",
"Claude",
"3.5",
ApiKey.Create("test-key"),
DateTime.UtcNow.AddDays(90),
McpPermissionLevel.WriteWithPreview,
new List<string> { "code_generation", "task_management" },
Guid.NewGuid());
agent.Should().NotBeNull();
agent.Status.Should().Be(AgentStatus.Active);
agent.IsAlive().Should().BeTrue();
}
[Fact]
public void McpAgent_MarkAsInactiveIfTimeout_ShouldMarkInactive()
{
var agent = CreateTestAgent();
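// Simulate timeout by setting last heartbeat to 10 minutes ago (default timeout is 5 minutes).
// LastHeartbeat is assumed to be a property with a non-public setter, so it is resolved with
// GetProperty; GetField would return null for a property and the test would fail on SetValue.
var lastHeartbeatProperty = typeof(McpAgent)
.GetProperty("LastHeartbeat", BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance);
lastHeartbeatProperty!.SetValue(agent, DateTime.UtcNow.AddMinutes(-10));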
agent.MarkAsInactiveIfTimeout();
agent.Status.Should().Be(AgentStatus.Inactive);
}
12.2 Integration Tests
// API Integration Tests
[Fact]
public async Task RegisterAgent_ShouldReturnApiKey()
{
var response = await _client.PostAsJsonAsync("/api/v1/mcp/agents/register", new
{
agentName = "Test Agent",
agentType = "Claude",
version = "3.5",
capabilities = new[] { "task_management" }
});
response.StatusCode.Should().Be(HttpStatusCode.Created);
var result = await response.Content.ReadFromJsonAsync<RegisterAgentResponse>();
result.AgentId.Should().NotBeEmpty();
result.ApiKey.Should().NotBeNullOrEmpty();
}
[Fact]
public async Task CreateIssue_WithValidApiKey_ShouldGenerateDiffPreview()
{
var apiKey = await RegisterTestAgent();
_client.DefaultRequestHeaders.Add("X-MCP-API-Key", apiKey);
var response = await _client.PostAsJsonAsync("/api/v1/mcp/jsonrpc", new
{
jsonrpc = "2.0",
id = 1,
method = "tools/call",
@params = new
{
name = "create_issue",
arguments = new
{
projectId = _testProjectId,
title = "Test Issue from AI",
type = "Task",
priority = "Medium"
}
}
});
response.StatusCode.Should().Be(HttpStatusCode.OK);
var result = await response.Content.ReadFromJsonAsync<JsonRpcResponse>();
result.Result.RequiresApproval.Should().BeTrue();
result.Result.DiffPreviewId.Should().NotBeEmpty();
}
13. Documentation Deliverables
- Architecture Document (this document)
- API Reference (OpenAPI/Swagger)
- MCP Protocol Guide (for AI client developers)
- Agent Registration Guide (how to register AI agents)
- Security Best Practices (API key management, permissions)
- Troubleshooting Guide (common issues and solutions)
14. Appendix
A. MCP Protocol Reference
JSON-RPC 2.0 Request Format:
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "create_issue",
"arguments": { ... }
}
}
JSON-RPC 2.0 Response Format:
{
"jsonrpc": "2.0",
"id": 1,
"result": { ... }
}
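The two envelopes above map directly onto small C# records. The following is a sketch under stated assumptions: the type names `McpRpcRequest`, `McpRpcResponse`, `McpRpcError`, and `McpRpc` are hypothetical, `id` is modeled as an integer although JSON-RPC 2.0 also allows strings, and the `requiresApproval` field in the usage note is an assumed JSON casing of `ToolInvocationResult.RequiresApproval`.

```csharp
using System.Text.Json;
using System.Text.Json.Serialization;

// Request envelope: "params" stays a JsonElement so any tool arguments fit.
public sealed record McpRpcRequest(
    [property: JsonPropertyName("jsonrpc")] string JsonRpc,
    [property: JsonPropertyName("id")] int Id,
    [property: JsonPropertyName("method")] string Method,
    [property: JsonPropertyName("params")] JsonElement Params);

// JSON-RPC 2.0 error object (code and message are required by the specification).
public sealed record McpRpcError(
    [property: JsonPropertyName("code")] int Code,
    [property: JsonPropertyName("message")] string Message);

// Response envelope: exactly one of Result or Error is expected to be present.
public sealed record McpRpcResponse(
    [property: JsonPropertyName("jsonrpc")] string JsonRpc,
    [property: JsonPropertyName("id")] int Id,
    [property: JsonPropertyName("result")] JsonElement? Result,
    [property: JsonPropertyName("error")] McpRpcError? Error);

public static class McpRpc
{
    // Builds the JSON body of a tools/call request for the given tool and arguments.
    public static string BuildToolCall(int id, string toolName, object arguments) =>
        JsonSerializer.Serialize(new McpRpcRequest(
            "2.0",
            id,
            "tools/call",
            JsonSerializer.SerializeToElement(new { name = toolName, arguments })));

    // Parses a response body; throws JsonException when the body is the JSON literal null.
    public static McpRpcResponse ParseResponse(string json) =>
        JsonSerializer.Deserialize<McpRpcResponse>(json)
            ?? throw new JsonException("Empty JSON-RPC response");
}
```

A client would send `McpRpc.BuildToolCall(1, "create_issue", new { title = "..." })` to `POST /api/v1/mcp/jsonrpc` and check `Error` before reading fields such as `requiresApproval` from `Result`.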
B. Configuration Example
{
"Mcp": {
"ApiKeyExpirationDays": 90,
"DiffPreviewExpirationHours": 24,
"HeartbeatTimeoutMinutes": 5,
"TaskLockDurationMinutes": 15,
"RateLimit": {
"ResourcesReadPerMinute": 100,
"ToolsCallPerMinute": 10
},
"DefaultPermissions": {
"Level": "WriteWithPreview",
"AllowedResources": ["projects.*", "issues.*", "sprints.*"],
"AllowedTools": ["create_issue", "update_issue_status", "assign_issue"]
}
}
}
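The configuration above can be read into a strongly typed options class so that timeouts and limits are validated once at startup. A minimal sketch follows, assuming the property names mirror the JSON keys exactly; the class names `McpOptions`, `McpRateLimitOptions`, and `McpDefaultPermissionOptions` are hypothetical, and the sketch parses the JSON directly with `System.Text.Json` to stay self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public sealed class McpRateLimitOptions
{
    public int ResourcesReadPerMinute { get; init; } = 100;
    public int ToolsCallPerMinute { get; init; } = 10;
}

public sealed class McpDefaultPermissionOptions
{
    public string Level { get; init; } = "WriteWithPreview";
    public List<string> AllowedResources { get; init; } = new();
    public List<string> AllowedTools { get; init; } = new();
}

public sealed class McpOptions
{
    // Defaults repeat the values from the configuration example.
    public int ApiKeyExpirationDays { get; init; } = 90;
    public int DiffPreviewExpirationHours { get; init; } = 24;
    public int HeartbeatTimeoutMinutes { get; init; } = 5;
    public int TaskLockDurationMinutes { get; init; } = 15;
    public McpRateLimitOptions RateLimit { get; init; } = new();
    public McpDefaultPermissionOptions DefaultPermissions { get; init; } = new();

    // Reads the "Mcp" section of a settings document and validates it.
    public static McpOptions Load(string settingsJson)
    {
        using var document = JsonDocument.Parse(settingsJson);
        if (!document.RootElement.TryGetProperty("Mcp", out var section))
            throw new InvalidOperationException("Missing 'Mcp' configuration section");

        var options = section.Deserialize<McpOptions>()
            ?? throw new InvalidOperationException("'Mcp' configuration section is empty");
        options.Validate();
        return options;
    }

    // Rejects zero or negative durations and limits.
    public void Validate()
    {
        var values = new (string Name, int Value)[]
        {
            (nameof(ApiKeyExpirationDays), ApiKeyExpirationDays),
            (nameof(DiffPreviewExpirationHours), DiffPreviewExpirationHours),
            (nameof(HeartbeatTimeoutMinutes), HeartbeatTimeoutMinutes),
            (nameof(TaskLockDurationMinutes), TaskLockDurationMinutes),
            (nameof(RateLimit.ResourcesReadPerMinute), RateLimit.ResourcesReadPerMinute),
            (nameof(RateLimit.ToolsCallPerMinute), RateLimit.ToolsCallPerMinute)
        };
        foreach (var (name, value) in values)
            if (value <= 0)
                throw new InvalidOperationException($"Mcp:{name} must be positive, got {value}");
    }
}
```

In an ASP.NET Core host the same class would typically be bound with `builder.Services.Configure<McpOptions>(builder.Configuration.GetSection("Mcp"))` instead of calling `Load` directly.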
Summary
This architecture design provides a comprehensive, secure, and scalable MCP Server for ColaFlow M2 that:
- Builds on M1 foundation - Integrates with existing Issue Management, Identity, and Audit modules
- Implements MCP protocol - Custom .NET 9 implementation, no Node.js dependency
- Ensures safety - Diff preview and human approval for all AI writes
- Provides security - API key authentication, field-level filtering, rate limiting
- Enables coordination - Agent registration, heartbeat monitoring, task locking (inspired by headless-pm)
- Maintains quality - Clean Architecture, CQRS, DDD patterns, comprehensive testing
Key Design Decisions:
- Modular Monolith (builds on M1 architecture)
- Custom MCP protocol implementation in C#
- BCrypt API key authentication
- Diff preview workflow (safety-first)
- PostgreSQL JSONB for flexible diff storage
- Redis for distributed locks and rate limiting
- Inspired by headless-pm agent coordination patterns
Next Steps:
- Review and approve this architecture document
- Begin Phase 1 implementation (Foundation)
- Set up CI/CD pipeline for MCP module
- Create integration tests for MCP protocol
Document Status: Ready for Implementation
Reviewers: Product Manager, Backend Team Lead, Security Team
Approval Required: Yes
Revision History:
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2025-11-04 | System Architect | Initial architecture design |
| 2.0 | 2025-11-04 | System Architect | Enhanced with headless-pm patterns, complete implementation details |