ColaFlow Microservices Architecture Design

Version: 1.0
Date: 2025-11-02
Status: Production-Ready Design
Author: Architecture Team
User Decision: Adopt Microservices Architecture (with full awareness of costs and risks)


Executive Summary

This document presents a production-grade microservices architecture for ColaFlow, as explicitly requested by the user. While the architecture team has previously recommended a Modular Monolith approach (see Modular-Monolith-Architecture.md), this document respects the user's strategic decision to adopt microservices from the start.

Key Architectural Decisions

| Decision | Technology | Rationale |
| --- | --- | --- |
| Service Communication (Sync) | gRPC | High performance, strong contracts, native .NET support |
| Service Communication (Async) | RabbitMQ + MassTransit | Reliable messaging, event-driven architecture |
| API Gateway | YARP (.NET 9) | Native .NET, high performance, reverse proxy |
| Service Discovery | Consul / Kubernetes DNS | Production-ready, automatic service registration |
| Distributed Tracing | OpenTelemetry + Jaeger | Vendor-neutral, comprehensive observability |
| Distributed Transactions | Saga Pattern (MassTransit) | Orchestration-based, reliable compensation |
| Configuration Management | Consul / Azure App Config | Centralized, dynamic configuration |
| Container Orchestration | Kubernetes + Helm | Industry standard, mature ecosystem |
| Message Format | Protocol Buffers (gRPC) + JSON (REST) | Type-safe, efficient serialization |
| Database per Service | PostgreSQL (per service) | Data isolation, independent scaling |

Cost and Risk Acknowledgment

Development Timeline Impact:

  • Modular Monolith: 8-10 weeks to M1
  • Microservices: 16-20 weeks to M1 (+8-10 weeks)

Team Skill Requirements:

  • Distributed systems expertise required
  • DevOps maturity critical
  • Kubernetes operational knowledge
  • Distributed transaction patterns (Saga)

Operational Complexity:

  • 6+ microservices to manage
  • 6+ databases to maintain
  • API Gateway, Service Mesh, Message Queue
  • Distributed tracing and monitoring infrastructure

Infrastructure Cost Increase:

  • Modular Monolith: ~$500/month
  • Microservices: ~$3,000-5,000/month (6-10x increase)

1. Microservices Architecture Overview

1.1 System Architecture Diagram

graph TB
    subgraph "Client Layer"
        WebUI[Web Browser<br/>Next.js 15]
        MobileApp[Mobile App<br/>Future]
        AITools[AI Tools<br/>ChatGPT/Claude]
    end

    subgraph "API Gateway Layer"
        YARP[YARP API Gateway<br/>.NET 9]
    end

    subgraph "Service Layer"
        ProjectSvc[Project Service<br/>Projects/Epics/Stories/Tasks]
        WorkflowSvc[Workflow Service<br/>Workflow Engine]
        UserSvc[User Service<br/>Auth & Users]
        NotifSvc[Notification Service<br/>SignalR/Email]
        AuditSvc[Audit Service<br/>Event Store]
        AISvc[AI Service<br/>MCP Server]
    end

    subgraph "Infrastructure Layer"
        RabbitMQ[RabbitMQ<br/>Message Bus]
        Redis[Redis<br/>Cache/Session]
        Consul[Consul<br/>Service Discovery]
        Jaeger[Jaeger<br/>Distributed Tracing]
    end

    subgraph "Data Layer"
        DB1[(PostgreSQL 1<br/>Projects)]
        DB2[(PostgreSQL 2<br/>Workflows)]
        DB3[(PostgreSQL 3<br/>Users)]
        DB4[(PostgreSQL 4<br/>Notifications)]
        DB5[(PostgreSQL 5<br/>Audit)]
        DB6[(PostgreSQL 6<br/>AI)]
    end

    WebUI --> YARP
    MobileApp --> YARP
    AITools --> YARP

    YARP --> ProjectSvc
    YARP --> WorkflowSvc
    YARP --> UserSvc
    YARP --> NotifSvc
    YARP --> AuditSvc
    YARP --> AISvc

    ProjectSvc --> DB1
    WorkflowSvc --> DB2
    UserSvc --> DB3
    NotifSvc --> DB4
    AuditSvc --> DB5
    AISvc --> DB6

    ProjectSvc -.gRPC.-> WorkflowSvc
    ProjectSvc -.gRPC.-> UserSvc
    WorkflowSvc -.gRPC.-> ProjectSvc

    ProjectSvc --> RabbitMQ
    WorkflowSvc --> RabbitMQ
    NotifSvc --> RabbitMQ
    AuditSvc --> RabbitMQ

    ProjectSvc --> Redis
    UserSvc --> Redis

    ProjectSvc --> Consul
    WorkflowSvc --> Consul
    UserSvc --> Consul
    NotifSvc --> Consul
    AuditSvc --> Consul
    AISvc --> Consul

    ProjectSvc --> Jaeger
    WorkflowSvc --> Jaeger
    UserSvc --> Jaeger

1.2 Service Catalog

| Service | Port | Responsibility | Database | Key APIs |
| --- | --- | --- | --- | --- |
| Project Service | 5001 | Project/Epic/Story/Task management | PostgreSQL 1 | /api/projects/*, gRPC |
| Workflow Service | 5002 | Workflow engine, state transitions | PostgreSQL 2 | /api/workflows/*, gRPC |
| User Service | 5003 | Authentication, authorization, users | PostgreSQL 3 | /api/users/*, /api/auth/* |
| Notification Service | 5004 | SignalR, email, push notifications | PostgreSQL 4 | /api/notifications/*, SignalR |
| Audit Service | 5005 | Event store, audit logs, rollback | PostgreSQL 5 | /api/audit/* |
| AI Service | 5006 | MCP Server, AI task generation | PostgreSQL 6 | /api/ai/*, MCP Resources |
| API Gateway | 8080 | Routing, auth, rate limiting | - | All external routes |
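
The catalog implies a prefix-based routing table at the gateway. The sketch below is only an illustration of that lookup, assuming the gateway picks a backend by longest matching path prefix. `RouteTable` and `Resolve` are hypothetical names, not YARP APIs, and every host name except `user-service` is an assumption. In a real deployment YARP would express the same mapping in its route and cluster configuration.

```csharp
#nullable enable
using System;
using System.Linq;

// Illustrative only: prefix-to-service mapping taken from the Service Catalog.
// RouteTable and Resolve are hypothetical names, not part of YARP.
public static class RouteTable
{
    private static readonly (string Prefix, string Service, int Port)[] Routes =
    {
        ("/api/projects", "project-service", 5001),
        ("/api/workflows", "workflow-service", 5002),
        ("/api/users", "user-service", 5003),
        ("/api/auth", "user-service", 5003),
        ("/api/notifications", "notification-service", 5004),
        ("/api/audit", "audit-service", 5005),
        ("/api/ai", "ai-service", 5006),
    };

    // Returns the backend address for a request path, or null when no route matches.
    public static string? Resolve(string path)
    {
        var match = Routes
            // Match the prefix itself or any path below it, but not "/api/aix" for "/api/ai".
            .Where(r => path.Equals(r.Prefix, StringComparison.OrdinalIgnoreCase)
                     || path.StartsWith(r.Prefix + "/", StringComparison.OrdinalIgnoreCase))
            .OrderByDescending(r => r.Prefix.Length)
            .Select(r => (r.Service, r.Port, Found: true))
            .FirstOrDefault();

        return match.Found ? $"https://{match.Service}:{match.Port}" : null;
    }
}
```

With only the catalog prefixes, `RouteTable.Resolve("/api/tasks/42")` returns null. Paths such as `/api/epics/{id}`, `/api/tasks/{id}`, `/api/teams` and `/api/mcp/*`, which appear in the service sections, would need their own routes at the gateway.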

2. Service Design - Bounded Contexts

2.1 Project Service (Core Domain)

Bounded Context: Project Management

Domain Model:

// Project Aggregate Root
public class Project : AggregateRoot
{
    public ProjectId Id { get; private set; }
    public string Name { get; private set; }
    public ProjectKey Key { get; private set; }

    private readonly List<Epic> _epics = new();
    public IReadOnlyCollection<Epic> Epics => _epics.AsReadOnly();

    // Business methods
    public Epic CreateEpic(string name, string description);
    public void UpdateDetails(string name, string description);
}

// Epic Entity
public class Epic : Entity
{
    public EpicId Id { get; private set; }
    public string Name { get; private set; }

    private readonly List<Story> _stories = new();
    public IReadOnlyCollection<Story> Stories => _stories.AsReadOnly();

    public Story CreateStory(string title, string description);
}

API Endpoints (REST):

GET    /api/projects              # List projects
POST   /api/projects              # Create project
GET    /api/projects/{id}         # Get project
PUT    /api/projects/{id}         # Update project
DELETE /api/projects/{id}         # Delete project

GET    /api/projects/{id}/epics   # List epics
POST   /api/projects/{id}/epics   # Create epic
GET    /api/epics/{id}            # Get epic
PUT    /api/epics/{id}            # Update epic

GET    /api/epics/{id}/stories    # List stories
POST   /api/epics/{id}/stories    # Create story
GET    /api/stories/{id}          # Get story
PUT    /api/stories/{id}          # Update story

GET    /api/stories/{id}/tasks    # List tasks
POST   /api/stories/{id}/tasks    # Create task
GET    /api/tasks/{id}            # Get task
PUT    /api/tasks/{id}            # Update task
PATCH  /api/tasks/{id}/status     # Update task status

gRPC Services:

// protos/project.proto
syntax = "proto3";
package colaflow.project;

service ProjectService {
  rpc GetProject (GetProjectRequest) returns (ProjectResponse);
  rpc GetProjectByKey (GetProjectByKeyRequest) returns (ProjectResponse);
  rpc GetTasksByAssignee (GetTasksByAssigneeRequest) returns (TaskListResponse);
  rpc ValidateProjectExists (ValidateProjectRequest) returns (ValidationResponse);
}

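message GetProjectRequest {
  string project_id = 1;
}

// Assumed shape: the service definition references this message without defining it
message GetProjectByKeyRequest {
  string key = 1;
}

// Assumed shape: the service definition references this message without defining it
message ValidateProjectRequest {
  string project_id = 1;
}

// Same fields as ValidationResponse in workflow.proto
message ValidationResponse {
  bool is_valid = 1;
  string message = 2;
}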

message ProjectResponse {
  string id = 1;
  string name = 2;
  string key = 3;
  string status = 4;
  string owner_id = 5;
}

message GetTasksByAssigneeRequest {
  string assignee_id = 1;
  int32 page = 2;
  int32 page_size = 3;
}

message TaskListResponse {
  repeated TaskDto tasks = 1;
  int32 total_count = 2;
}

message TaskDto {
  string id = 1;
  string title = 2;
  string status = 3;
  string priority = 4;
  string project_id = 5;
}

Published Events:

public record ProjectCreatedEvent(Guid ProjectId, string ProjectName, Guid OwnerId);
public record TaskStatusChangedEvent(Guid TaskId, string OldStatus, string NewStatus, Guid ChangedBy);
public record TaskAssignedEvent(Guid TaskId, Guid AssigneeId, Guid AssignedBy);
public record EpicCreatedEvent(Guid EpicId, string EpicName, Guid ProjectId);

Database Schema:

-- Projects Table
CREATE TABLE projects (
    id UUID PRIMARY KEY,
    name VARCHAR(200) NOT NULL,
    key VARCHAR(10) NOT NULL UNIQUE,
    description TEXT,
    status VARCHAR(50) NOT NULL,
    owner_id UUID NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP
);

-- Epics Table
CREATE TABLE epics (
    id UUID PRIMARY KEY,
    project_id UUID NOT NULL REFERENCES projects(id),
    name VARCHAR(200) NOT NULL,
    description TEXT,
    status VARCHAR(50) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Stories Table
CREATE TABLE stories (
    id UUID PRIMARY KEY,
    epic_id UUID NOT NULL REFERENCES epics(id),
    title VARCHAR(200) NOT NULL,
    description TEXT,
    status VARCHAR(50) NOT NULL,
    priority VARCHAR(50) NOT NULL,
    assignee_id UUID,
    estimated_hours DECIMAL(10,2),
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Tasks Table
CREATE TABLE tasks (
    id UUID PRIMARY KEY,
    story_id UUID NOT NULL REFERENCES stories(id),
    title VARCHAR(200) NOT NULL,
    description TEXT,
    status VARCHAR(50) NOT NULL,
    priority VARCHAR(50) NOT NULL,
    assignee_id UUID,
    estimated_hours DECIMAL(10,2),
    actual_hours DECIMAL(10,2),
    custom_fields JSONB,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Indexes
-- projects.key is already indexed by its UNIQUE constraint, so it needs no separate index
CREATE INDEX idx_epics_project_id ON epics(project_id);
CREATE INDEX idx_stories_epic_id ON stories(epic_id);
CREATE INDEX idx_stories_assignee_id ON stories(assignee_id);
CREATE INDEX idx_tasks_story_id ON tasks(story_id);
CREATE INDEX idx_tasks_assignee_id ON tasks(assignee_id);
CREATE INDEX idx_tasks_status ON tasks(status);

2.2 Workflow Service

Bounded Context: Workflow Engine

Domain Model:

public class Workflow : AggregateRoot
{
    public WorkflowId Id { get; private set; }
    public string Name { get; private set; }
    public Guid ProjectId { get; private set; }
    public bool IsDefault { get; private set; }

    private readonly List<WorkflowState> _states = new();
    public IReadOnlyCollection<WorkflowState> States => _states.AsReadOnly();

    public void AddState(string stateName);
    public void AddTransition(string fromState, string toState);
    public bool CanTransition(string currentState, string targetState);
}

public class WorkflowState : Entity
{
    public string Name { get; private set; }
    public StateCategory Category { get; private set; } // ToDo, InProgress, Done

    private readonly List<StateTransition> _transitions = new();
    public IReadOnlyCollection<StateTransition> Transitions => _transitions.AsReadOnly();
}
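
The domain model above lists `AddState`, `AddTransition` and `CanTransition` as signatures only. One possible body, sketched under the assumption that states are identified by name and compared case-insensitively, is a map from each state to the set of states reachable in one step. `WorkflowSketch` is a hypothetical stand-in for the `Workflow` aggregate and leaves out `StateCategory`, identifiers and persistence.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Illustrative sketch: one possible body for AddState/AddTransition/CanTransition.
// WorkflowSketch is a hypothetical stand-in for the Workflow aggregate.
public sealed class WorkflowSketch
{
    // State name -> states reachable from it in a single transition.
    private readonly Dictionary<string, HashSet<string>> _transitions =
        new(StringComparer.OrdinalIgnoreCase);

    public void AddState(string stateName)
    {
        if (string.IsNullOrWhiteSpace(stateName))
            throw new ArgumentException("State name is required.", nameof(stateName));

        // Adding a state that already exists is a no-op.
        _transitions.TryAdd(stateName, new HashSet<string>(StringComparer.OrdinalIgnoreCase));
    }

    public void AddTransition(string fromState, string toState)
    {
        if (!_transitions.ContainsKey(fromState) || !_transitions.ContainsKey(toState))
            throw new InvalidOperationException("Both states must be added before linking them.");

        _transitions[fromState].Add(toState);
    }

    // True only when an explicit transition from currentState to targetState exists.
    public bool CanTransition(string currentState, string targetState) =>
        _transitions.TryGetValue(currentState, out var targets) && targets.Contains(targetState);
}
```

Unknown states simply yield `false` from `CanTransition`, which lets a caller such as the `ValidateTransition` RPC report an invalid transition rather than fail with an exception.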

API Endpoints:

GET    /api/workflows                     # List all workflows
POST   /api/workflows                     # Create workflow
GET    /api/workflows/{id}                # Get workflow
PUT    /api/workflows/{id}                # Update workflow
DELETE /api/workflows/{id}                # Delete workflow
GET    /api/workflows/project/{projectId} # Get workflows for project
POST   /api/workflows/{id}/validate       # Validate transition

gRPC Services:

// protos/workflow.proto
syntax = "proto3";
package colaflow.workflow;

service WorkflowService {
  rpc GetWorkflowByProject (GetWorkflowByProjectRequest) returns (WorkflowResponse);
  rpc ValidateTransition (ValidateTransitionRequest) returns (ValidationResponse);
  rpc GetAvailableTransitions (GetAvailableTransitionsRequest) returns (TransitionsResponse);
}

message GetWorkflowByProjectRequest {
  string project_id = 1;
}

message WorkflowResponse {
  string id = 1;
  string name = 2;
  repeated WorkflowState states = 3;
}

message WorkflowState {
  string name = 1;
  string category = 2;
  repeated string allowed_transitions = 3;
}

message ValidateTransitionRequest {
  string workflow_id = 1;
  string current_state = 2;
  string target_state = 3;
}

message ValidationResponse {
  bool is_valid = 1;
  string message = 2;
}

Published Events:

public record WorkflowCreatedEvent(Guid WorkflowId, Guid ProjectId);
public record StateTransitionValidatedEvent(Guid WorkflowId, string FromState, string ToState);

2.3 User Service

Bounded Context: User Management & Authentication

Domain Model:

public class User : AggregateRoot
{
    public UserId Id { get; private set; }
    public Email Email { get; private set; }
    public string FirstName { get; private set; }
    public string LastName { get; private set; }
    public string PasswordHash { get; private set; }
    public UserRole Role { get; private set; }

    public static User Create(string email, string firstName, string lastName, string password);
    public void UpdateProfile(string firstName, string lastName);
    public void ChangePassword(string currentPassword, string newPassword);
}

public class Team : AggregateRoot
{
    public TeamId Id { get; private set; }
    public string Name { get; private set; }

    private readonly List<TeamMember> _members = new();
    public IReadOnlyCollection<TeamMember> Members => _members.AsReadOnly();

    public void AddMember(UserId userId, TeamRole role);
    public void RemoveMember(UserId userId);
}

API Endpoints:

POST   /api/auth/login                    # Login
POST   /api/auth/register                 # Register
POST   /api/auth/refresh                  # Refresh token
POST   /api/auth/logout                   # Logout

GET    /api/users                         # List users
GET    /api/users/{id}                    # Get user
PUT    /api/users/{id}                    # Update user
GET    /api/users/me                      # Get current user

GET    /api/teams                         # List teams
POST   /api/teams                         # Create team
GET    /api/teams/{id}                    # Get team
PUT    /api/teams/{id}                    # Update team
POST   /api/teams/{id}/members            # Add team member
DELETE /api/teams/{id}/members/{userId}   # Remove team member

gRPC Services:

// protos/user.proto
syntax = "proto3";
package colaflow.user;

service UserService {
  rpc GetUser (GetUserRequest) returns (UserResponse);
  rpc GetUsersByIds (GetUsersByIdsRequest) returns (UsersResponse);
  rpc ValidateToken (ValidateTokenRequest) returns (TokenValidationResponse);
  rpc GetUserPermissions (GetUserPermissionsRequest) returns (PermissionsResponse);
}

message GetUserRequest {
  string user_id = 1;
}

message UserResponse {
  string id = 1;
  string email = 2;
  string first_name = 3;
  string last_name = 4;
  string role = 5;
}

message GetUsersByIdsRequest {
  repeated string user_ids = 1;
}

message UsersResponse {
  repeated UserResponse users = 1;
}

message ValidateTokenRequest {
  string token = 1;
}

message TokenValidationResponse {
  bool is_valid = 1;
  string user_id = 2;
  string role = 3;
}

Published Events:

public record UserRegisteredEvent(Guid UserId, string Email, string FullName);
public record UserProfileUpdatedEvent(Guid UserId, string FirstName, string LastName);
public record TeamCreatedEvent(Guid TeamId, string TeamName);
public record TeamMemberAddedEvent(Guid TeamId, Guid UserId, string Role);

2.4 Notification Service

Bounded Context: Notifications & Real-time Communication

Domain Model:

public class Notification : AggregateRoot
{
    public NotificationId Id { get; private set; }
    public Guid RecipientId { get; private set; }
    public string Title { get; private set; }
    public string Message { get; private set; }
    public NotificationType Type { get; private set; }
    public bool IsRead { get; private set; }
    public DateTime CreatedAt { get; private set; }

    public void MarkAsRead();
}

public class NotificationSubscription : Entity
{
    public Guid UserId { get; private set; }
    public NotificationChannel Channel { get; private set; } // Email, SignalR, Push
    public string Endpoint { get; private set; }
    public bool IsActive { get; private set; }
}

API Endpoints:

GET    /api/notifications                 # List notifications
POST   /api/notifications                 # Create notification (internal)
PATCH  /api/notifications/{id}/read       # Mark as read
DELETE /api/notifications/{id}            # Delete notification

GET    /api/subscriptions                 # List subscriptions
POST   /api/subscriptions                 # Create subscription
DELETE /api/subscriptions/{id}            # Delete subscription

SignalR Hub:

public class NotificationHub : Hub
{
    public async Task JoinProject(string projectId)
    {
        await Groups.AddToGroupAsync(Context.ConnectionId, $"project_{projectId}");
    }

    public async Task LeaveProject(string projectId)
    {
        await Groups.RemoveFromGroupAsync(Context.ConnectionId, $"project_{projectId}");
    }
}

// Server-side push
await _hubContext.Clients.Group($"project_{projectId}").SendAsync("TaskUpdated", taskDto);

Consumed Events:

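// Listens to events from other services
public class TaskAssignedEventConsumer : IConsumer<TaskAssignedEvent>
{
    private readonly INotificationRepository _notificationRepository;
    private readonly IHubContext<NotificationHub> _hubContext;

    public TaskAssignedEventConsumer(
        INotificationRepository notificationRepository,
        IHubContext<NotificationHub> hubContext)
    {
        _notificationRepository = notificationRepository;
        _hubContext = hubContext;
    }

    public async Task Consume(ConsumeContext<TaskAssignedEvent> context)
    {
        var notification = Notification.Create(
            context.Message.AssigneeId,
            "Task Assigned",
            $"You have been assigned to task: {context.Message.TaskId}"
        );

        // Persist first, then push to the assignee's live connections
        await _notificationRepository.AddAsync(notification);
        await _hubContext.Clients.User(context.Message.AssigneeId.ToString())
            .SendAsync("NotificationReceived", notification);
    }
}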

2.5 Audit Service

Bounded Context: Audit Logging & Event Store

Domain Model:

public class AuditLog : Entity
{
    public long Id { get; private set; }
    public string EntityType { get; private set; }
    public Guid EntityId { get; private set; }
    public string Action { get; private set; }
    public string Changes { get; private set; } // JSON
    public Guid UserId { get; private set; }
    public DateTime Timestamp { get; private set; }
    public string IpAddress { get; private set; }
}

public class DomainEventRecord : Entity
{
    public long Id { get; private set; }
    public string EventType { get; private set; }
    public Guid AggregateId { get; private set; }
    public string EventData { get; private set; } // JSON
    public DateTime OccurredOn { get; private set; }
    public DateTime? ProcessedOn { get; private set; }
}

API Endpoints:

GET    /api/audit-logs                           # List audit logs
GET    /api/audit-logs/{entityType}/{entityId}   # Get entity audit logs
POST   /api/audit-logs/{id}/rollback             # Rollback changes

GET    /api/events                               # List domain events
GET    /api/events/{aggregateId}                 # Get aggregate events

Consumed Events:

// Listens to ALL domain events from all services
public class UniversalEventConsumer : IConsumer<DomainEvent>
{
    public async Task Consume(ConsumeContext<DomainEvent> context)
    {
        var eventRecord = new DomainEventRecord
        {
            EventType = context.Message.GetType().Name,
            AggregateId = context.Message.AggregateId,
            EventData = JsonSerializer.Serialize(context.Message),
            OccurredOn = context.Message.OccurredOn
        };

        await _eventStoreRepository.AddAsync(eventRecord);
    }
}

2.6 AI Service (MCP Server)

Bounded Context: AI Integration & MCP Protocol

Domain Model:

public class AITask : AggregateRoot
{
    public AITaskId Id { get; private set; }
    public string Prompt { get; private set; }
    public string Response { get; private set; }
    public AITaskStatus Status { get; private set; }
    public Guid CreatedBy { get; private set; }
    public DateTime CreatedAt { get; private set; }

    public void Complete(string response);
    public void Fail(string errorMessage);
}

public class MCPResource : Entity
{
    public string ResourceId { get; private set; }
    public string Type { get; private set; } // projects.search, issues.search
    public string Schema { get; private set; } // JSON Schema
}

API Endpoints:

POST   /api/ai/tasks                      # Create AI task
GET    /api/ai/tasks/{id}                 # Get AI task
GET    /api/ai/tasks                      # List AI tasks

GET    /api/mcp/resources                 # List MCP resources
GET    /api/mcp/resources/{resourceId}    # Get resource data
POST   /api/mcp/tools/{toolName}          # Execute MCP tool
GET    /api/mcp/tools                     # List MCP tools

MCP Resources:

{
  "resources": [
    {
      "uri": "colaflow://projects.search",
      "name": "Search Projects",
      "description": "Search and list projects",
      "mimeType": "application/json"
    },
    {
      "uri": "colaflow://issues.search",
      "name": "Search Issues",
      "description": "Search tasks and issues",
      "mimeType": "application/json"
    }
  ]
}

MCP Tools:

{
  "tools": [
    {
      "name": "create_task",
      "description": "Create a new task",
      "inputSchema": {
        "type": "object",
        "properties": {
          "title": { "type": "string" },
          "description": { "type": "string" },
          "priority": { "type": "string", "enum": ["Low", "Medium", "High", "Urgent"] }
        },
        "required": ["title"]
      }
    },
    {
      "name": "update_task_status",
      "description": "Update task status with diff preview",
      "inputSchema": {
        "type": "object",
        "properties": {
          "task_id": { "type": "string" },
          "new_status": { "type": "string" }
        },
        "required": ["task_id", "new_status"]
      }
    }
  ]
}
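
Before the AI Service executes a tool, the arguments sent by the AI client need to be checked against the tool's `inputSchema`. The sketch below covers only the `required` list and is not a full JSON Schema validator. `ToolInputValidator` and `MissingRequired` are hypothetical names introduced for illustration.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Text.Json;

// Illustrative sketch: checks the "required" list of an MCP tool inputSchema.
// ToolInputValidator is a hypothetical helper, not part of any MCP SDK.
public static class ToolInputValidator
{
    // Returns the names of required properties that are absent from the arguments object.
    public static IReadOnlyList<string> MissingRequired(string inputSchemaJson, string argumentsJson)
    {
        using var schema = JsonDocument.Parse(inputSchemaJson);
        using var args = JsonDocument.Parse(argumentsJson);

        if (args.RootElement.ValueKind != JsonValueKind.Object)
            throw new ArgumentException("Tool arguments must be a JSON object.", nameof(argumentsJson));

        var missing = new List<string>();

        // A schema without a "required" list imposes no mandatory properties.
        if (!schema.RootElement.TryGetProperty("required", out var required))
            return missing;

        foreach (var name in required.EnumerateArray())
        {
            var property = name.GetString();
            if (property is not null && !args.RootElement.TryGetProperty(property, out _))
                missing.Add(property);
        }

        return missing;
    }
}
```

For `update_task_status`, arguments containing only `task_id` would come back with `new_status` reported as missing, so the call can be rejected before it reaches the Project Service.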

3. Service Communication Patterns

3.1 Synchronous Communication (gRPC)

When to use gRPC:

  • Real-time queries (e.g., "Get User by ID")
  • Validation requests (e.g., "Check if project exists")
  • Low-latency requirements

Example: Project Service → User Service

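// Project Service - gRPC Client
public class UserServiceClient
{
    private readonly UserService.UserServiceClient _grpcClient;

    public UserServiceClient(UserService.UserServiceClient grpcClient)
    {
        _grpcClient = grpcClient;
    }

    // Returns null when the User Service reports that the user does not exist.
    public async Task<UserDto?> GetUserAsync(Guid userId, CancellationToken ct = default)
    {
        var request = new GetUserRequest { UserId = userId.ToString() };

        try
        {
            var response = await _grpcClient.GetUserAsync(request, cancellationToken: ct);

            return new UserDto
            {
                Id = Guid.Parse(response.Id),
                Email = response.Email,
                FirstName = response.FirstName,
                LastName = response.LastName
            };
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound)
        {
            // gRPC reports errors as exceptions, never as a null response.
            // Assumes the User Service signals a missing user with StatusCode.NotFound.
            return null;
        }
    }
}

// Used in Command Handler
public class AssignTaskCommandHandler : IRequestHandler<AssignTaskCommand, TaskDto>
{
    private readonly ITaskRepository _taskRepository;
    private readonly UserServiceClient _userServiceClient;
    private readonly IUnitOfWork _unitOfWork; // assumed abstraction over the DbContext
    private readonly IMapper _mapper;         // assumed object mapper

    // Constructor injection omitted for brevity

    public async Task<TaskDto> Handle(AssignTaskCommand request, CancellationToken ct)
    {
        // Validate user exists via gRPC
        var user = await _userServiceClient.GetUserAsync(request.AssigneeId, ct);
        if (user == null)
            throw new NotFoundException("User not found");

        // Assign task
        var task = await _taskRepository.GetByIdAsync(request.TaskId);
        if (task == null)
            throw new NotFoundException("Task not found");

        task.AssignTo(request.AssigneeId);

        await _unitOfWork.CommitAsync(ct);
        return _mapper.Map<TaskDto>(task);
    }
}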

gRPC Client Registration:

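// Program.cs
builder.Services.AddGrpcClient<UserService.UserServiceClient>(options =>
{
    options.Address = new Uri("https://user-service:5003");
})
.ConfigurePrimaryHttpMessageHandler(() =>
{
    var handler = new HttpClientHandler();

    // Accept untrusted (e.g. self-signed) certificates in local development only.
    // Everywhere else the default certificate validation stays in effect.
    if (builder.Environment.IsDevelopment())
    {
        handler.ServerCertificateCustomValidationCallback =
            HttpClientHandler.DangerousAcceptAnyServerCertificateValidator;
    }

    return handler;
});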

3.2 Asynchronous Communication (RabbitMQ + MassTransit)

When to use Async Messaging:

  • Event notifications (e.g., "Task Created")
  • Cross-service workflows (e.g., Saga orchestration)
  • Decoupled communication

Example: Task Created Event Flow

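// Project Service - Publisher
public class CreateTaskCommandHandler : IRequestHandler<CreateTaskCommand, TaskDto>
{
    private readonly ITaskRepository _taskRepository;
    private readonly IUnitOfWork _unitOfWork; // assumed abstraction over the DbContext
    private readonly IMapper _mapper;         // assumed object mapper
    private readonly IPublishEndpoint _publishEndpoint;

    // Constructor injection omitted for brevity

    public async Task<TaskDto> Handle(CreateTaskCommand request, CancellationToken ct)
    {
        // Create task. DomainTask is an assumed alias for the domain Task entity,
        // which would otherwise clash with System.Threading.Tasks.Task.
        var task = DomainTask.Create(request.Title, request.Description);
        await _taskRepository.AddAsync(task, ct);
        await _unitOfWork.CommitAsync(ct);

        // Publish event. If this call fails after the commit, the event is lost;
        // the Outbox Pattern in section 4.2 closes that gap.
        await _publishEndpoint.Publish(new TaskCreatedEvent
        {
            TaskId = task.Id,
            Title = task.Title,
            AssigneeId = task.AssigneeId,
            ProjectId = task.ProjectId
        }, ct);

        return _mapper.Map<TaskDto>(task);
    }
}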

// Notification Service - Consumer
public class TaskCreatedEventConsumer : IConsumer<TaskCreatedEvent>
{
    private readonly INotificationRepository _notificationRepository;
    private readonly IHubContext<NotificationHub> _hubContext;

    public async Task Consume(ConsumeContext<TaskCreatedEvent> context)
    {
        var evt = context.Message;

        // Create notification
        var notification = Notification.Create(
            evt.AssigneeId,
            "New Task Assigned",
            $"You have been assigned to task: {evt.Title}"
        );

        await _notificationRepository.AddAsync(notification);

        // Send SignalR notification
        await _hubContext.Clients.User(evt.AssigneeId.ToString())
            .SendAsync("TaskCreated", new { evt.TaskId, evt.Title });
    }
}

// Audit Service - Consumer
public class TaskCreatedEventConsumer : IConsumer<TaskCreatedEvent>
{
    private readonly IEventStoreRepository _eventStoreRepository;

    public async Task Consume(ConsumeContext<TaskCreatedEvent> context)
    {
        // Store event in event store
        var eventRecord = new DomainEventRecord
        {
            EventType = nameof(TaskCreatedEvent),
            AggregateId = context.Message.TaskId,
            EventData = JsonSerializer.Serialize(context.Message),
            OccurredOn = DateTime.UtcNow
        };

        await _eventStoreRepository.AddAsync(eventRecord);
    }
}

MassTransit Configuration:

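// Program.cs (Notification Service)
builder.Services.AddMassTransit(config =>
{
    // Register consumers
    config.AddConsumer<TaskCreatedEventConsumer>();
    config.AddConsumer<TaskAssignedEventConsumer>();

    config.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("rabbitmq://rabbitmq:5672", h =>
        {
            // RabbitMQ's default guest account only accepts loopback connections,
            // so use a dedicated account. The configuration keys are assumed names.
            h.Username(builder.Configuration["RabbitMq:Username"]);
            h.Password(builder.Configuration["RabbitMq:Password"]);
        });

        // Configure endpoints: every registered consumer must be bound to an endpoint
        cfg.ReceiveEndpoint("notification-service", e =>
        {
            e.ConfigureConsumer<TaskCreatedEventConsumer>(context);
            e.ConfigureConsumer<TaskAssignedEventConsumer>(context);
        });
    });
});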

4. Distributed Transactions - Saga Pattern

4.1 Saga Orchestration with MassTransit

Use Case: Create Project with Default Workflow

Requirements:

  1. Project Service: Create project
  2. Workflow Service: Create default workflow
  3. Notification Service: Send notification
  4. If any step fails → compensate (rollback)
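
The compensation rule in step 4 can be illustrated without a message broker. The sketch below is an in-process simplification, not the MassTransit state machine: it runs the steps in order and, when one throws, undoes the completed steps in reverse. `SagaStep` and `SagaRunner` are hypothetical names, and a real saga would persist its state and react to messages instead of awaiting delegates.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Illustrative, in-process sketch of orchestration with compensation.
// SagaStep and SagaRunner are hypothetical names, not MassTransit types.
public sealed record SagaStep(string Name, Func<Task> Execute, Func<Task> Compensate);

public static class SagaRunner
{
    // Runs the steps in order. If one throws, compensates the steps that
    // already completed, most recent first, and returns false.
    public static async Task<bool> RunAsync(IEnumerable<SagaStep> steps)
    {
        var completed = new Stack<SagaStep>();
        try
        {
            foreach (var step in steps)
            {
                await step.Execute();
                completed.Push(step);
            }
            return true;
        }
        catch (Exception)
        {
            // The failed step itself never completed, so only earlier steps are undone.
            while (completed.Count > 0)
                await completed.Pop().Compensate();
            return false;
        }
    }
}
```

For the use case above, a failing "create default workflow" step would trigger the compensation of "create project" (deleting it), which mirrors the `DeleteProjectCommand` published by the state machine.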

Saga State Machine:

// Saga State
public class CreateProjectSagaState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    // Saga data
    public Guid ProjectId { get; set; }
    public string ProjectName { get; set; }
    public Guid OwnerId { get; set; }
    public Guid? WorkflowId { get; set; }

    // Timestamps
    public DateTime CreatedAt { get; set; }
    public DateTime? CompletedAt { get; set; }
}

// Saga Definition
public class CreateProjectSaga : MassTransitStateMachine<CreateProjectSagaState>
{
    public State CreatingProject { get; private set; }
    public State CreatingWorkflow { get; private set; }
    public State SendingNotification { get; private set; }
    public State Completed { get; private set; }
    public State Failed { get; private set; }

    // Events (only messages the saga reacts to; commands it sends are not declared here)
    public Event<CreateProjectCommand> CreateProject { get; private set; }
    public Event<ProjectCreated> ProjectCreated { get; private set; }
    public Event<WorkflowCreated> WorkflowCreated { get; private set; }
    public Event<NotificationSent> NotificationSent { get; private set; }
    public Event<ProjectCreationFailed> ProjectFailed { get; private set; }
    public Event<WorkflowCreationFailed> WorkflowFailed { get; private set; }

    public CreateProjectSaga()
    {
        InstanceState(x => x.CurrentState);

        // Step 1: Create Project
        Initially(
            When(CreateProject)
                .Then(context =>
                {
                    context.Saga.ProjectName = context.Message.Name;
                    context.Saga.OwnerId = context.Message.OwnerId;
                    context.Saga.CreatedAt = DateTime.UtcNow;
                })
                .TransitionTo(CreatingProject)
                .Publish(context => new CreateProjectInternalCommand
                {
                    CorrelationId = context.Saga.CorrelationId,
                    Name = context.Message.Name,
                    Description = context.Message.Description,
                    Key = context.Message.Key,
                    OwnerId = context.Message.OwnerId
                })
        );

        // Step 2: Project Created → Create Workflow
        During(CreatingProject,
            When(ProjectCreated)
                .Then(context =>
                {
                    context.Saga.ProjectId = context.Message.ProjectId;
                })
                .TransitionTo(CreatingWorkflow)
                .PublishAsync(context => context.Init<CreateWorkflowCommand>(new
                {
                    CorrelationId = context.Saga.CorrelationId,
                    ProjectId = context.Message.ProjectId,
                    Name = $"{context.Saga.ProjectName} Workflow"
                }))
        );

        // Step 3: Workflow Created → Send Notification
        During(CreatingWorkflow,
            When(WorkflowCreated)
                .Then(context =>
                {
                    context.Saga.WorkflowId = context.Message.WorkflowId;
                })
                .TransitionTo(SendingNotification)
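                .PublishAsync(context => context.Init<SendNotificationCommand>(new
                {
                    // Carried so the NotificationSent reply can be correlated back to this saga instance
                    CorrelationId = context.Saga.CorrelationId,
                    RecipientId = context.Saga.OwnerId,
                    Title = "Project Created",
                    Message = $"Project '{context.Saga.ProjectName}' has been created successfully."
                }))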
        );

        // Step 4: Notification Sent → Complete
        During(SendingNotification,
            When(NotificationSent)
                .Then(context =>
                {
                    context.Saga.CompletedAt = DateTime.UtcNow;
                })
                .TransitionTo(Completed)
                .Finalize()
        );

        // Compensation: Project Creation Failed
        During(CreatingProject,
            When(ProjectFailed)
                .Then(context =>
                {
                    // Log failure
                    Console.WriteLine($"Project creation failed: {context.Message.Reason}");
                })
                .TransitionTo(Failed)
                .Finalize()
        );

        // Compensation: Workflow Creation Failed → Delete Project
        During(CreatingWorkflow,
            When(WorkflowFailed)
                .Then(context =>
                {
                    Console.WriteLine($"Workflow creation failed: {context.Message.Reason}");
                })
                .PublishAsync(context => context.Init<DeleteProjectCommand>(new
                {
                    ProjectId = context.Saga.ProjectId,
                    Reason = "Workflow creation failed"
                }))
                .TransitionTo(Failed)
                .Finalize()
        );

        SetCompletedWhenFinalized();
    }
}

// Saga Registration
builder.Services.AddMassTransit(config =>
{
    config.AddSagaStateMachine<CreateProjectSaga, CreateProjectSagaState>()
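        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Pessimistic;
            // Pessimistic locking on PostgreSQL needs the matching lock statement provider
            r.LockStatementProvider = new PostgresLockStatementProvider();
            r.AddDbContext<DbContext, SagaDbContext>((provider, optionsBuilder) =>
            {
                optionsBuilder.UseNpgsql(connectionString);
            });
        });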

    config.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("rabbitmq://rabbitmq:5672");
        cfg.ConfigureEndpoints(context);
    });
});

Saga Database Table:

CREATE TABLE create_project_saga_state (
    correlation_id UUID PRIMARY KEY,
    current_state VARCHAR(100) NOT NULL,
    project_id UUID,
    project_name VARCHAR(200),
    owner_id UUID,
    workflow_id UUID,
    created_at TIMESTAMPTZ NOT NULL,
    completed_at TIMESTAMPTZ
);
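
The state transitions and the single compensation step of the saga can be traced without MassTransit. The following is a minimal sketch in plain C#, not the saga implementation itself: it reuses the state and event names from the state machine, assumes a `ProjectCreated` event for the first step, and represents the compensating `DeleteProjectCommand` as a string.

```csharp
using System;
using System.Collections.Generic;

// Transition table: (current state, event) -> (next state, compensating command or null).
// "ProjectCreated" is an assumed event name for the step that precedes workflow creation.
var transitions = new Dictionary<(string State, string Event), (string Next, string? Compensation)>
{
    [("CreatingProject", "ProjectCreated")] = ("CreatingWorkflow", null),
    [("CreatingProject", "ProjectFailed")] = ("Failed", null),
    [("CreatingWorkflow", "WorkflowCreated")] = ("SendingNotification", null),
    [("CreatingWorkflow", "WorkflowFailed")] = ("Failed", "DeleteProjectCommand"),
    [("SendingNotification", "NotificationSent")] = ("Completed", null),
};

// Replays a sequence of events and returns the final state plus the compensations issued.
(string State, List<string> Compensations) Run(IEnumerable<string> events)
{
    var state = "CreatingProject";
    var compensations = new List<string>();

    foreach (var evt in events)
    {
        // Events that are not handled in the current state are ignored.
        if (!transitions.TryGetValue((state, evt), out var step))
            continue;

        if (step.Compensation is not null)
            compensations.Add(step.Compensation);

        state = step.Next;
    }

    return (state, compensations);
}

var happy = Run(new[] { "ProjectCreated", "WorkflowCreated", "NotificationSent" });
Console.WriteLine(happy.State); // Completed

var failed = Run(new[] { "ProjectCreated", "WorkflowFailed" });
Console.WriteLine($"{failed.State}: {string.Join(",", failed.Compensations)}"); // Failed: DeleteProjectCommand
```

Only the workflow failure triggers a compensation, because it is the only failure that happens after another service has already committed work (the project).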

4.2 Outbox Pattern (Reliable Messaging)

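Problem: Domain events must still be published if RabbitMQ is unavailable at the moment the business transaction commits.

Solution: Write each event to an outbox table in the same database transaction as the domain change, then publish it from a background worker. Delivery is at-least-once, so consumers should be idempotent.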

// Outbox Message Entity
public class OutboxMessage
{
    public Guid Id { get; set; }
    public string Type { get; set; }
    public string Content { get; set; } // JSON
    public DateTime OccurredOn { get; set; }
    public DateTime? ProcessedOn { get; set; }
    public string? Error { get; set; } // null until a publish attempt fails
}

// Save to Outbox in same transaction
public async Task<int> CommitAsync(CancellationToken cancellationToken = default)
{
    var domainEvents = ChangeTracker
        .Entries<AggregateRoot>()
        .SelectMany(x => x.Entity.DomainEvents)
        .ToList();

    // Store events in outbox
    foreach (var domainEvent in domainEvents)
    {
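        var outboxMessage = new OutboxMessage
        {
            Id = Guid.NewGuid(),
            // Assembly-qualified name, so Type.GetType can resolve it when the outbox is processed
            Type = domainEvent.GetType().AssemblyQualifiedName!,
            // Serialize by runtime type; the declared type may be a base class or interface
            Content = JsonSerializer.Serialize(domainEvent, domainEvent.GetType()),
            OccurredOn = DateTime.UtcNow
        };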

        OutboxMessages.Add(outboxMessage);
    }

    // Save changes (domain entities + outbox messages in same transaction)
    var result = await base.SaveChangesAsync(cancellationToken);

    return result;
}

// Background Service: Process Outbox
public class OutboxProcessor : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;

    public OutboxProcessor(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            using var scope = _serviceProvider.CreateScope();
            var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
            var publishEndpoint = scope.ServiceProvider.GetRequiredService<IPublishEndpoint>();

            // Get unprocessed messages
            var messages = await dbContext.OutboxMessages
                .Where(m => m.ProcessedOn == null)
                .OrderBy(m => m.OccurredOn)
                .Take(100)
                .ToListAsync(stoppingToken);

            foreach (var message in messages)
            {
                try
                {
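                    // Resolve the event type and deserialize; fail loudly if either step yields nothing
                    var eventType = Type.GetType(message.Type)
                        ?? throw new InvalidOperationException($"Unknown event type '{message.Type}'.");
                    var domainEvent = JsonSerializer.Deserialize(message.Content, eventType)
                        ?? throw new InvalidOperationException($"Outbox message {message.Id} has no content.");

                    await publishEndpoint.Publish(domainEvent, eventType, stoppingToken);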

                    // Mark as processed
                    message.ProcessedOn = DateTime.UtcNow;
                }
                catch (Exception ex)
                {
                    message.Error = ex.Message;
                }
            }

            await dbContext.SaveChangesAsync(stoppingToken);

            await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
        }
    }
}
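
The retry behaviour of the outbox processor can be illustrated without a database or a broker. The following is a minimal sketch under stated assumptions: the outbox is an in-memory list, the broker is a boolean flag, and the event names are placeholders. It only shows that a row stays unprocessed while the broker is down and is published on a later pass.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-ins: each outbox row is (Id, Payload, Processed).
var outbox = new List<(Guid Id, string Payload, bool Processed)>
{
    (Guid.NewGuid(), "ProjectCreated", false),
    (Guid.NewGuid(), "TaskAssigned", false),
};
var delivered = new List<string>();
var brokerUp = false;

// One polling pass: try to publish every unprocessed row, mark it processed only on success.
int RelayOnce()
{
    var published = 0;
    for (var i = 0; i < outbox.Count; i++)
    {
        var row = outbox[i];
        if (row.Processed)
            continue;

        try
        {
            if (!brokerUp)
                throw new InvalidOperationException("broker unavailable");

            delivered.Add(row.Payload);
            outbox[i] = (row.Id, row.Payload, true);
            published++;
        }
        catch (InvalidOperationException)
        {
            // The row stays unprocessed and is retried on the next pass.
        }
    }
    return published;
}

var first = RelayOnce();   // broker down: nothing is published
brokerUp = true;
var second = RelayOnce();  // broker back: both rows are published
var third = RelayOnce();   // nothing left to publish

Console.WriteLine($"{first} {second} {third} {delivered.Count}"); // 0 2 0 2
```

In the real processor a crash between publishing and saving `ProcessedOn` would cause the same message to be published again, which is why consumers need to tolerate duplicates.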

5. API Gateway (YARP)

5.1 YARP Configuration

Why YARP:

  • Native .NET 9 support
  • High performance reverse proxy
  • Dynamic configuration
  • Built-in load balancing
  • Request/response transformation

appsettings.json:

{
  "ReverseProxy": {
    "Routes": {
      "project-route": {
        "ClusterId": "project-cluster",
        "AuthorizationPolicy": "authenticated",
        "Match": {
          "Path": "/api/projects/{**catch-all}"
        },
        "Transforms": [
          {
            "X-Forwarded": "Append"
          }
        ]
      },
      "workflow-route": {
        "ClusterId": "workflow-cluster",
        "AuthorizationPolicy": "authenticated",
        "Match": {
          "Path": "/api/workflows/{**catch-all}"
        }
      },
      "user-route": {
        "ClusterId": "user-cluster",
        "Match": {
          "Path": "/api/users/{**catch-all}"
        }
      },
      "auth-route": {
        "ClusterId": "user-cluster",
        "Match": {
          "Path": "/api/auth/{**catch-all}"
        }
      },
      "notification-route": {
        "ClusterId": "notification-cluster",
        "AuthorizationPolicy": "authenticated",
        "Match": {
          "Path": "/api/notifications/{**catch-all}"
        }
      },
      "audit-route": {
        "ClusterId": "audit-cluster",
        "AuthorizationPolicy": "admin",
        "Match": {
          "Path": "/api/audit-logs/{**catch-all}"
        }
      },
      "ai-route": {
        "ClusterId": "ai-cluster",
        "AuthorizationPolicy": "authenticated",
        "Match": {
          "Path": "/api/ai/{**catch-all}"
        }
      },
      "mcp-route": {
        "ClusterId": "ai-cluster",
        "AuthorizationPolicy": "mcp-client",
        "Match": {
          "Path": "/api/mcp/{**catch-all}"
        }
      }
    },
    "Clusters": {
      "project-cluster": {
        "LoadBalancingPolicy": "RoundRobin",
        "Destinations": {
          "project-service-1": {
            "Address": "http://project-service:5001"
          }
        }
      },
      "workflow-cluster": {
        "LoadBalancingPolicy": "RoundRobin",
        "Destinations": {
          "workflow-service-1": {
            "Address": "http://workflow-service:5002"
          }
        }
      },
      "user-cluster": {
        "LoadBalancingPolicy": "RoundRobin",
        "Destinations": {
          "user-service-1": {
            "Address": "http://user-service:5003"
          }
        }
      },
      "notification-cluster": {
        "LoadBalancingPolicy": "RoundRobin",
        "Destinations": {
          "notification-service-1": {
            "Address": "http://notification-service:5004"
          }
        }
      },
      "audit-cluster": {
        "LoadBalancingPolicy": "RoundRobin",
        "Destinations": {
          "audit-service-1": {
            "Address": "http://audit-service:5005"
          }
        }
      },
      "ai-cluster": {
        "LoadBalancingPolicy": "RoundRobin",
        "Destinations": {
          "ai-service-1": {
            "Address": "http://ai-service:5006"
          }
        }
      }
    }
  }
}
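
The effect of the route table can be checked with a small lookup. The following is a simplified sketch, not how YARP matches routes internally (YARP relies on ASP.NET Core routing): it treats each `{**catch-all}` route as "the prefix itself or anything below it" and returns the cluster id from the configuration above.

```csharp
using System;
using System.Linq;

// Path prefixes and cluster ids copied from the ReverseProxy configuration.
var routes = new (string Prefix, string Cluster)[]
{
    ("/api/projects", "project-cluster"),
    ("/api/workflows", "workflow-cluster"),
    ("/api/users", "user-cluster"),
    ("/api/auth", "user-cluster"),
    ("/api/notifications", "notification-cluster"),
    ("/api/audit-logs", "audit-cluster"),
    ("/api/ai", "ai-cluster"),
    ("/api/mcp", "ai-cluster"),
};

// Simplified stand-in for "{prefix}/{**catch-all}": matches the prefix or any path below it.
string? ResolveCluster(string path) =>
    routes
        .Where(r => path.Equals(r.Prefix, StringComparison.OrdinalIgnoreCase)
                 || path.StartsWith(r.Prefix + "/", StringComparison.OrdinalIgnoreCase))
        .Select(r => r.Cluster)
        .FirstOrDefault();

Console.WriteLine(ResolveCluster("/api/projects/42/tasks")); // project-cluster
Console.WriteLine(ResolveCluster("/api/auth/login"));        // user-cluster
Console.WriteLine(ResolveCluster("/api/unknown") is null);   // True
```

Two pairs of routes share a cluster: `/api/users` and `/api/auth` both reach the user service, and `/api/ai` and `/api/mcp` both reach the AI service, with different authorization policies per route.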

5.2 API Gateway Code

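// Program.cs
using System.Threading.RateLimiting;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.IdentityModel.Tokens;
using OpenTelemetry.Trace;

var builder = WebApplication.CreateBuilder(args);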

// Add YARP
builder.Services.AddReverseProxy()
    .LoadFromConfig(builder.Configuration.GetSection("ReverseProxy"));

// Add Authentication
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
    .AddJwtBearer(options =>
    {
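        options.Authority = "http://user-service:5003";
        // The authority is plain HTTP inside the cluster, so HTTPS metadata must be disabled explicitly
        options.RequireHttpsMetadata = false;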
        options.TokenValidationParameters = new TokenValidationParameters
        {
            ValidateIssuer = true,
            ValidateAudience = true,
            ValidateLifetime = true,
            ValidIssuer = "ColaFlow",
            ValidAudience = "ColaFlow"
        };
    });

// Add Authorization Policies
builder.Services.AddAuthorization(options =>
{
    options.AddPolicy("authenticated", policy => policy.RequireAuthenticatedUser());
    options.AddPolicy("admin", policy => policy.RequireRole("Admin"));
    options.AddPolicy("mcp-client", policy => policy.RequireClaim("client_type", "mcp"));
});

// Add Rate Limiting
builder.Services.AddRateLimiter(options =>
{
    options.GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, string>(context =>
    {
        return RateLimitPartition.GetFixedWindowLimiter(
            // Fall back to the client IP; the Host header is identical for every caller
            partitionKey: context.User.Identity?.Name
                ?? context.Connection.RemoteIpAddress?.ToString()
                ?? "anonymous",
            factory: partition => new FixedWindowRateLimiterOptions
            {
                AutoReplenishment = true,
                PermitLimit = 100,
                QueueLimit = 0,
                Window = TimeSpan.FromMinutes(1)
            });
    });
});

// Add Distributed Tracing
builder.Services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddAspNetCoreInstrumentation()
        .AddHttpClientInstrumentation()
        .AddJaegerExporter(options =>
        {
            options.AgentHost = "jaeger";
            options.AgentPort = 6831;
        }));

var app = builder.Build();

app.UseAuthentication();
app.UseAuthorization();
// Runs after authentication so the limiter can partition by the authenticated user name
app.UseRateLimiter();

// Map YARP
app.MapReverseProxy();

app.Run();
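
The rate limiting policy in the gateway (100 permits per partition per minute, no queueing) can be approximated with a small counter. The following is a sketch of the fixed-window idea only, not the implementation used by `FixedWindowRateLimiter`; the partition keys `alice` and `bob` are placeholders.

```csharp
using System;
using System.Collections.Generic;

var permitLimit = 100;                 // same values as the gateway configuration
var window = TimeSpan.FromMinutes(1);
var counters = new Dictionary<string, (DateTime WindowStart, int Count)>();

// Returns true if the request is admitted, false if the partition has used up its window.
bool TryAcquire(string partitionKey, DateTime now)
{
    // Start a fresh window for unseen keys or when the previous window has elapsed.
    if (!counters.TryGetValue(partitionKey, out var state) || now - state.WindowStart >= window)
        state = (now, 0);

    var allowed = state.Count < permitLimit;
    if (allowed)
        state.Count++;

    counters[partitionKey] = state;
    return allowed;
}

var start = new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc);

var aliceAllowed = 0;
for (var i = 0; i < 101; i++)
{
    if (TryAcquire("alice", start))
        aliceAllowed++;
}

Console.WriteLine(aliceAllowed);                             // 100
Console.WriteLine(TryAcquire("bob", start));                 // True
Console.WriteLine(TryAcquire("alice", start.AddMinutes(1))); // True
```

Each partition key gets its own budget, so the choice of key decides who shares a limit: a key that is identical for all callers would put every client in a single bucket.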

6. Service Discovery (Consul)

6.1 Consul Configuration

Why Consul:

  • Service registry and health checking
  • Dynamic service discovery
  • Key/value store for configuration
  • Production-ready and battle-tested

Service Registration:

// Program.cs - Each Service
builder.Services.AddConsul(builder.Configuration);

public static class ConsulExtensions
{
    public static IServiceCollection AddConsul(this IServiceCollection services, IConfiguration configuration)
    {
        var consulConfig = configuration.GetSection("Consul").Get<ConsulConfig>();

        services.AddSingleton<IConsulClient>(sp => new ConsulClient(config =>
        {
            config.Address = new Uri(consulConfig.Address);
        }));

        services.AddHostedService<ConsulHostedService>();

        return services;
    }
}

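public class ConsulHostedService : IHostedService
{
    private readonly IConsulClient _consulClient;
    private readonly IConfiguration _configuration;
    private string _registrationId = string.Empty;

    public ConsulHostedService(IConsulClient consulClient, IConfiguration configuration)
    {
        _consulClient = consulClient;
        _configuration = configuration;
    }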

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var serviceConfig = _configuration.GetSection("Service").Get<ServiceConfig>();

        _registrationId = $"{serviceConfig.Name}-{serviceConfig.Id}";

        var registration = new AgentServiceRegistration
        {
            ID = _registrationId,
            Name = serviceConfig.Name,
            Address = serviceConfig.Address,
            Port = serviceConfig.Port,
            Tags = new[] { "colaflow", serviceConfig.Name },
            Check = new AgentServiceCheck
            {
                HTTP = $"http://{serviceConfig.Address}:{serviceConfig.Port}/health",
                Interval = TimeSpan.FromSeconds(10),
                Timeout = TimeSpan.FromSeconds(5),
                DeregisterCriticalServiceAfter = TimeSpan.FromMinutes(1)
            }
        };

        await _consulClient.Agent.ServiceRegister(registration, cancellationToken);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await _consulClient.Agent.ServiceDeregister(_registrationId, cancellationToken);
    }
}

appsettings.json:

{
  "Consul": {
    "Address": "http://consul:8500"
  },
  "Service": {
    "Name": "project-service",
    "Id": "project-service-1",
    "Address": "project-service",
    "Port": 5001
  }
}

Service Discovery Client:

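public class ServiceDiscoveryClient
{
    private readonly IConsulClient _consulClient;

    public ServiceDiscoveryClient(IConsulClient consulClient)
    {
        _consulClient = consulClient;
    }

    public async Task<string> GetServiceAddressAsync(string serviceName, CancellationToken ct = default)
    {
        // passingOnly: true returns only instances whose health checks pass
        var services = await _consulClient.Health.Service(serviceName, "", true, ct);

        if (!services.Response.Any())
            throw new InvalidOperationException($"No healthy instance of service '{serviceName}' found");

        // Always takes the first healthy instance; there is no load balancing across instances
        var service = services.Response.First();
        return $"http://{service.Service.Address}:{service.Service.Port}";
    }
}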

// Usage
var userServiceAddress = await _serviceDiscovery.GetServiceAddressAsync("user-service");
var grpcClient = new UserService.UserServiceClient(
    GrpcChannel.ForAddress(userServiceAddress)
);
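
`GetServiceAddressAsync` returns the first healthy instance, so all calls go to the same instance while it stays healthy. One way to spread calls is round-robin selection over the healthy list. The following is a minimal sketch of that selection step only; the instance addresses are hypothetical and the Consul query is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

var counter = -1;

// Picks the next instance in rotation. Interlocked keeps the counter safe under concurrent calls.
string PickInstance(IReadOnlyList<string> healthyInstances)
{
    if (healthyInstances.Count == 0)
        throw new InvalidOperationException("No healthy instances available.");

    // The cast to uint keeps the index non-negative after the counter overflows.
    var ticket = (uint)Interlocked.Increment(ref counter);
    return healthyInstances[(int)(ticket % (uint)healthyInstances.Count)];
}

// Hypothetical addresses standing in for the healthy entries returned by Consul.
var instances = new[] { "http://project-service-a:5001", "http://project-service-b:5001" };

var pick1 = PickInstance(instances);
var pick2 = PickInstance(instances);
var pick3 = PickInstance(instances);

Console.WriteLine(pick1); // http://project-service-a:5001
Console.WriteLine(pick2); // http://project-service-b:5001
Console.WriteLine(pick3); // http://project-service-a:5001
```

When services run on Kubernetes, the cluster DNS and Service objects already balance across pods, so client-side selection like this mainly matters for the Consul-based setup.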

7. Distributed Tracing (OpenTelemetry + Jaeger)

7.1 OpenTelemetry Configuration

// Program.cs - Each Service
builder.Services.AddOpenTelemetry()
    .WithTracing(tracing =>
    {
        tracing
            .AddAspNetCoreInstrumentation(options =>
            {
                options.RecordException = true;
                // Exclude health probes from traces; StartsWithSegments is safe when the path is empty
                options.Filter = (httpContext) =>
                    !httpContext.Request.Path.StartsWithSegments("/health");
            })
            .AddHttpClientInstrumentation()
            .AddGrpcClientInstrumentation()
            .AddEntityFrameworkCoreInstrumentation(options =>
            {
                options.SetDbStatementForText = true;
            })
            .AddSource("MassTransit")
            .AddSource("ColaFlow.ProjectService") // custom ActivitySource used in section 7.2
            .SetResourceBuilder(ResourceBuilder.CreateDefault()
                .AddService("project-service")
                .AddAttributes(new Dictionary<string, object>
                {
                    ["environment"] = "production",
                    ["version"] = "1.0.0"
                }))
            .AddJaegerExporter(options =>
            {
                options.AgentHost = "jaeger";
                options.AgentPort = 6831;
            });
    })
    .WithMetrics(metrics =>
    {
        metrics
            .AddAspNetCoreInstrumentation()
            .AddHttpClientInstrumentation()
            .AddRuntimeInstrumentation()
            .AddPrometheusExporter();
    });

7.2 Custom Instrumentation

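// Alias avoids the clash with System.Threading.Tasks.Task (namespace assumed from section 8.2)
using DomainTask = ColaFlow.ProjectService.Domain.Aggregates.ProjectAggregate.Task;

public class CreateTaskCommandHandler : IRequestHandler<CreateTaskCommand, TaskDto>
{
    // Shared per service; exported only if registered via AddSource("ColaFlow.ProjectService")
    private static readonly ActivitySource Source = new("ColaFlow.ProjectService");

    private readonly ITaskRepository _taskRepository; // repository interface name is assumed
    private readonly IMapper _mapper;

    public CreateTaskCommandHandler(ITaskRepository taskRepository, IMapper mapper)
    {
        _taskRepository = taskRepository;
        _mapper = mapper;
    }

    public async Task<TaskDto> Handle(CreateTaskCommand request, CancellationToken ct)
    {
        using var activity = Source.StartActivity("CreateTask", ActivityKind.Internal);
        activity?.SetTag("task.title", request.Title);
        activity?.SetTag("task.priority", request.Priority);

        try
        {
            // Business logic
            var task = DomainTask.Create(request.Title, request.Description);
            await _taskRepository.AddAsync(task, ct);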

            activity?.SetTag("task.id", task.Id);
            activity?.SetStatus(ActivityStatusCode.Ok);

            return _mapper.Map<TaskDto>(task);
        }
        catch (Exception ex)
        {
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            activity?.RecordException(ex);
            throw;
        }
    }
}
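
The null-conditional calls (`activity?.SetTag(...)`) in the handler matter because `StartActivity` returns `null` when nothing is listening to the source. The following is a minimal sketch using only `System.Diagnostics`, with no OpenTelemetry packages; the tag value is a placeholder. In a service, the OpenTelemetry SDK plays the role of the listener once the source name is registered.

```csharp
using System;
using System.Diagnostics;

var source = new ActivitySource("ColaFlow.ProjectService");

// No listener yet: the source creates no activity.
var before = source.StartActivity("CreateTask");
Console.WriteLine(before is null); // True

// Register a listener that samples every activity from this source.
using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "ColaFlow.ProjectService",
    Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
        ActivitySamplingResult.AllDataAndRecorded,
};
ActivitySource.AddActivityListener(listener);

string? recordedTitle;
using (var activity = source.StartActivity("CreateTask"))
{
    activity?.SetTag("task.title", "Write docs");
    recordedTitle = activity?.GetTagItem("task.title") as string;
}

Console.WriteLine(recordedTitle); // Write docs
```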

8. Project Structure

8.1 Overall Structure

product-master/
├── services/
│   ├── project-service/
│   │   ├── src/
│   │   │   ├── ColaFlow.ProjectService.Domain/
│   │   │   ├── ColaFlow.ProjectService.Application/
│   │   │   ├── ColaFlow.ProjectService.Infrastructure/
│   │   │   └── ColaFlow.ProjectService.API/
│   │   ├── tests/
│   │   ├── Dockerfile
│   │   └── ColaFlow.ProjectService.sln
│   ├── workflow-service/
│   │   ├── src/
│   │   ├── tests/
│   │   ├── Dockerfile
│   │   └── ColaFlow.WorkflowService.sln
│   ├── user-service/
│   ├── notification-service/
│   ├── audit-service/
│   └── ai-service/
├── gateway/
│   └── api-gateway/
│       ├── src/
│       │   └── ColaFlow.ApiGateway/
│       │       ├── Program.cs
│       │       ├── appsettings.json
│       │       └── Middleware/
│       ├── Dockerfile
│       └── ColaFlow.ApiGateway.sln
├── shared/
│   ├── ColaFlow.Shared.Contracts/
│   │   ├── DTOs/
│   │   ├── Events/
│   │   └── Protos/
│   │       ├── project.proto
│   │       ├── workflow.proto
│   │       └── user.proto
│   ├── ColaFlow.Shared.Messaging/
│   │   ├── MassTransit/
│   │   └── EventBus/
│   └── ColaFlow.Shared.Common/
│       ├── Extensions/
│       ├── Utilities/
│       └── Constants/
├── infrastructure/
│   ├── docker/
│   │   ├── docker-compose.microservices.yml
│   │   ├── docker-compose.infrastructure.yml
│   │   └── .env
│   ├── k8s/
│   │   ├── namespaces/
│   │   │   └── colaflow-namespace.yaml
│   │   ├── services/
│   │   │   ├── project-service.yaml
│   │   │   ├── workflow-service.yaml
│   │   │   ├── user-service.yaml
│   │   │   ├── notification-service.yaml
│   │   │   ├── audit-service.yaml
│   │   │   ├── ai-service.yaml
│   │   │   └── api-gateway.yaml
│   │   ├── deployments/
│   │   │   ├── project-service-deployment.yaml
│   │   │   ├── workflow-service-deployment.yaml
│   │   │   └── ... (one per service)
│   │   ├── configmaps/
│   │   │   └── appsettings-configmap.yaml
│   │   ├── secrets/
│   │   │   └── database-secrets.yaml
│   │   ├── ingress/
│   │   │   └── ingress.yaml
│   │   └── infrastructure/
│   │       ├── postgres-statefulset.yaml
│   │       ├── rabbitmq-deployment.yaml
│   │       ├── redis-deployment.yaml
│   │       ├── consul-deployment.yaml
│   │       └── jaeger-deployment.yaml
│   └── helm/
│       └── colaflow/
│           ├── Chart.yaml
│           ├── values.yaml
│           ├── values-dev.yaml
│           ├── values-prod.yaml
│           └── templates/
│               ├── services/
│               ├── deployments/
│               ├── configmaps/
│               ├── secrets/
│               └── ingress/
├── colaflow-web/
├── scripts/
│   ├── build-all.sh
│   ├── deploy-k8s.sh
│   └── generate-protos.sh
└── docs/
    ├── Microservices-Architecture.md
    ├── Service-Development-Guide.md
    └── Operational-Runbook.md

8.2 Service Structure (Example: Project Service)

project-service/
├── src/
│   ├── ColaFlow.ProjectService.Domain/
│   │   ├── Aggregates/
│   │   │   └── ProjectAggregate/
│   │   │       ├── Project.cs
│   │   │       ├── Epic.cs
│   │   │       ├── Story.cs
│   │   │       └── Task.cs
│   │   ├── ValueObjects/
│   │   ├── Events/
│   │   ├── Interfaces/
│   │   └── Exceptions/
│   ├── ColaFlow.ProjectService.Application/
│   │   ├── Commands/
│   │   │   ├── CreateProject/
│   │   │   ├── UpdateProject/
│   │   │   ├── CreateTask/
│   │   │   └── UpdateTaskStatus/
│   │   ├── Queries/
│   │   │   ├── GetProject/
│   │   │   ├── GetKanbanBoard/
│   │   │   └── SearchTasks/
│   │   ├── DTOs/
│   │   ├── Mappings/
│   │   └── Services/
│   │       └── Clients/
│   │           ├── UserServiceClient.cs
│   │           └── WorkflowServiceClient.cs
│   ├── ColaFlow.ProjectService.Infrastructure/
│   │   ├── Persistence/
│   │   │   ├── ProjectDbContext.cs
│   │   │   ├── Repositories/
│   │   │   └── Configurations/
│   │   ├── Messaging/
│   │   │   ├── EventPublisher.cs
│   │   │   └── Consumers/
│   │   ├── gRPC/
│   │   │   └── ProjectGrpcService.cs
│   │   └── Caching/
│   └── ColaFlow.ProjectService.API/
│       ├── Controllers/
│       │   ├── ProjectsController.cs
│       │   ├── EpicsController.cs
│       │   ├── StoriesController.cs
│       │   └── TasksController.cs
│       ├── Middleware/
│       ├── Program.cs
│       ├── appsettings.json
│       └── Dockerfile
├── tests/
│   ├── Domain.Tests/
│   ├── Application.Tests/
│   └── Integration.Tests/
└── ColaFlow.ProjectService.sln

9. Code Examples

9.1 gRPC Service Implementation

shared/ColaFlow.Shared.Contracts/Protos/project.proto:

syntax = "proto3";

package colaflow.project;

service ProjectService {
  rpc GetProject (GetProjectRequest) returns (ProjectResponse);
  rpc GetTasksByAssignee (GetTasksByAssigneeRequest) returns (TaskListResponse);
  rpc ValidateProjectExists (ValidateProjectRequest) returns (ValidationResponse);
}

message GetProjectRequest {
  string project_id = 1;
}

message ProjectResponse {
  string id = 1;
  string name = 2;
  string key = 3;
  string status = 4;
}

message GetTasksByAssigneeRequest {
  string assignee_id = 1;
  int32 page = 2;
  int32 page_size = 3;
}

message TaskListResponse {
  repeated TaskDto tasks = 1;
  int32 total_count = 2;
}

message TaskDto {
  string id = 1;
  string title = 2;
  string status = 3;
  string priority = 4;
}

message ValidateProjectRequest {
  string project_id = 1;
}

message ValidationResponse {
  bool exists = 1;
  string message = 2;
}

Server Implementation:

// ColaFlow.ProjectService.Infrastructure/gRPC/ProjectGrpcService.cs
public class ProjectGrpcService : ProjectService.ProjectServiceBase
{
    private readonly IMediator _mediator;
    private readonly IMapper _mapper;

    public ProjectGrpcService(IMediator mediator, IMapper mapper)
    {
        _mediator = mediator;
        _mapper = mapper;
    }

    public override async Task<ProjectResponse> GetProject(
        GetProjectRequest request,
        ServerCallContext context)
    {
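        // Reject malformed ids with INVALID_ARGUMENT instead of an unhandled FormatException
        if (!Guid.TryParse(request.ProjectId, out var projectId))
            throw new RpcException(new Status(StatusCode.InvalidArgument, "project_id must be a GUID"));

        var query = new GetProjectByIdQuery(projectId);
        var project = await _mediator.Send(query, context.CancellationToken);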

        return new ProjectResponse
        {
            Id = project.Id.ToString(),
            Name = project.Name,
            Key = project.Key,
            Status = project.Status.ToString()
        };
    }

    public override async Task<TaskListResponse> GetTasksByAssignee(
        GetTasksByAssigneeRequest request,
        ServerCallContext context)
    {
        var query = new GetTasksByAssigneeQuery(
            Guid.Parse(request.AssigneeId),
            request.Page,
            request.PageSize
        );

        var result = await _mediator.Send(query);

        var response = new TaskListResponse
        {
            TotalCount = result.TotalCount
        };

        response.Tasks.AddRange(result.Items.Select(task => new TaskDto
        {
            Id = task.Id.ToString(),
            Title = task.Title,
            Status = task.Status.ToString(),
            Priority = task.Priority.ToString()
        }));

        return response;
    }

    public override async Task<ValidationResponse> ValidateProjectExists(
        ValidateProjectRequest request,
        ServerCallContext context)
    {
        try
        {
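            var query = new GetProjectByIdQuery(Guid.Parse(request.ProjectId));
            // The query handler is expected to throw NotFoundException when the project is missing
            await _mediator.Send(query, context.CancellationToken);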

            return new ValidationResponse
            {
                Exists = true,
                Message = "Project exists"
            };
        }
        catch (NotFoundException)
        {
            return new ValidationResponse
            {
                Exists = false,
                Message = "Project not found"
            };
        }
    }
}

// Program.cs
builder.Services.AddGrpc();

app.MapGrpcService<ProjectGrpcService>();

Client Usage:

// Workflow Service - gRPC Client
public class ProjectServiceClient
{
    private readonly ProjectService.ProjectServiceClient _grpcClient;

    public ProjectServiceClient(ProjectService.ProjectServiceClient grpcClient)
    {
        _grpcClient = grpcClient;
    }

    public async Task<bool> ValidateProjectExistsAsync(Guid projectId)
    {
        var request = new ValidateProjectRequest
        {
            ProjectId = projectId.ToString()
        };

        var response = await _grpcClient.ValidateProjectExistsAsync(request);
        return response.Exists;
    }
}

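// Program.cs - Workflow Service
builder.Services.AddGrpcClient<ProjectService.ProjectServiceClient>((serviceProvider, options) =>
{
    // This callback is synchronous, so the address is resolved with a blocking call
    // when the client options are first built. ServiceDiscoveryClient must be registered in DI.
    var serviceDiscovery = serviceProvider.GetRequiredService<ServiceDiscoveryClient>();
    var projectServiceAddress = serviceDiscovery
        .GetServiceAddressAsync("project-service")
        .GetAwaiter()
        .GetResult();
    options.Address = new Uri(projectServiceAddress);
})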
.ConfigurePrimaryHttpMessageHandler(() =>
{
    return new SocketsHttpHandler
    {
        PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
        KeepAlivePingDelay = TimeSpan.FromSeconds(60),
        KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
        EnableMultipleHttp2Connections = true
    };
});

9.2 Saga Pattern Example (Complete)

See Section 4.1 for complete Saga implementation.

9.3 API Gateway Middleware

// Correlation ID Middleware
public class CorrelationIdMiddleware
{
    private readonly RequestDelegate _next;

    public CorrelationIdMiddleware(RequestDelegate next)
    {
        _next = next;
    }

    public async Task InvokeAsync(HttpContext context)
    {
        var correlationId = context.Request.Headers["X-Correlation-ID"].FirstOrDefault()
            ?? Guid.NewGuid().ToString();

        context.Request.Headers["X-Correlation-ID"] = correlationId;
        context.Response.Headers["X-Correlation-ID"] = correlationId;

        // Add to activity for distributed tracing
        Activity.Current?.SetTag("correlation_id", correlationId);

        await _next(context);
    }
}

// Usage
app.UseMiddleware<CorrelationIdMiddleware>();

10. Docker Compose (Local Development)

The Docker Compose configuration has not been added to this document yet.


Status: Document in progress. The Docker Compose, Kubernetes, Helm chart, and migration plan sections are still to be written.