# ColaFlow Microservices Architecture Design

**Version:** 1.0
**Date:** 2025-11-02
**Status:** Production-Ready Design
**Author:** Architecture Team
**User Decision:** Adopt Microservices Architecture (with full awareness of costs and risks)

---

## Executive Summary

This document presents a **production-grade microservices architecture** for ColaFlow, as explicitly requested by the user. While the architecture team has previously recommended a Modular Monolith approach (see `Modular-Monolith-Architecture.md`), this document respects the user's strategic decision to adopt microservices from the start.

### Key Architectural Decisions

| Decision | Technology | Rationale |
|----------|-----------|-----------|
| **Service Communication (Sync)** | gRPC | High performance, strong contracts, native .NET support |
| **Service Communication (Async)** | RabbitMQ + MassTransit | Reliable messaging, event-driven architecture |
| **API Gateway** | YARP (.NET 9) | Native .NET, high performance, reverse proxy |
| **Service Discovery** | Consul / Kubernetes DNS | Production-ready, automatic service registration |
| **Distributed Tracing** | OpenTelemetry + Jaeger | Vendor-neutral, comprehensive observability |
| **Distributed Transactions** | Saga Pattern (MassTransit) | Orchestration-based, reliable compensation |
| **Configuration Management** | Consul / Azure App Config | Centralized, dynamic configuration |
| **Container Orchestration** | Kubernetes + Helm | Industry standard, mature ecosystem |
| **Message Format** | Protocol Buffers (gRPC) + JSON (REST) | Type-safe, efficient serialization |
| **Database per Service** | PostgreSQL (per service) | Data isolation, independent scaling |

### Cost and Risk Acknowledgment

**Development Timeline Impact:**
- Modular Monolith: 8-10 weeks to M1
- **Microservices: 16-20 weeks to M1** (+8-10 weeks, roughly double)

**Team Skill Requirements:**
- Distributed systems expertise required
- DevOps maturity critical
- Kubernetes operational knowledge
- Distributed transaction patterns (Saga)

**Operational Complexity:**
- 6+ microservices to manage
- 6+ databases to maintain
- API Gateway, Service Mesh, Message Queue
- Distributed tracing and monitoring infrastructure

**Infrastructure Cost Increase:**
- Modular Monolith: ~$500/month
- **Microservices: ~$3,000-5,000/month** (6-10x increase)

---

## 1. Microservices Architecture Overview

### 1.1 System Architecture Diagram

```mermaid
graph TB
    subgraph "Client Layer"
        WebUI[Web Browser<br/>Next.js 15]
        MobileApp[Mobile App<br/>Future]
        AITools[AI Tools<br/>ChatGPT/Claude]
    end

    subgraph "API Gateway Layer"
        YARP[YARP API Gateway<br/>.NET 9]
    end

    subgraph "Service Layer"
        ProjectSvc[Project Service<br/>Projects/Epics/Stories/Tasks]
        WorkflowSvc[Workflow Service<br/>Workflow Engine]
        UserSvc[User Service<br/>Auth & Users]
        NotifSvc[Notification Service<br/>SignalR/Email]
        AuditSvc[Audit Service<br/>Event Store]
        AISvc[AI Service<br/>MCP Server]
    end

    subgraph "Infrastructure Layer"
        RabbitMQ[RabbitMQ<br/>Message Bus]
        Redis[Redis<br/>Cache/Session]
        Consul[Consul<br/>Service Discovery]
        Jaeger[Jaeger<br/>Distributed Tracing]
    end

    subgraph "Data Layer"
        DB1[(PostgreSQL 1<br/>Projects)]
        DB2[(PostgreSQL 2<br/>Workflows)]
        DB3[(PostgreSQL 3<br/>Users)]
        DB4[(PostgreSQL 4<br/>Notifications)]
        DB5[(PostgreSQL 5<br/>Audit)]
        DB6[(PostgreSQL 6<br/>AI)]
    end

    WebUI --> YARP
    MobileApp --> YARP
    AITools --> YARP

    YARP --> ProjectSvc
    YARP --> WorkflowSvc
    YARP --> UserSvc
    YARP --> NotifSvc
    YARP --> AuditSvc
    YARP --> AISvc

    ProjectSvc --> DB1
    WorkflowSvc --> DB2
    UserSvc --> DB3
    NotifSvc --> DB4
    AuditSvc --> DB5
    AISvc --> DB6

    ProjectSvc -.gRPC.-> WorkflowSvc
    ProjectSvc -.gRPC.-> UserSvc
    WorkflowSvc -.gRPC.-> ProjectSvc

    ProjectSvc --> RabbitMQ
    WorkflowSvc --> RabbitMQ
    NotifSvc --> RabbitMQ
    AuditSvc --> RabbitMQ

    ProjectSvc --> Redis
    UserSvc --> Redis

    ProjectSvc --> Consul
    WorkflowSvc --> Consul
    UserSvc --> Consul
    NotifSvc --> Consul
    AuditSvc --> Consul
    AISvc --> Consul

    ProjectSvc --> Jaeger
    WorkflowSvc --> Jaeger
    UserSvc --> Jaeger
```

### 1.2 Service Catalog

| Service | Port | Responsibility | Database | Key APIs |
|---------|------|---------------|----------|----------|
| **Project Service** | 5001 | Project/Epic/Story/Task management | PostgreSQL 1 | `/api/projects/*`, `/api/epics/*`, `/api/stories/*`, `/api/tasks/*`, gRPC |
| **Workflow Service** | 5002 | Workflow engine, state transitions | PostgreSQL 2 | `/api/workflows/*`, gRPC |
| **User Service** | 5003 | Authentication, authorization, users, teams | PostgreSQL 3 | `/api/users/*`, `/api/auth/*`, `/api/teams/*`, gRPC |
| **Notification Service** | 5004 | SignalR, email, push notifications | PostgreSQL 4 | `/api/notifications/*`, `/api/subscriptions/*`, SignalR |
| **Audit Service** | 5005 | Event store, audit logs, rollback | PostgreSQL 5 | `/api/audit-logs/*`, `/api/events/*` |
| **AI Service** | 5006 | MCP Server, AI task generation | PostgreSQL 6 | `/api/ai/*`, `/api/mcp/*` (MCP resources and tools) |
| **API Gateway** | 8080 | Routing, auth, rate limiting | - | All external routes |

---

## 2. Service Design - Bounded Contexts

### 2.1 Project Service (Core Domain)

**Bounded Context:** Project Management

**Domain Model:**

```csharp
// Domain model sketches: method bodies are omitted, only signatures are shown.

// Project Aggregate Root
public class Project : AggregateRoot
{
    public ProjectId Id { get; private set; }
    public string Name { get; private set; }
    public ProjectKey Key { get; private set; }

    private readonly List<Epic> _epics = new();
    public IReadOnlyCollection<Epic> Epics => _epics.AsReadOnly();

    // Business methods
    public Epic CreateEpic(string name, string description);
    public void UpdateDetails(string name, string description);
}

// Epic Entity
public class Epic : Entity
{
    public EpicId Id { get; private set; }
    public string Name { get; private set; }

    private readonly List<Story> _stories = new();
    public IReadOnlyCollection<Story> Stories => _stories.AsReadOnly();

    public Story CreateStory(string title, string description);
}
```

**API Endpoints (REST):**

```
GET    /api/projects                # List projects
POST   /api/projects                # Create project
GET    /api/projects/{id}           # Get project
PUT    /api/projects/{id}           # Update project
DELETE /api/projects/{id}           # Delete project

GET    /api/projects/{id}/epics     # List epics
POST   /api/projects/{id}/epics     # Create epic
GET    /api/epics/{id}              # Get epic
PUT    /api/epics/{id}              # Update epic

GET    /api/epics/{id}/stories      # List stories
POST   /api/epics/{id}/stories      # Create story
GET    /api/stories/{id}            # Get story
PUT    /api/stories/{id}            # Update story

GET    /api/stories/{id}/tasks      # List tasks
POST   /api/stories/{id}/tasks      # Create task
GET    /api/tasks/{id}              # Get task
PUT    /api/tasks/{id}              # Update task
PATCH  /api/tasks/{id}/status       # Update task status
```

**gRPC Services:**

```protobuf
// protos/project.proto
syntax = "proto3";

package colaflow.project;

service ProjectService {
  rpc GetProject (GetProjectRequest) returns (ProjectResponse);
  rpc GetProjectByKey (GetProjectByKeyRequest) returns (ProjectResponse);
  rpc GetTasksByAssignee (GetTasksByAssigneeRequest) returns (TaskListResponse);
  rpc ValidateProjectExists (ValidateProjectRequest) returns
      (ValidationResponse);
}

message GetProjectRequest {
  string project_id = 1;
}

message GetProjectByKeyRequest {
  string key = 1;
}

message ValidateProjectRequest {
  string project_id = 1;
}

message ValidationResponse {
  bool is_valid = 1;
  string message = 2;
}

message ProjectResponse {
  string id = 1;
  string name = 2;
  string key = 3;
  string status = 4;
  string owner_id = 5;
}

message GetTasksByAssigneeRequest {
  string assignee_id = 1;
  int32 page = 2;
  int32 page_size = 3;
}

message TaskListResponse {
  repeated TaskDto tasks = 1;
  int32 total_count = 2;
}

message TaskDto {
  string id = 1;
  string title = 2;
  string status = 3;
  string priority = 4;
  string project_id = 5;
}
```

**Published Events:**

```csharp
public record ProjectCreatedEvent(Guid ProjectId, string ProjectName, Guid OwnerId);
public record TaskStatusChangedEvent(Guid TaskId, string OldStatus, string NewStatus, Guid ChangedBy);
public record TaskAssignedEvent(Guid TaskId, Guid AssigneeId, Guid AssignedBy);
public record EpicCreatedEvent(Guid EpicId, string EpicName, Guid ProjectId);
```

**Database Schema:**

```sql
-- Projects Table
CREATE TABLE projects (
    id UUID PRIMARY KEY,
    name VARCHAR(200) NOT NULL,
    key VARCHAR(10) NOT NULL UNIQUE,
    description TEXT,
    status VARCHAR(50) NOT NULL,
    owner_id UUID NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP
);

-- Epics Table
CREATE TABLE epics (
    id UUID PRIMARY KEY,
    project_id UUID NOT NULL REFERENCES projects(id),
    name VARCHAR(200) NOT NULL,
    description TEXT,
    status VARCHAR(50) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Stories Table
CREATE TABLE stories (
    id UUID PRIMARY KEY,
    epic_id UUID NOT NULL REFERENCES epics(id),
    title VARCHAR(200) NOT NULL,
    description TEXT,
    status VARCHAR(50) NOT NULL,
    priority VARCHAR(50) NOT NULL,
    assignee_id UUID,
    estimated_hours DECIMAL(10,2),
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Tasks Table
CREATE TABLE tasks (
    id UUID PRIMARY KEY,
    story_id UUID NOT NULL REFERENCES stories(id),
    title VARCHAR(200) NOT NULL,
    description TEXT,
    status VARCHAR(50) NOT NULL,
    priority VARCHAR(50) NOT NULL,
    assignee_id UUID,
    estimated_hours DECIMAL(10,2),
    actual_hours DECIMAL(10,2),
    custom_fields JSONB,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Indexes
-- projects.key needs no separate index: its UNIQUE constraint already creates one.
CREATE INDEX idx_epics_project_id ON epics(project_id);
CREATE INDEX idx_stories_epic_id ON stories(epic_id);
CREATE INDEX idx_stories_assignee_id ON stories(assignee_id);
CREATE INDEX idx_tasks_story_id ON tasks(story_id);
CREATE INDEX idx_tasks_assignee_id ON tasks(assignee_id);
CREATE INDEX idx_tasks_status ON tasks(status);
```

---

### 2.2 Workflow Service

**Bounded Context:** Workflow Engine

**Domain Model:**

```csharp
public class Workflow : AggregateRoot
{
    public WorkflowId Id { get; private set; }
    public string Name { get; private set; }
    public Guid ProjectId { get; private set; }
    public bool IsDefault { get; private set; }

    private readonly List<WorkflowState> _states = new();
    public IReadOnlyCollection<WorkflowState> States => _states.AsReadOnly();

    public void AddState(string stateName);
    public void AddTransition(string fromState, string toState);
    public bool CanTransition(string currentState, string targetState);
}

public class WorkflowState : Entity
{
    public string Name { get; private set; }
    public StateCategory Category { get; private set; } // ToDo, InProgress, Done

    // Names of the states this state may transition to
    private readonly List<string> _transitions = new();
    public IReadOnlyCollection<string> Transitions => _transitions.AsReadOnly();
}
```

**API Endpoints:**

```
GET    /api/workflows                        # List all workflows
POST   /api/workflows                        # Create workflow
GET    /api/workflows/{id}                   # Get workflow
PUT    /api/workflows/{id}                   # Update workflow
DELETE /api/workflows/{id}                   # Delete workflow
GET    /api/workflows/project/{projectId}    # Get workflows for project
POST   /api/workflows/{id}/validate          # Validate transition
```

**gRPC Services:**

```protobuf
// protos/workflow.proto
syntax = "proto3";

package colaflow.workflow;

service WorkflowService {
  rpc GetWorkflowByProject (GetWorkflowByProjectRequest) returns (WorkflowResponse);
  rpc ValidateTransition (ValidateTransitionRequest) returns (ValidationResponse);
  rpc GetAvailableTransitions (GetAvailableTransitionsRequest) returns (TransitionsResponse);
}

message GetWorkflowByProjectRequest {
  string project_id = 1;
}

message WorkflowResponse {
  string id = 1;
  string name = 2;
  repeated WorkflowState states = 3;
}

message WorkflowState {
  string name = 1;
  string category = 2;
  repeated string allowed_transitions = 3;
}

message ValidateTransitionRequest {
  string workflow_id = 1;
  string current_state = 2;
  string target_state = 3;
}

message ValidationResponse {
  bool is_valid = 1;
  string message = 2;
}

message GetAvailableTransitionsRequest {
  string workflow_id = 1;
  string current_state = 2;
}

message TransitionsResponse {
  repeated string target_states = 1;
}
```

**Published Events:**

```csharp
public record WorkflowCreatedEvent(Guid WorkflowId, Guid ProjectId);
public record StateTransitionValidatedEvent(Guid WorkflowId, string FromState, string ToState);
```

---

### 2.3 User Service

**Bounded Context:** User Management & Authentication

**Domain Model:**

```csharp
public class User : AggregateRoot
{
    public UserId Id { get; private set; }
    public Email Email { get; private set; }
    public string FirstName { get; private set; }
    public string LastName { get; private set; }
    public string PasswordHash { get; private set; }
    public UserRole Role { get; private set; }

    public static User Create(string email, string firstName, string lastName, string password);
    public void UpdateProfile(string firstName, string lastName);
    public void ChangePassword(string currentPassword, string newPassword);
}

public class Team : AggregateRoot
{
    public TeamId Id { get; private set; }
    public string Name { get; private set; }

    private readonly List<TeamMember> _members = new();
    public IReadOnlyCollection<TeamMember> Members => _members.AsReadOnly();

    public void AddMember(UserId userId, TeamRole role);
    public void RemoveMember(UserId userId);
}
```

**API Endpoints:**

```
POST   /api/auth/login                      # Login
POST   /api/auth/register                   # Register
POST   /api/auth/refresh                    # Refresh token
POST   /api/auth/logout                     # Logout

GET    /api/users                           # List users
GET    /api/users/{id}                      # Get user
PUT    /api/users/{id}                      # Update user
GET    /api/users/me                        # Get current user

GET    /api/teams                           # List teams
POST   /api/teams                           # Create team
GET    /api/teams/{id}                      # Get team
PUT    /api/teams/{id}                      # Update team
POST   /api/teams/{id}/members              # Add team member
DELETE /api/teams/{id}/members/{userId}     # Remove team member
```

**gRPC Services:**

```protobuf
// protos/user.proto
syntax = "proto3";

package colaflow.user;

service UserService {
  rpc GetUser (GetUserRequest) returns (UserResponse);
  rpc GetUsersByIds (GetUsersByIdsRequest) returns (UsersResponse);
  rpc ValidateToken (ValidateTokenRequest) returns (TokenValidationResponse);
  rpc GetUserPermissions (GetUserPermissionsRequest) returns (PermissionsResponse);
}

message GetUserRequest {
  string user_id = 1;
}

message UserResponse {
  string id = 1;
  string email = 2;
  string first_name = 3;
  string last_name = 4;
  string role = 5;
}

message GetUsersByIdsRequest {
  repeated string user_ids = 1;
}

message UsersResponse {
  repeated UserResponse users = 1;
}

message ValidateTokenRequest {
  string token = 1;
}

message TokenValidationResponse {
  bool is_valid = 1;
  string user_id = 2;
  string role = 3;
}

message GetUserPermissionsRequest {
  string user_id = 1;
}

message PermissionsResponse {
  repeated string permissions = 1;
}
```

**Published Events:**

```csharp
public record UserRegisteredEvent(Guid UserId, string Email, string FullName);
public record UserProfileUpdatedEvent(Guid UserId, string FirstName, string LastName);
public record TeamCreatedEvent(Guid TeamId, string TeamName);
public record TeamMemberAddedEvent(Guid TeamId, Guid UserId, string Role);
```

---

### 2.4 Notification Service

**Bounded Context:** Notifications & Real-time Communication

**Domain Model:**

```csharp
public class Notification : AggregateRoot
{
    public NotificationId Id { get; private set; }
    public Guid RecipientId { get; private set; }
    public string Title { get; private set; }
    public string Message { get; private set; }
    public NotificationType Type { get; private set; }
    public bool IsRead { get; private set; }
    public DateTime CreatedAt { get; private set; }

    public static Notification Create(Guid recipientId, string title, string message);
    public void MarkAsRead();
}

public class NotificationSubscription : Entity
{
    public Guid UserId { get; private set; }
    public NotificationChannel Channel { get; private set; } // Email, SignalR, Push
    public string Endpoint { get; private set; }
    public bool IsActive { get; private set; }
}
```

**API Endpoints:**

```
GET    /api/notifications               # List notifications
POST   /api/notifications               # Create notification (internal)
PATCH  /api/notifications/{id}/read     # Mark as read
DELETE /api/notifications/{id}          # Delete notification

GET    /api/subscriptions               # List subscriptions
POST   /api/subscriptions               # Create subscription
DELETE /api/subscriptions/{id}          # Delete subscription
```

**SignalR Hub:**

```csharp
public class NotificationHub : Hub
{
    // In production, check that the caller may access the project before joining the group.
    public async Task JoinProject(string projectId)
    {
        await Groups.AddToGroupAsync(Context.ConnectionId, $"project_{projectId}");
    }

    public async Task LeaveProject(string projectId)
    {
        await Groups.RemoveFromGroupAsync(Context.ConnectionId, $"project_{projectId}");
    }
}

// Server-side push from any class that injects IHubContext<NotificationHub>
await _hubContext.Clients.Group($"project_{projectId}").SendAsync("TaskUpdated", taskDto);
```

**Consumed Events:**

```csharp
// Listens to events from other services
public class TaskAssignedEventConsumer(
    INotificationRepository notificationRepository,
    IHubContext<NotificationHub> hubContext) : IConsumer<TaskAssignedEvent>
{
    private readonly INotificationRepository _notificationRepository = notificationRepository;
    private readonly IHubContext<NotificationHub> _hubContext = hubContext;

    public async Task Consume(ConsumeContext<TaskAssignedEvent> context)
    {
        var notification = Notification.Create(
            context.Message.AssigneeId,
            "Task Assigned",
            $"You have been assigned to task: {context.Message.TaskId}");

        await _notificationRepository.AddAsync(notification);

        // Clients.User() matches the ID resolved by SignalR's IUserIdProvider
        // (by default the ClaimTypes.NameIdentifier claim of the connected user).
        await _hubContext.Clients.User(context.Message.AssigneeId.ToString())
            .SendAsync("NotificationReceived", notification);
    }
}
```

---

### 2.5 Audit Service

**Bounded Context:** Audit Logging & Event Store

**Domain Model:**

```csharp
public class AuditLog : Entity
{
    public long Id { get; private set; }
    public string EntityType { get; private set; }
    public Guid EntityId { get; private set; }
    public string Action { get; private set; }
    public string Changes { get; private set; } // JSON
    public Guid UserId { get; private set; }
    public DateTime Timestamp { get; private set; }
    public string IpAddress { get; private set; }
}

public class DomainEventRecord : Entity
{
    // init accessors let consumers build append-only records with an object initializer
    public long Id { get; init; }
    public string EventType { get; init; }
    public Guid AggregateId { get; init; }
    public string EventData { get; init; } // JSON
    public DateTime OccurredOn { get; init; }
    public DateTime? ProcessedOn { get; private set; }
}
```

**API Endpoints:**

```
GET    /api/audit-logs                              # List audit logs
GET    /api/audit-logs/{entityType}/{entityId}      # Get entity audit logs
POST   /api/audit-logs/{id}/rollback                # Rollback changes

GET    /api/events                                  # List domain events
GET    /api/events/{aggregateId}                    # Get aggregate events
```

**Consumed Events:**

```csharp
// Listens to ALL domain events from all services.
// Assumes every event implements a shared IDomainEvent contract exposing
// AggregateId and OccurredOn.
public class UniversalEventConsumer(IEventStoreRepository eventStoreRepository)
    : IConsumer<IDomainEvent>
{
    private readonly IEventStoreRepository _eventStoreRepository = eventStoreRepository;

    public async Task Consume(ConsumeContext<IDomainEvent> context)
    {
        var eventRecord = new DomainEventRecord
        {
            EventType = context.Message.GetType().Name,
            AggregateId = context.Message.AggregateId,
            // Serialize with the runtime type; otherwise only interface members are written
            EventData = JsonSerializer.Serialize(context.Message, context.Message.GetType()),
            OccurredOn = context.Message.OccurredOn
        };

        await _eventStoreRepository.AddAsync(eventRecord);
    }
}
```

---

### 2.6 AI Service (MCP Server)

**Bounded Context:** AI Integration & MCP Protocol

**Domain Model:**

```csharp
public class AITask : AggregateRoot
{
    public AITaskId Id { get; private set; }
    public string Prompt { get; private set; }
    public string Response { get; private set; }
    public AITaskStatus Status { get; private set; }
    public Guid CreatedBy { get; private set; }
    public DateTime CreatedAt { get; private set; }

    public void Complete(string response);
    public void Fail(string errorMessage);
}

public class MCPResource : Entity
{
    public string ResourceId { get; private set; }
    public string Type { get; private set; }   // projects.search, issues.search
    public string Schema { get; private set; } // JSON Schema
}
```

**API Endpoints:**

```
POST   /api/ai/tasks                        # Create AI task
GET    /api/ai/tasks/{id}                   # Get AI task
GET    /api/ai/tasks                        # List AI tasks

GET    /api/mcp/resources                   # List MCP resources
GET    /api/mcp/resources/{resourceId}      # Get resource data
POST   /api/mcp/tools/{toolName}            # Execute MCP tool
GET    /api/mcp/tools                       # List MCP tools
```

**MCP Resources:**

```json
{
  "resources": [
    {
      "uri": "colaflow://projects.search",
      "name": "Search Projects",
      "description": "Search and list projects",
      "mimeType": "application/json"
    },
    {
      "uri": "colaflow://issues.search",
      "name": "Search Issues",
      "description": "Search tasks and issues",
      "mimeType": "application/json"
    }
  ]
}
```

**MCP Tools:**

```json
{
  "tools": [
    {
      "name": "create_task",
      "description": "Create a new task",
      "inputSchema": {
        "type": "object",
        "properties": {
          "title": { "type": "string" },
          "description": { "type": "string" },
          "priority": { "type": "string", "enum": ["Low", "Medium", "High", "Urgent"] }
        },
        "required": ["title"]
      }
    },
    {
      "name": "update_task_status",
      "description": "Update task status with diff preview",
      "inputSchema": {
        "type": "object",
        "properties": {
          "task_id": { "type": "string" },
          "new_status": { "type": "string" }
        },
        "required": ["task_id", "new_status"]
      }
    }
  ]
}
```

---

## 3. Service Communication Patterns

### 3.1 Synchronous Communication (gRPC)

**When to use gRPC:**
- Real-time queries (e.g., "Get User by ID")
- Validation requests (e.g., "Check if project exists")
- Low-latency requirements

**Example: Project Service → User Service**

```csharp
using Grpc.Core;

// Project Service - gRPC Client
public class UserServiceClient
{
    private readonly UserService.UserServiceClient _grpcClient;

    public UserServiceClient(UserService.UserServiceClient grpcClient)
    {
        _grpcClient = grpcClient;
    }

    public async Task<UserDto?> GetUserAsync(Guid userId, CancellationToken ct = default)
    {
        var request = new GetUserRequest { UserId = userId.ToString() };

        try
        {
            var response = await _grpcClient.GetUserAsync(request, cancellationToken: ct);

            return new UserDto
            {
                Id = Guid.Parse(response.Id),
                Email = response.Email,
                FirstName = response.FirstName,
                LastName = response.LastName
            };
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound)
        {
            // Assumes the User Service reports unknown IDs with a NotFound status;
            // a gRPC call never returns a null response.
            return null;
        }
    }
}

// Used in Command Handler
public class AssignTaskCommandHandler(
    ITaskRepository taskRepository,
    UserServiceClient userServiceClient,
    IUnitOfWork unitOfWork,
    IMapper mapper) : IRequestHandler<AssignTaskCommand, TaskDto>
{
    private readonly ITaskRepository _taskRepository = taskRepository;
    private readonly UserServiceClient _userServiceClient = userServiceClient;
    private readonly IUnitOfWork _unitOfWork = unitOfWork;
    private readonly IMapper _mapper = mapper;

    public async Task<TaskDto> Handle(AssignTaskCommand request, CancellationToken ct)
    {
        // Validate user exists via gRPC
        var user = await _userServiceClient.GetUserAsync(request.AssigneeId, ct);
        if (user == null)
            throw new NotFoundException("User not found");

        // Assign task
        var task = await _taskRepository.GetByIdAsync(request.TaskId)
            ?? throw new NotFoundException("Task not found");
        task.AssignTo(request.AssigneeId);

        await _unitOfWork.CommitAsync(ct);

        return _mapper.Map<TaskDto>(task);
    }
}
```

**gRPC Client Registration:**

```csharp
// Program.cs
builder.Services.AddGrpcClient<UserService.UserServiceClient>(options =>
{
    options.Address = new Uri("https://user-service:5003");
})
.ConfigurePrimaryHttpMessageHandler(() =>
{
    // DEVELOPMENT ONLY: this accepts any server certificate. In production, trust
    // the cluster CA (or use mTLS through a service mesh) instead.
    return new HttpClientHandler
    {
        ServerCertificateCustomValidationCallback =
            HttpClientHandler.DangerousAcceptAnyServerCertificateValidator
    };
});
```

### 3.2 Asynchronous Communication (RabbitMQ + MassTransit)

**When to use Async Messaging:**
- Event notifications (e.g., "Task Created")
- Cross-service workflows (e.g., Saga orchestration)
- Decoupled communication
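The fan-out behind "decoupled communication" can be illustrated without a broker. The sketch below is a hypothetical single-process stand-in (`InMemoryEventBus` and `TaskCreated` are illustrative names, not MassTransit or RabbitMQ types): the publisher hands one event to the bus, and every subscribed consumer receives it without the publisher knowing who those consumers are. Unlike RabbitMQ, it provides no durability, retries, or cross-process delivery.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a message broker, used only to show
// publish/subscribe fan-out. It is NOT MassTransit's API.
public sealed class InMemoryEventBus
{
    private readonly Dictionary<Type, List<Action<object>>> _subscribers = new();

    // Each "service" registers its own handler; the publisher never references them.
    public void Subscribe<TEvent>(Action<TEvent> handler)
    {
        if (!_subscribers.TryGetValue(typeof(TEvent), out var handlers))
        {
            handlers = new List<Action<object>>();
            _subscribers[typeof(TEvent)] = handlers;
        }
        handlers.Add(message => handler((TEvent)message));
    }

    // Delivers one event to every subscriber and returns how many handlers received it.
    public int Publish<TEvent>(TEvent message) where TEvent : notnull
    {
        if (!_subscribers.TryGetValue(typeof(TEvent), out var handlers))
            return 0;

        foreach (var handler in handlers)
            handler(message);

        return handlers.Count;
    }
}

// Simplified event contract used only by this sketch.
public sealed record TaskCreated(Guid TaskId, string Title, Guid AssigneeId);
```

In the real deployment each subscriber corresponds to a MassTransit consumer on its own queue (for example the Notification and Audit services), so a slow or failing consumer does not hold up the others.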
**Example: Task Created Event Flow**

```csharp
// Project Service - Publisher
public class CreateTaskCommandHandler(
    ITaskRepository taskRepository,
    IUnitOfWork unitOfWork,
    IMapper mapper,
    IPublishEndpoint publishEndpoint) : IRequestHandler<CreateTaskCommand, TaskDto>
{
    private readonly ITaskRepository _taskRepository = taskRepository;
    private readonly IUnitOfWork _unitOfWork = unitOfWork;
    private readonly IMapper _mapper = mapper;
    private readonly IPublishEndpoint _publishEndpoint = publishEndpoint;

    public async Task<TaskDto> Handle(CreateTaskCommand request, CancellationToken ct)
    {
        // Create task. "Task" here is the domain entity; alias it (or
        // System.Threading.Tasks.Task) in real code to avoid the name clash.
        var task = Task.Create(request.Title, request.Description);
        await _taskRepository.AddAsync(task, ct);
        await _unitOfWork.CommitAsync(ct);

        // Publish event. Publishing after the commit is not atomic: a crash at this
        // point loses the event. The Outbox Pattern (section 4.2) closes this gap.
        await _publishEndpoint.Publish(new TaskCreatedEvent
        {
            TaskId = task.Id,
            Title = task.Title,
            AssigneeId = task.AssigneeId,
            ProjectId = task.ProjectId
        }, ct);

        return _mapper.Map<TaskDto>(task);
    }
}

// Notification Service - Consumer
public class TaskCreatedEventConsumer(
    INotificationRepository notificationRepository,
    IHubContext<NotificationHub> hubContext) : IConsumer<TaskCreatedEvent>
{
    private readonly INotificationRepository _notificationRepository = notificationRepository;
    private readonly IHubContext<NotificationHub> _hubContext = hubContext;

    public async Task Consume(ConsumeContext<TaskCreatedEvent> context)
    {
        var evt = context.Message;

        // Create notification
        var notification = Notification.Create(
            evt.AssigneeId,
            "New Task Assigned",
            $"You have been assigned to task: {evt.Title}");

        await _notificationRepository.AddAsync(notification);

        // Send SignalR notification
        await _hubContext.Clients.User(evt.AssigneeId.ToString())
            .SendAsync("TaskCreated", new { evt.TaskId, evt.Title });
    }
}

// Audit Service - Consumer
public class TaskCreatedEventConsumer(IEventStoreRepository eventStoreRepository)
    : IConsumer<TaskCreatedEvent>
{
    private readonly IEventStoreRepository _eventStoreRepository = eventStoreRepository;

    public async Task Consume(ConsumeContext<TaskCreatedEvent> context)
    {
        // Store event in event store
        var eventRecord = new DomainEventRecord
        {
            EventType = nameof(TaskCreatedEvent),
            AggregateId = context.Message.TaskId,
            EventData = JsonSerializer.Serialize(context.Message),
            OccurredOn = DateTime.UtcNow // time of consumption, not of the original event
        };

        await _eventStoreRepository.AddAsync(eventRecord);
    }
}
```

**MassTransit Configuration:**

```csharp
// Program.cs (Notification Service)
builder.Services.AddMassTransit(config =>
{
    // Register consumers
    config.AddConsumer<TaskCreatedEventConsumer>();
    config.AddConsumer<TaskAssignedEventConsumer>();

    config.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("rabbitmq", "/", h =>
        {
            // Default development credentials; load real ones from configuration or a secret store.
            h.Username("guest");
            h.Password("guest");
        });

        // Configure endpoints
        cfg.ReceiveEndpoint("notification-service", e =>
        {
            e.ConfigureConsumer<TaskCreatedEventConsumer>(context);
            e.ConfigureConsumer<TaskAssignedEventConsumer>(context);
        });
    });
});
```

---

## 4. Distributed Transactions - Saga Pattern

### 4.1 Saga Orchestration with MassTransit

**Use Case: Create Project with Default Workflow**

**Requirements:**
1. Project Service: Create project
2. Workflow Service: Create default workflow
3. Notification Service: Send notification
4. If any step fails → compensate (rollback)

**Saga State Machine:**

```csharp
// Saga State
public class CreateProjectSagaState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    // Saga data
    public Guid ProjectId { get; set; }
    public string ProjectName { get; set; }
    public Guid OwnerId { get; set; }
    public Guid? WorkflowId { get; set; }

    // Timestamps
    public DateTime CreatedAt { get; set; }
    public DateTime? CompletedAt { get; set; }
}

// Saga Definition
public class CreateProjectSaga : MassTransitStateMachine<CreateProjectSagaState>
{
    public State CreatingProject { get; private set; }
    public State CreatingWorkflow { get; private set; }
    public State SendingNotification { get; private set; }
    public State Completed { get; private set; }
    public State Failed { get; private set; }

    // Events the saga consumes. Contract names are illustrative; each contract carries
    // the saga's CorrelationId (e.g. by implementing CorrelatedBy<Guid>) so MassTransit
    // can route it to the right saga instance. Commands the saga sends
    // (CreateWorkflowCommand, etc.) are not declared as events.
    public Event<CreateProjectCommand> CreateProject { get; private set; }
    public Event<ProjectCreatedReply> ProjectCreated { get; private set; }
    public Event<WorkflowCreatedReply> WorkflowCreated { get; private set; }
    public Event<NotificationSentReply> NotificationSent { get; private set; }
    public Event<ProjectCreationFailedReply> ProjectFailed { get; private set; }
    public Event<WorkflowCreationFailedReply> WorkflowFailed { get; private set; }

    public CreateProjectSaga()
    {
        InstanceState(x => x.CurrentState);

        // Step 1: Create Project
        Initially(
            When(CreateProject)
                .Then(context =>
                {
                    context.Saga.ProjectName = context.Message.Name;
                    context.Saga.OwnerId = context.Message.OwnerId;
                    context.Saga.CreatedAt = DateTime.UtcNow;
                })
                .TransitionTo(CreatingProject)
                .Publish(context => new CreateProjectInternalCommand
                {
                    CorrelationId = context.Saga.CorrelationId,
                    Name = context.Message.Name,
                    Description = context.Message.Description,
                    Key = context.Message.Key,
                    OwnerId = context.Message.OwnerId
                })
        );

        // Step 2: Project Created → Create Workflow
        During(CreatingProject,
            When(ProjectCreated)
                .Then(context =>
                {
                    context.Saga.ProjectId = context.Message.ProjectId;
                })
                .TransitionTo(CreatingWorkflow)
                .PublishAsync(context => context.Init<CreateWorkflowCommand>(new
                {
                    CorrelationId = context.Saga.CorrelationId,
                    ProjectId = context.Message.ProjectId,
                    Name = $"{context.Saga.ProjectName} Workflow"
                }))
        );

        // Step 3: Workflow Created → Send Notification
        During(CreatingWorkflow,
            When(WorkflowCreated)
                .Then(context =>
                {
                    context.Saga.WorkflowId = context.Message.WorkflowId;
                })
                .TransitionTo(SendingNotification)
                .PublishAsync(context => context.Init<SendNotificationCommand>(new
                {
                    // Needed so the NotificationSent reply can be correlated back to this saga
                    CorrelationId = context.Saga.CorrelationId,
                    RecipientId = context.Saga.OwnerId,
                    Title = "Project Created",
                    Message = $"Project '{context.Saga.ProjectName}' has been created successfully."
                }))
        );

        // Step 4: Notification Sent → Complete
        During(SendingNotification,
            When(NotificationSent)
                .Then(context =>
                {
                    context.Saga.CompletedAt = DateTime.UtcNow;
                })
                .TransitionTo(Completed)
                .Finalize()
        );

        // Compensation: Project Creation Failed
        During(CreatingProject,
            When(ProjectFailed)
                .Then(context =>
                {
                    // Log failure
                    Console.WriteLine($"Project creation failed: {context.Message.Reason}");
                })
                .TransitionTo(Failed)
                .Finalize()
        );

        // Compensation: Workflow Creation Failed → Delete Project
        During(CreatingWorkflow,
            When(WorkflowFailed)
                .Then(context =>
                {
                    Console.WriteLine($"Workflow creation failed: {context.Message.Reason}");
                })
                .PublishAsync(context => context.Init<DeleteProjectCommand>(new
                {
                    CorrelationId = context.Saga.CorrelationId,
                    ProjectId = context.Saga.ProjectId,
                    Reason = "Workflow creation failed"
                }))
                .TransitionTo(Failed)
                .Finalize()
        );

        // Removes the saga instance from the repository once it reaches the Final state
        SetCompletedWhenFinalized();
    }
}

// Saga Registration
builder.Services.AddMassTransit(config =>
{
    config.AddSagaStateMachine<CreateProjectSaga, CreateProjectSagaState>()
        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Pessimistic;

            r.AddDbContext<DbContext, CreateProjectSagaDbContext>((provider, builder) =>
            {
                builder.UseNpgsql(connectionString);
            });
        });
    config.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("rabbitmq", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.ConfigureEndpoints(context);
    });
});
```

**Saga Database Table:**

```sql
CREATE TABLE create_project_saga_state (
    correlation_id UUID PRIMARY KEY,
    current_state VARCHAR(100) NOT NULL,
    project_id UUID,
    project_name VARCHAR(200),
    owner_id UUID,
    workflow_id UUID,
    created_at TIMESTAMP NOT NULL,
    completed_at TIMESTAMP
);
```

### 4.2 Outbox Pattern (Reliable Messaging)

**Problem:** Domain events must still be published even if RabbitMQ is unavailable when the business transaction commits.

**Solution:** Write events to an outbox table in the same database transaction as the business change, then publish them asynchronously from a background process.

```csharp
// Outbox Message Entity
public class OutboxMessage
{
    public Guid Id { get; set; }
    public string Type { get; set; }       // Assembly-qualified CLR type name
    public string Content { get; set; }    // JSON
    public DateTime OccurredOn { get; set; }
    public DateTime? ProcessedOn { get; set; }
    public string? Error { get; set; }
}

// Save to Outbox in same transaction (method on the service's DbContext)
public async Task<int> CommitAsync(CancellationToken cancellationToken = default)
{
    var aggregates = ChangeTracker
        .Entries<AggregateRoot>()
        .Select(x => x.Entity)
        .ToList();

    var domainEvents = aggregates.SelectMany(a => a.DomainEvents).ToList();

    // Store events in outbox
    foreach (var domainEvent in domainEvents)
    {
        var eventType = domainEvent.GetType();

        OutboxMessages.Add(new OutboxMessage
        {
            Id = Guid.NewGuid(),
            // Type.GetType() can only resolve an assembly-qualified name, not a bare type name
            Type = eventType.AssemblyQualifiedName!,
            // Serialize with the runtime type so every property of the concrete event is stored
            Content = JsonSerializer.Serialize(domainEvent, eventType),
            OccurredOn = DateTime.UtcNow
        });
    }

    // Save changes (domain entities + outbox messages in same transaction)
    var result = await base.SaveChangesAsync(cancellationToken);

    // Clear events so the next commit does not write them again
    // (ClearDomainEvents is an assumed AggregateRoot helper)
    aggregates.ForEach(a => a.ClearDomainEvents());

    return result;
}

// Background Service: Process Outbox
public class OutboxProcessor(IServiceProvider serviceProvider) : BackgroundService
{
    private readonly IServiceProvider _serviceProvider = serviceProvider;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            using var scope = _serviceProvider.CreateScope();
            var dbContext = scope.ServiceProvider.GetRequiredService<ProjectDbContext>();
            var publishEndpoint = scope.ServiceProvider.GetRequiredService<IPublishEndpoint>();

            // Get unprocessed messages
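            var messages = await dbContext.OutboxMessages
                .Where(m => m.ProcessedOn == null)
                .OrderBy(m => m.OccurredOn)
                .Take(100)
                .ToListAsync(stoppingToken);

            foreach (var message in messages)
            {
                try
                {
                    // Deserialize and publish
                    var eventType = Type.GetType(message.Type)
                        ?? throw new InvalidOperationException($"Unknown event type '{message.Type}'");
                    var domainEvent = JsonSerializer.Deserialize(message.Content, eventType)!;

                    await publishEndpoint.Publish(domainEvent, stoppingToken);

                    // Mark as processed
                    message.ProcessedOn = DateTime.UtcNow;
                }
                catch (Exception ex)
                {
                    // Left unprocessed, so it is retried on the next polling cycle
                    message.Error = ex.Message;
                }
            }

            await dbContext.SaveChangesAsync(stoppingToken);
            await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
        }
    }
}
```

Delivery is at-least-once: if the processor crashes after publishing but before saving `ProcessedOn`, the message is published again, and several replicas polling the same outbox can also publish duplicates unless rows are claimed with a lock. Consumers should therefore be idempotent.

---

## 5. API Gateway (YARP)

### 5.1 YARP Configuration

**Why YARP:**
- Native .NET 9 support
- High performance reverse proxy
- Dynamic configuration
- Built-in load balancing
- Request/response transformation

**appsettings.json:**

```json
{
  "ReverseProxy": {
    "Routes": {
      "project-route": {
        "ClusterId": "project-cluster",
        "AuthorizationPolicy": "authenticated",
        "Match": { "Path": "/api/projects/{**catch-all}" },
        "Transforms": [
          { "X-Forwarded": "Append" }
        ]
      },
      "epic-route": {
        "ClusterId": "project-cluster",
        "AuthorizationPolicy": "authenticated",
        "Match": { "Path": "/api/epics/{**catch-all}" }
      },
      "story-route": {
        "ClusterId": "project-cluster",
        "AuthorizationPolicy": "authenticated",
        "Match": { "Path": "/api/stories/{**catch-all}" }
      },
      "task-route": {
        "ClusterId": "project-cluster",
        "AuthorizationPolicy": "authenticated",
        "Match": { "Path": "/api/tasks/{**catch-all}" }
      },
      "workflow-route": {
        "ClusterId": "workflow-cluster",
        "AuthorizationPolicy": "authenticated",
        "Match": { "Path": "/api/workflows/{**catch-all}" }
      },
      "user-route": {
        "ClusterId": "user-cluster",
        "AuthorizationPolicy": "authenticated",
        "Match": { "Path": "/api/users/{**catch-all}" }
      },
      "team-route": {
        "ClusterId": "user-cluster",
        "AuthorizationPolicy": "authenticated",
        "Match": { "Path": "/api/teams/{**catch-all}" }
      },
      "auth-route": {
        "ClusterId": "user-cluster",
        "Match": { "Path": "/api/auth/{**catch-all}" }
      },
      "notification-route": {
        "ClusterId": "notification-cluster",
        "AuthorizationPolicy": "authenticated",
        "Match": { "Path": "/api/notifications/{**catch-all}" }
      },
      "subscription-route": {
        "ClusterId": "notification-cluster",
        "AuthorizationPolicy": "authenticated",
        "Match": { "Path": "/api/subscriptions/{**catch-all}" }
      },
      "audit-route": {
        "ClusterId": "audit-cluster",
        "AuthorizationPolicy": "admin",
        "Match": { "Path": "/api/audit-logs/{**catch-all}" }
      },
      "events-route": {
        "ClusterId": "audit-cluster",
        "AuthorizationPolicy": "admin",
        "Match": { "Path": "/api/events/{**catch-all}" }
      },
      "ai-route": {
        "ClusterId": "ai-cluster",
        "AuthorizationPolicy": "authenticated",
        "Match": { "Path": "/api/ai/{**catch-all}" }
      },
      "mcp-route": {
        "ClusterId": "ai-cluster",
        "AuthorizationPolicy": "mcp-client",
        "Match": { "Path": "/api/mcp/{**catch-all}" }
      }
    },
    "Clusters": {
      "project-cluster": {
        "LoadBalancingPolicy": "RoundRobin",
        "Destinations": {
          "project-service-1": { "Address": "http://project-service:5001" }
        }
      },
      "workflow-cluster": {
        "LoadBalancingPolicy": "RoundRobin",
        "Destinations": {
          "workflow-service-1": { "Address": "http://workflow-service:5002" }
        }
      },
      "user-cluster": {
        "LoadBalancingPolicy": "RoundRobin",
        "Destinations": {
          "user-service-1": { "Address": "http://user-service:5003" }
        }
      },
      "notification-cluster": {
        "LoadBalancingPolicy": "RoundRobin",
        "Destinations": {
          "notification-service-1": { "Address": "http://notification-service:5004" }
        }
      },
      "audit-cluster": {
        "LoadBalancingPolicy": "RoundRobin",
        "Destinations": {
          "audit-service-1": { "Address": "http://audit-service:5005" }
        }
      },
      "ai-cluster": {
        "LoadBalancingPolicy": "RoundRobin",
        "Destinations": {
          "ai-service-1": { "Address": "http://ai-service:5006" }
        }
      }
    }
  }
}
```

YARP adds `X-Forwarded-*` headers on every route by default, overwriting incoming values; the `"X-Forwarded": "Append"` transform on `project-route` keeps values set by an upstream proxy and appends to them instead.

### 5.2 API Gateway Code

```csharp
// Program.cs
var builder = WebApplication.CreateBuilder(args);

// Add YARP
builder.Services.AddReverseProxy()
    .LoadFromConfig(builder.Configuration.GetSection("ReverseProxy"));

// Add Authentication
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
    .AddJwtBearer(options =>
    {
        // Setting Authority means the User Service must publish an OpenID Connect
        // discovery document (/.well-known/openid-configuration) with its signing keys.
        options.Authority = "http://user-service:5003";
        // JwtBearer rejects a non-HTTPS Authority unless this is disabled;
        // acceptable only for in-cluster development traffic.
        options.RequireHttpsMetadata = false;
        options.TokenValidationParameters = new TokenValidationParameters
        {
            ValidateIssuer = true,
            ValidateAudience = true,
            ValidateLifetime = true,
            ValidIssuer = "ColaFlow",
            ValidAudience = "ColaFlow"
        };
    });

// Add Authorization Policies
builder.Services.AddAuthorization(options =>
{
    options.AddPolicy("authenticated", policy => policy.RequireAuthenticatedUser());
    options.AddPolicy("admin", policy => policy.RequireRole("Admin"));
    options.AddPolicy("mcp-client", policy => policy.RequireClaim("client_type", "mcp"));
});

// Add Rate Limiting: 100 requests per minute per partition
builder.Services.AddRateLimiter(options =>
{
    // Reject with 429 instead of the middleware's default 503
    options.RejectionStatusCode = StatusCodes.Status429TooManyRequests;

    options.GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, string>(context =>
    {
        return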
RateLimitPartition.GetFixedWindowLimiter( partitionKey: context.User.Identity?.Name ?? context.Request.Headers.Host.ToString(), factory: partition => new FixedWindowRateLimiterOptions { AutoReplenishment = true, PermitLimit = 100, QueueLimit = 0, Window = TimeSpan.FromMinutes(1) }); }); }); // Add Distributed Tracing builder.Services.AddOpenTelemetry() .WithTracing(tracing => tracing .AddAspNetCoreInstrumentation() .AddHttpClientInstrumentation() .AddJaegerExporter(options => { options.AgentHost = "jaeger"; options.AgentPort = 6831; })); var app = builder.Build(); app.UseRateLimiter(); app.UseAuthentication(); app.UseAuthorization(); // Map YARP app.MapReverseProxy(); app.Run(); ``` --- ## 6. Service Discovery (Consul) ### 6.1 Consul Configuration **Why Consul:** - Service registry and health checking - Dynamic service discovery - Key/value store for configuration - Production-ready and battle-tested **Service Registration:** ```csharp // Program.cs - Each Service builder.Services.AddConsul(builder.Configuration); public static class ConsulExtensions { public static IServiceCollection AddConsul(this IServiceCollection services, IConfiguration configuration) { var consulConfig = configuration.GetSection("Consul").Get(); services.AddSingleton(sp => new ConsulClient(config => { config.Address = new Uri(consulConfig.Address); })); services.AddHostedService(); return services; } } public class ConsulHostedService : IHostedService { private readonly IConsulClient _consulClient; private readonly IConfiguration _configuration; private string _registrationId; public async Task StartAsync(CancellationToken cancellationToken) { var serviceConfig = _configuration.GetSection("Service").Get(); _registrationId = $"{serviceConfig.Name}-{serviceConfig.Id}"; var registration = new AgentServiceRegistration { ID = _registrationId, Name = serviceConfig.Name, Address = serviceConfig.Address, Port = serviceConfig.Port, Tags = new[] { "colaflow", serviceConfig.Name }, Check = new 
AgentServiceCheck { HTTP = $"http://{serviceConfig.Address}:{serviceConfig.Port}/health", Interval = TimeSpan.FromSeconds(10), Timeout = TimeSpan.FromSeconds(5), DeregisterCriticalServiceAfter = TimeSpan.FromMinutes(1) } }; await _consulClient.Agent.ServiceRegister(registration, cancellationToken); } public async Task StopAsync(CancellationToken cancellationToken) { await _consulClient.Agent.ServiceDeregister(_registrationId, cancellationToken); } } ``` **appsettings.json:** ```json { "Consul": { "Address": "http://consul:8500" }, "Service": { "Name": "project-service", "Id": "project-service-1", "Address": "project-service", "Port": 5001 } } ``` **Service Discovery Client:** ```csharp public class ServiceDiscoveryClient { private readonly IConsulClient _consulClient; public async Task GetServiceAddressAsync(string serviceName) { var services = await _consulClient.Health.Service(serviceName, "", true); if (!services.Response.Any()) throw new Exception($"Service '{serviceName}' not found"); var service = services.Response.First(); return $"http://{service.Service.Address}:{service.Service.Port}"; } } // Usage var userServiceAddress = await _serviceDiscovery.GetServiceAddressAsync("user-service"); var grpcClient = new UserService.UserServiceClient( GrpcChannel.ForAddress(userServiceAddress) ); ``` --- ## 7. 
Distributed Tracing (OpenTelemetry + Jaeger) ### 7.1 OpenTelemetry Configuration ```csharp // Program.cs - Each Service builder.Services.AddOpenTelemetry() .WithTracing(tracing => { tracing .AddAspNetCoreInstrumentation(options => { options.RecordException = true; options.Filter = (httpContext) => !httpContext.Request.Path.Value.Contains("health"); }) .AddHttpClientInstrumentation() .AddGrpcClientInstrumentation() .AddEntityFrameworkCoreInstrumentation(options => { options.SetDbStatementForText = true; }) .AddSource("MassTransit") .SetResourceBuilder(ResourceBuilder.CreateDefault() .AddService("project-service") .AddAttributes(new Dictionary { ["environment"] = "production", ["version"] = "1.0.0" })) .AddJaegerExporter(options => { options.AgentHost = "jaeger"; options.AgentPort = 6831; }); }) .WithMetrics(metrics => { metrics .AddAspNetCoreInstrumentation() .AddHttpClientInstrumentation() .AddRuntimeInstrumentation() .AddPrometheusExporter(); }); ``` ### 7.2 Custom Instrumentation ```csharp public class CreateTaskCommandHandler : IRequestHandler { private readonly ActivitySource _activitySource = new("ColaFlow.ProjectService"); public async Task Handle(CreateTaskCommand request, CancellationToken ct) { using var activity = _activitySource.StartActivity("CreateTask", ActivityKind.Server); activity?.SetTag("task.title", request.Title); activity?.SetTag("task.priority", request.Priority); try { // Business logic var task = Task.Create(request.Title, request.Description); await _taskRepository.AddAsync(task, ct); activity?.SetTag("task.id", task.Id); activity?.SetStatus(ActivityStatusCode.Ok); return _mapper.Map(task); } catch (Exception ex) { activity?.SetStatus(ActivityStatusCode.Error, ex.Message); activity?.RecordException(ex); throw; } } } ``` --- ## 8. 
Project Structure ### 8.1 Overall Structure ``` product-master/ ├── services/ │ ├── project-service/ │ │ ├── src/ │ │ │ ├── ColaFlow.ProjectService.Domain/ │ │ │ ├── ColaFlow.ProjectService.Application/ │ │ │ ├── ColaFlow.ProjectService.Infrastructure/ │ │ │ └── ColaFlow.ProjectService.API/ │ │ ├── tests/ │ │ ├── Dockerfile │ │ └── ColaFlow.ProjectService.sln │ ├── workflow-service/ │ │ ├── src/ │ │ ├── tests/ │ │ ├── Dockerfile │ │ └── ColaFlow.WorkflowService.sln │ ├── user-service/ │ ├── notification-service/ │ ├── audit-service/ │ └── ai-service/ ├── gateway/ │ └── api-gateway/ │ ├── src/ │ │ └── ColaFlow.ApiGateway/ │ │ ├── Program.cs │ │ ├── appsettings.json │ │ └── Middleware/ │ ├── Dockerfile │ └── ColaFlow.ApiGateway.sln ├── shared/ │ ├── ColaFlow.Shared.Contracts/ │ │ ├── DTOs/ │ │ ├── Events/ │ │ └── Protos/ │ │ ├── project.proto │ │ ├── workflow.proto │ │ └── user.proto │ ├── ColaFlow.Shared.Messaging/ │ │ ├── MassTransit/ │ │ └── EventBus/ │ └── ColaFlow.Shared.Common/ │ ├── Extensions/ │ ├── Utilities/ │ └── Constants/ ├── infrastructure/ │ ├── docker/ │ │ ├── docker-compose.microservices.yml │ │ ├── docker-compose.infrastructure.yml │ │ └── .env │ ├── k8s/ │ │ ├── namespaces/ │ │ │ └── colaflow-namespace.yaml │ │ ├── services/ │ │ │ ├── project-service.yaml │ │ │ ├── workflow-service.yaml │ │ │ ├── user-service.yaml │ │ │ ├── notification-service.yaml │ │ │ ├── audit-service.yaml │ │ │ ├── ai-service.yaml │ │ │ └── api-gateway.yaml │ │ ├── deployments/ │ │ │ ├── project-service-deployment.yaml │ │ │ ├── workflow-service-deployment.yaml │ │ │ └── ... 
(one per service) │ │ ├── configmaps/ │ │ │ └── appsettings-configmap.yaml │ │ ├── secrets/ │ │ │ └── database-secrets.yaml │ │ ├── ingress/ │ │ │ └── ingress.yaml │ │ └── infrastructure/ │ │ ├── postgres-statefulset.yaml │ │ ├── rabbitmq-deployment.yaml │ │ ├── redis-deployment.yaml │ │ ├── consul-deployment.yaml │ │ └── jaeger-deployment.yaml │ └── helm/ │ └── colaflow/ │ ├── Chart.yaml │ ├── values.yaml │ ├── values-dev.yaml │ ├── values-prod.yaml │ └── templates/ │ ├── services/ │ ├── deployments/ │ ├── configmaps/ │ ├── secrets/ │ └── ingress/ ├── colaflow-web/ ├── scripts/ │ ├── build-all.sh │ ├── deploy-k8s.sh │ └── generate-protos.sh └── docs/ ├── Microservices-Architecture.md ├── Service-Development-Guide.md └── Operational-Runbook.md ``` ### 8.2 Service Structure (Example: Project Service) ``` project-service/ ├── src/ │ ├── ColaFlow.ProjectService.Domain/ │ │ ├── Aggregates/ │ │ │ └── ProjectAggregate/ │ │ │ ├── Project.cs │ │ │ ├── Epic.cs │ │ │ ├── Story.cs │ │ │ └── Task.cs │ │ ├── ValueObjects/ │ │ ├── Events/ │ │ ├── Interfaces/ │ │ └── Exceptions/ │ ├── ColaFlow.ProjectService.Application/ │ │ ├── Commands/ │ │ │ ├── CreateProject/ │ │ │ ├── UpdateProject/ │ │ │ ├── CreateTask/ │ │ │ └── UpdateTaskStatus/ │ │ ├── Queries/ │ │ │ ├── GetProject/ │ │ │ ├── GetKanbanBoard/ │ │ │ └── SearchTasks/ │ │ ├── DTOs/ │ │ ├── Mappings/ │ │ └── Services/ │ │ └── Clients/ │ │ ├── UserServiceClient.cs │ │ └── WorkflowServiceClient.cs │ ├── ColaFlow.ProjectService.Infrastructure/ │ │ ├── Persistence/ │ │ │ ├── ProjectDbContext.cs │ │ │ ├── Repositories/ │ │ │ └── Configurations/ │ │ ├── Messaging/ │ │ │ ├── EventPublisher.cs │ │ │ └── Consumers/ │ │ ├── gRPC/ │ │ │ └── ProjectGrpcService.cs │ │ └── Caching/ │ └── ColaFlow.ProjectService.API/ │ ├── Controllers/ │ │ ├── ProjectsController.cs │ │ ├── EpicsController.cs │ │ ├── StoriesController.cs │ │ └── TasksController.cs │ ├── Middleware/ │ ├── Program.cs │ ├── appsettings.json │ └── Dockerfile ├── tests/ │ ├── 
Domain.Tests/ │ ├── Application.Tests/ │ └── Integration.Tests/ └── ColaFlow.ProjectService.sln ``` --- ## 9. Code Examples ### 9.1 gRPC Service Implementation **protos/project.proto:** ```protobuf syntax = "proto3"; package colaflow.project; service ProjectService { rpc GetProject (GetProjectRequest) returns (ProjectResponse); rpc GetTasksByAssignee (GetTasksByAssigneeRequest) returns (TaskListResponse); rpc ValidateProjectExists (ValidateProjectRequest) returns (ValidationResponse); } message GetProjectRequest { string project_id = 1; } message ProjectResponse { string id = 1; string name = 2; string key = 3; string status = 4; } message GetTasksByAssigneeRequest { string assignee_id = 1; int32 page = 2; int32 page_size = 3; } message TaskListResponse { repeated TaskDto tasks = 1; int32 total_count = 2; } message TaskDto { string id = 1; string title = 2; string status = 3; string priority = 4; } message ValidateProjectRequest { string project_id = 1; } message ValidationResponse { bool exists = 1; string message = 2; } ``` **Server Implementation:** ```csharp // ColaFlow.ProjectService.Infrastructure/gRPC/ProjectGrpcService.cs public class ProjectGrpcService : ProjectService.ProjectServiceBase { private readonly IMediator _mediator; private readonly IMapper _mapper; public ProjectGrpcService(IMediator mediator, IMapper mapper) { _mediator = mediator; _mapper = mapper; } public override async Task GetProject( GetProjectRequest request, ServerCallContext context) { var query = new GetProjectByIdQuery(Guid.Parse(request.ProjectId)); var project = await _mediator.Send(query); return new ProjectResponse { Id = project.Id.ToString(), Name = project.Name, Key = project.Key, Status = project.Status.ToString() }; } public override async Task GetTasksByAssignee( GetTasksByAssigneeRequest request, ServerCallContext context) { var query = new GetTasksByAssigneeQuery( Guid.Parse(request.AssigneeId), request.Page, request.PageSize ); var result = await _mediator.Send(query); 
        var response = new TaskListResponse
        {
            TotalCount = result.TotalCount
        };

        response.Tasks.AddRange(result.Items.Select(task => new TaskDto
        {
            Id = task.Id.ToString(),
            Title = task.Title,
            Status = task.Status.ToString(),
            Priority = task.Priority.ToString()
        }));

        return response;
    }

    public override async Task<ValidationResponse> ValidateProjectExists(
        ValidateProjectRequest request, ServerCallContext context)
    {
        try
        {
            var query = new GetProjectByIdQuery(Guid.Parse(request.ProjectId));
            _ = await _mediator.Send(query, context.CancellationToken);

            return new ValidationResponse { Exists = true, Message = "Project exists" };
        }
        catch (NotFoundException)
        {
            return new ValidationResponse { Exists = false, Message = "Project not found" };
        }
    }
}

// Program.cs
builder.Services.AddGrpc();
// ... after builder.Build()
app.MapGrpcService<ProjectGrpcService>();
```

**Client Usage:**

```csharp
// Workflow Service - gRPC Client
public class ProjectServiceClient
{
    private readonly ProjectService.ProjectServiceClient _grpcClient;

    public ProjectServiceClient(ProjectService.ProjectServiceClient grpcClient)
    {
        _grpcClient = grpcClient;
    }

    public async Task<bool> ValidateProjectExistsAsync(Guid projectId, CancellationToken ct = default)
    {
        var request = new ValidateProjectRequest { ProjectId = projectId.ToString() };
        var response = await _grpcClient.ValidateProjectExistsAsync(request, cancellationToken: ct);
        return response.Exists;
    }
}

// Program.cs - Workflow Service
builder.Services.AddSingleton<ServiceDiscoveryClient>();
builder.Services.AddScoped<ProjectServiceClient>();

builder.Services.AddGrpcClient<ProjectService.ProjectServiceClient>((serviceProvider, options) =>
{
    // Options callbacks are synchronous, so the Consul lookup blocks when the options are first built.
    // The resolved address is then reused; later instance changes in Consul are not picked up here.
    var serviceDiscovery = serviceProvider.GetRequiredService<ServiceDiscoveryClient>();
    var projectServiceAddress = serviceDiscovery
        .GetServiceAddressAsync("project-service")
        .GetAwaiter()
        .GetResult();
    options.Address = new Uri(projectServiceAddress);
})
.ConfigurePrimaryHttpMessageHandler(() => new SocketsHttpHandler
{
    PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
    KeepAlivePingDelay = TimeSpan.FromSeconds(60),
    KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
    EnableMultipleHttp2Connections = true
});
```

### 9.2 Saga Pattern Example (Complete)

See **Section 4.1** for the complete Saga implementation.
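### 9.3 API Gateway Middleware

```csharp
// Correlation ID Middleware
using System.Diagnostics;

public class CorrelationIdMiddleware
{
    private const string HeaderName = "X-Correlation-ID";
    private readonly RequestDelegate _next;

    public CorrelationIdMiddleware(RequestDelegate next)
    {
        _next = next;
    }

    public async Task InvokeAsync(HttpContext context)
    {
        // Reuse the caller's ID when present (and non-empty); otherwise generate one
        var correlationId = context.Request.Headers[HeaderName].FirstOrDefault();
        if (string.IsNullOrWhiteSpace(correlationId))
            correlationId = Guid.NewGuid().ToString();

        // Writing it to the request headers means YARP forwards it to downstream services
        context.Request.Headers[HeaderName] = correlationId;
        context.Response.Headers[HeaderName] = correlationId;

        // Add to activity for distributed tracing
        Activity.Current?.SetTag("correlation_id", correlationId);

        await _next(context);
    }
}

// Usage (register before app.MapReverseProxy())
app.UseMiddleware<CorrelationIdMiddleware>();
```

---

## 10. Docker Compose (Local Development)

The Docker Compose configuration for local development has not been written yet.

---

**Status:** Document in progress. The Docker Compose, Kubernetes, Helm chart, and migration plan sections are still to be written.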