# ColaFlow Microservices Architecture Design

- **Version:** 1.0
- **Date:** 2025-11-02
- **Status:** Production-Ready Design
- **Author:** Architecture Team
- **User Decision:** Adopt Microservices Architecture (with full awareness of costs and risks)

---

## Executive Summary
This document presents a **production-grade microservices architecture** for ColaFlow, as explicitly requested by the user. While the architecture team has previously recommended a Modular Monolith approach (see `Modular-Monolith-Architecture.md`), this document respects the user's strategic decision to adopt microservices from the start.
### Key Architectural Decisions
| Decision | Technology | Rationale |
|----------|-----------|-----------|
| **Service Communication (Sync)** | gRPC | High performance, strong contracts, native .NET support |
| **Service Communication (Async)** | RabbitMQ + MassTransit | Reliable messaging, event-driven architecture |
| **API Gateway** | YARP (.NET 9) | Native .NET, high performance, reverse proxy |
| **Service Discovery** | Consul / Kubernetes DNS | Production-ready, automatic service registration |
| **Distributed Tracing** | OpenTelemetry + Jaeger | Vendor-neutral, comprehensive observability |
| **Distributed Transactions** | Saga Pattern (MassTransit) | Orchestration-based, reliable compensation |
| **Configuration Management** | Consul / Azure App Config | Centralized, dynamic configuration |
| **Container Orchestration** | Kubernetes + Helm | Industry standard, mature ecosystem |
| **Message Format** | Protocol Buffers (gRPC) + JSON (REST) | Type-safe, efficient serialization |
| **Database per Service** | PostgreSQL (per service) | Data isolation, independent scaling |
### Cost and Risk Acknowledgment
**Development Timeline Impact:**
- Modular Monolith: 8-10 weeks to M1
- **Microservices: 16-20 weeks to M1** (+8-10 weeks, roughly double)
**Team Skill Requirements:**
- Distributed systems expertise required
- DevOps maturity critical
- Kubernetes operational knowledge
- Distributed transaction patterns (Saga)
**Operational Complexity:**
- 6+ microservices to manage
- 6+ databases to maintain
- API Gateway, Service Mesh, Message Queue
- Distributed tracing and monitoring infrastructure
**Infrastructure Cost Increase:**
- Modular Monolith: ~$500/month
- **Microservices: ~$3,000-5,000/month** (6-10x increase)
---
## 1. Microservices Architecture Overview
### 1.1 System Architecture Diagram
```mermaid
graph TB
subgraph "Client Layer"
WebUI[Web Browser<br/>Next.js 15]
MobileApp[Mobile App<br/>Future]
AITools[AI Tools<br/>ChatGPT/Claude]
end
subgraph "API Gateway Layer"
YARP[YARP API Gateway<br/>.NET 9]
end
subgraph "Service Layer"
ProjectSvc[Project Service<br/>Projects/Epics/Stories/Tasks]
WorkflowSvc[Workflow Service<br/>Workflow Engine]
UserSvc[User Service<br/>Auth & Users]
NotifSvc[Notification Service<br/>SignalR/Email]
AuditSvc[Audit Service<br/>Event Store]
AISvc[AI Service<br/>MCP Server]
end
subgraph "Infrastructure Layer"
RabbitMQ[RabbitMQ<br/>Message Bus]
Redis[Redis<br/>Cache/Session]
Consul[Consul<br/>Service Discovery]
Jaeger[Jaeger<br/>Distributed Tracing]
end
subgraph "Data Layer"
DB1[(PostgreSQL 1<br/>Projects)]
DB2[(PostgreSQL 2<br/>Workflows)]
DB3[(PostgreSQL 3<br/>Users)]
DB4[(PostgreSQL 4<br/>Notifications)]
DB5[(PostgreSQL 5<br/>Audit)]
DB6[(PostgreSQL 6<br/>AI)]
end
WebUI --> YARP
MobileApp --> YARP
AITools --> YARP
YARP --> ProjectSvc
YARP --> WorkflowSvc
YARP --> UserSvc
YARP --> NotifSvc
YARP --> AuditSvc
YARP --> AISvc
ProjectSvc --> DB1
WorkflowSvc --> DB2
UserSvc --> DB3
NotifSvc --> DB4
AuditSvc --> DB5
AISvc --> DB6
ProjectSvc -.gRPC.-> WorkflowSvc
ProjectSvc -.gRPC.-> UserSvc
WorkflowSvc -.gRPC.-> ProjectSvc
ProjectSvc --> RabbitMQ
WorkflowSvc --> RabbitMQ
NotifSvc --> RabbitMQ
AuditSvc --> RabbitMQ
ProjectSvc --> Redis
UserSvc --> Redis
ProjectSvc --> Consul
WorkflowSvc --> Consul
UserSvc --> Consul
NotifSvc --> Consul
AuditSvc --> Consul
AISvc --> Consul
ProjectSvc --> Jaeger
WorkflowSvc --> Jaeger
UserSvc --> Jaeger
```
### 1.2 Service Catalog
| Service | Port | Responsibility | Database | Key APIs |
|---------|------|---------------|----------|----------|
| **Project Service** | 5001 | Project/Epic/Story/Task management | PostgreSQL 1 | `/api/projects/*`, `/api/epics/*`, `/api/stories/*`, `/api/tasks/*`, gRPC |
| **Workflow Service** | 5002 | Workflow engine, state transitions | PostgreSQL 2 | `/api/workflows/*`, gRPC |
| **User Service** | 5003 | Authentication, authorization, users, teams | PostgreSQL 3 | `/api/users/*`, `/api/auth/*`, `/api/teams/*`, gRPC |
| **Notification Service** | 5004 | SignalR, email, push notifications | PostgreSQL 4 | `/api/notifications/*`, `/api/subscriptions/*`, SignalR |
| **Audit Service** | 5005 | Event store, audit logs, rollback | PostgreSQL 5 | `/api/audit-logs/*`, `/api/events/*` |
| **AI Service** | 5006 | MCP Server, AI task generation | PostgreSQL 6 | `/api/ai/*`, `/api/mcp/*` |
| **API Gateway** | 8080 | Routing, auth, rate limiting | - | All external routes |

---
## 2. Service Design - Bounded Contexts
### 2.1 Project Service (Core Domain)
**Bounded Context:** Project Management
**Domain Model:**
```csharp
// Project Aggregate Root
public class Project : AggregateRoot
{
public ProjectId Id { get; private set; }
public string Name { get; private set; }
public ProjectKey Key { get; private set; }
private readonly List<Epic> _epics = new();
public IReadOnlyCollection<Epic> Epics => _epics.AsReadOnly();
// Business methods
public Epic CreateEpic(string name, string description);
public void UpdateDetails(string name, string description);
}
// Epic Entity
public class Epic : Entity
{
public EpicId Id { get; private set; }
public string Name { get; private set; }
private readonly List<Story> _stories = new();
public IReadOnlyCollection<Story> Stories => _stories.AsReadOnly();
public Story CreateStory(string title, string description);
}
```
**API Endpoints (REST):**
```
GET /api/projects # List projects
POST /api/projects # Create project
GET /api/projects/{id} # Get project
PUT /api/projects/{id} # Update project
DELETE /api/projects/{id} # Delete project
GET /api/projects/{id}/epics # List epics
POST /api/projects/{id}/epics # Create epic
GET /api/epics/{id} # Get epic
PUT /api/epics/{id} # Update epic
GET /api/epics/{id}/stories # List stories
POST /api/epics/{id}/stories # Create story
GET /api/stories/{id} # Get story
PUT /api/stories/{id} # Update story
GET /api/stories/{id}/tasks # List tasks
POST /api/stories/{id}/tasks # Create task
GET /api/tasks/{id} # Get task
PUT /api/tasks/{id} # Update task
PATCH /api/tasks/{id}/status # Update task status
```
**gRPC Services:**
```protobuf
// protos/project.proto
syntax = "proto3";
package colaflow.project;
service ProjectService {
rpc GetProject (GetProjectRequest) returns (ProjectResponse);
rpc GetProjectByKey (GetProjectByKeyRequest) returns (ProjectResponse);
rpc GetTasksByAssignee (GetTasksByAssigneeRequest) returns (TaskListResponse);
rpc ValidateProjectExists (ValidateProjectRequest) returns (ValidationResponse);
}
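message GetProjectRequest {
string project_id = 1;
}
// The next three messages are referenced by the service definition above but
// were not spelled out in this design; their field names are assumptions.
message GetProjectByKeyRequest {
string key = 1;
}
message ValidateProjectRequest {
string project_id = 1;
}
message ValidationResponse {
bool is_valid = 1;
string message = 2;
}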
message ProjectResponse {
string id = 1;
string name = 2;
string key = 3;
string status = 4;
string owner_id = 5;
}
message GetTasksByAssigneeRequest {
string assignee_id = 1;
int32 page = 2;
int32 page_size = 3;
}
message TaskListResponse {
repeated TaskDto tasks = 1;
int32 total_count = 2;
}
message TaskDto {
string id = 1;
string title = 2;
string status = 3;
string priority = 4;
string project_id = 5;
}
```
**Published Events:**
```csharp
public record ProjectCreatedEvent(Guid ProjectId, string ProjectName, Guid OwnerId);
public record TaskStatusChangedEvent(Guid TaskId, string OldStatus, string NewStatus, Guid ChangedBy);
public record TaskAssignedEvent(Guid TaskId, Guid AssigneeId, Guid AssignedBy);
public record EpicCreatedEvent(Guid EpicId, string EpicName, Guid ProjectId);
```
**Database Schema:**
```sql
-- Projects Table
CREATE TABLE projects (
id UUID PRIMARY KEY,
name VARCHAR(200) NOT NULL,
key VARCHAR(10) NOT NULL UNIQUE,
description TEXT,
status VARCHAR(50) NOT NULL,
owner_id UUID NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP
);
-- Epics Table
CREATE TABLE epics (
id UUID PRIMARY KEY,
project_id UUID NOT NULL REFERENCES projects(id),
name VARCHAR(200) NOT NULL,
description TEXT,
status VARCHAR(50) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Stories Table
CREATE TABLE stories (
id UUID PRIMARY KEY,
epic_id UUID NOT NULL REFERENCES epics(id),
title VARCHAR(200) NOT NULL,
description TEXT,
status VARCHAR(50) NOT NULL,
priority VARCHAR(50) NOT NULL,
assignee_id UUID,
estimated_hours DECIMAL(10,2),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Tasks Table
CREATE TABLE tasks (
id UUID PRIMARY KEY,
story_id UUID NOT NULL REFERENCES stories(id),
title VARCHAR(200) NOT NULL,
description TEXT,
status VARCHAR(50) NOT NULL,
priority VARCHAR(50) NOT NULL,
assignee_id UUID,
estimated_hours DECIMAL(10,2),
actual_hours DECIMAL(10,2),
custom_fields JSONB,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Indexes
-- (projects.key needs no separate index: its UNIQUE constraint already creates one)
CREATE INDEX idx_epics_project_id ON epics(project_id);
CREATE INDEX idx_stories_epic_id ON stories(epic_id);
CREATE INDEX idx_stories_assignee_id ON stories(assignee_id);
CREATE INDEX idx_tasks_story_id ON tasks(story_id);
CREATE INDEX idx_tasks_assignee_id ON tasks(assignee_id);
CREATE INDEX idx_tasks_status ON tasks(status);
```
---
### 2.2 Workflow Service
**Bounded Context:** Workflow Engine
**Domain Model:**
```csharp
public class Workflow : AggregateRoot
{
public WorkflowId Id { get; private set; }
public string Name { get; private set; }
public Guid ProjectId { get; private set; }
public bool IsDefault { get; private set; }
private readonly List<WorkflowState> _states = new();
public IReadOnlyCollection<WorkflowState> States => _states.AsReadOnly();
public void AddState(string stateName);
public void AddTransition(string fromState, string toState);
public bool CanTransition(string currentState, string targetState);
}
public class WorkflowState : Entity
{
public string Name { get; private set; }
public StateCategory Category { get; private set; } // ToDo, InProgress, Done
private readonly List<StateTransition> _transitions = new();
public IReadOnlyCollection<StateTransition> Transitions => _transitions.AsReadOnly();
}
```
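The `CanTransition` signature above is shown without a body. The following is a minimal sketch of one way it could work, assuming transitions are kept as a set of (from, to) state-name pairs. The `WorkflowSketch` class and the case-insensitive matching are illustrative assumptions, not part of the ColaFlow domain model.

```csharp
using System;
using System.Collections.Generic;

// Usage: a three-state workflow with no direct ToDo -> Done shortcut.
var workflow = new WorkflowSketch();
workflow.AddState("ToDo");
workflow.AddState("InProgress");
workflow.AddState("Done");
workflow.AddTransition("ToDo", "InProgress");
workflow.AddTransition("InProgress", "Done");

Console.WriteLine(workflow.CanTransition("ToDo", "InProgress")); // True
Console.WriteLine(workflow.CanTransition("ToDo", "Done"));       // False

// Hypothetical stand-in for the Workflow aggregate (illustration only).
public sealed class WorkflowSketch
{
    private readonly HashSet<string> _states = new(StringComparer.OrdinalIgnoreCase);
    private readonly HashSet<(string From, string To)> _transitions = new();

    public void AddState(string stateName)
    {
        if (string.IsNullOrWhiteSpace(stateName))
            throw new ArgumentException("State name is required.", nameof(stateName));
        _states.Add(stateName);
    }

    public void AddTransition(string fromState, string toState)
    {
        // Both endpoints must already be known states.
        if (!_states.Contains(fromState) || !_states.Contains(toState))
            throw new InvalidOperationException("Both states must be added before linking them.");
        _transitions.Add(Normalize(fromState, toState));
    }

    public bool CanTransition(string currentState, string targetState) =>
        _transitions.Contains(Normalize(currentState, targetState));

    // String tuples compare ordinally, so normalise case before storing or looking up.
    private static (string, string) Normalize(string from, string to) =>
        (from.ToUpperInvariant(), to.ToUpperInvariant());
}
```

Keeping the pairs in a set makes the check a single lookup, which matters if the Project Service calls `ValidateTransition` over gRPC on every status change.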
**API Endpoints:**
```
GET /api/workflows # List all workflows
POST /api/workflows # Create workflow
GET /api/workflows/{id} # Get workflow
PUT /api/workflows/{id} # Update workflow
DELETE /api/workflows/{id} # Delete workflow
GET /api/workflows/project/{projectId} # Get workflows for project
POST /api/workflows/{id}/validate # Validate transition
```
**gRPC Services:**
```protobuf
// protos/workflow.proto
syntax = "proto3";
package colaflow.workflow;
service WorkflowService {
rpc GetWorkflowByProject (GetWorkflowByProjectRequest) returns (WorkflowResponse);
rpc ValidateTransition (ValidateTransitionRequest) returns (ValidationResponse);
rpc GetAvailableTransitions (GetAvailableTransitionsRequest) returns (TransitionsResponse);
}
message GetWorkflowByProjectRequest {
string project_id = 1;
}
message WorkflowResponse {
string id = 1;
string name = 2;
repeated WorkflowState states = 3;
}
message WorkflowState {
string name = 1;
string category = 2;
repeated string allowed_transitions = 3;
}
message ValidateTransitionRequest {
string workflow_id = 1;
string current_state = 2;
string target_state = 3;
}
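message ValidationResponse {
bool is_valid = 1;
string message = 2;
}
// Referenced by GetAvailableTransitions above but not spelled out in this
// design; the field names below are assumptions.
message GetAvailableTransitionsRequest {
string workflow_id = 1;
string current_state = 2;
}
message TransitionsResponse {
repeated string target_states = 1;
}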
```
**Published Events:**
```csharp
public record WorkflowCreatedEvent(Guid WorkflowId, Guid ProjectId);
public record StateTransitionValidatedEvent(Guid WorkflowId, string FromState, string ToState);
```
---
### 2.3 User Service
**Bounded Context:** User Management & Authentication
**Domain Model:**
```csharp
public class User : AggregateRoot
{
public UserId Id { get; private set; }
public Email Email { get; private set; }
public string FirstName { get; private set; }
public string LastName { get; private set; }
public string PasswordHash { get; private set; }
public UserRole Role { get; private set; }
public static User Create(string email, string firstName, string lastName, string password);
public void UpdateProfile(string firstName, string lastName);
public void ChangePassword(string currentPassword, string newPassword);
}
public class Team : AggregateRoot
{
public TeamId Id { get; private set; }
public string Name { get; private set; }
private readonly List<TeamMember> _members = new();
public IReadOnlyCollection<TeamMember> Members => _members.AsReadOnly();
public void AddMember(UserId userId, TeamRole role);
public void RemoveMember(UserId userId);
}
```
**API Endpoints:**
```
POST /api/auth/login # Login
POST /api/auth/register # Register
POST /api/auth/refresh # Refresh token
POST /api/auth/logout # Logout
GET /api/users # List users
GET /api/users/{id} # Get user
PUT /api/users/{id} # Update user
GET /api/users/me # Get current user
GET /api/teams # List teams
POST /api/teams # Create team
GET /api/teams/{id} # Get team
PUT /api/teams/{id} # Update team
POST /api/teams/{id}/members # Add team member
DELETE /api/teams/{id}/members/{userId} # Remove team member
```
**gRPC Services:**
```protobuf
// protos/user.proto
syntax = "proto3";
package colaflow.user;
service UserService {
rpc GetUser (GetUserRequest) returns (UserResponse);
rpc GetUsersByIds (GetUsersByIdsRequest) returns (UsersResponse);
rpc ValidateToken (ValidateTokenRequest) returns (TokenValidationResponse);
rpc GetUserPermissions (GetUserPermissionsRequest) returns (PermissionsResponse);
}
message GetUserRequest {
string user_id = 1;
}
message UserResponse {
string id = 1;
string email = 2;
string first_name = 3;
string last_name = 4;
string role = 5;
}
message GetUsersByIdsRequest {
repeated string user_ids = 1;
}
message UsersResponse {
repeated UserResponse users = 1;
}
message ValidateTokenRequest {
string token = 1;
}
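message TokenValidationResponse {
bool is_valid = 1;
string user_id = 2;
string role = 3;
}
// Referenced by GetUserPermissions above but not spelled out in this design;
// the field names below are assumptions.
message GetUserPermissionsRequest {
string user_id = 1;
}
message PermissionsResponse {
repeated string permissions = 1;
}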
```
**Published Events:**
```csharp
public record UserRegisteredEvent(Guid UserId, string Email, string FullName);
public record UserProfileUpdatedEvent(Guid UserId, string FirstName, string LastName);
public record TeamCreatedEvent(Guid TeamId, string TeamName);
public record TeamMemberAddedEvent(Guid TeamId, Guid UserId, string Role);
```
---
### 2.4 Notification Service
**Bounded Context:** Notifications & Real-time Communication
**Domain Model:**
```csharp
public class Notification : AggregateRoot
{
public NotificationId Id { get; private set; }
public Guid RecipientId { get; private set; }
public string Title { get; private set; }
public string Message { get; private set; }
public NotificationType Type { get; private set; }
public bool IsRead { get; private set; }
public DateTime CreatedAt { get; private set; }
public void MarkAsRead();
}
public class NotificationSubscription : Entity
{
public Guid UserId { get; private set; }
public NotificationChannel Channel { get; private set; } // Email, SignalR, Push
public string Endpoint { get; private set; }
public bool IsActive { get; private set; }
}
```
**API Endpoints:**
```
GET /api/notifications # List notifications
POST /api/notifications # Create notification (internal)
PATCH /api/notifications/{id}/read # Mark as read
DELETE /api/notifications/{id} # Delete notification
GET /api/subscriptions # List subscriptions
POST /api/subscriptions # Create subscription
DELETE /api/subscriptions/{id} # Delete subscription
```
**SignalR Hub:**
```csharp
public class NotificationHub : Hub
{
public async Task JoinProject(string projectId)
{
await Groups.AddToGroupAsync(Context.ConnectionId, $"project_{projectId}");
}
public async Task LeaveProject(string projectId)
{
await Groups.RemoveFromGroupAsync(Context.ConnectionId, $"project_{projectId}");
}
}
// Server-side push
await _hubContext.Clients.Group($"project_{projectId}").SendAsync("TaskUpdated", taskDto);
```
**Consumed Events:**
```csharp
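// Listens to events from other services
public class TaskAssignedEventConsumer : IConsumer<TaskAssignedEvent>
{
    private readonly INotificationRepository _notificationRepository;
    private readonly IHubContext<NotificationHub> _hubContext;

    public TaskAssignedEventConsumer(
        INotificationRepository notificationRepository,
        IHubContext<NotificationHub> hubContext)
    {
        _notificationRepository = notificationRepository;
        _hubContext = hubContext;
    }

    public async Task Consume(ConsumeContext<TaskAssignedEvent> context)
    {
        // Persist the notification first so it survives a failed push.
        var notification = Notification.Create(
            context.Message.AssigneeId,
            "Task Assigned",
            $"You have been assigned to task: {context.Message.TaskId}"
        );
        await _notificationRepository.AddAsync(notification);

        // Push to the assignee's active SignalR connections.
        await _hubContext.Clients.User(context.Message.AssigneeId.ToString())
            .SendAsync("NotificationReceived", notification);
    }
}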
```
---
### 2.5 Audit Service
**Bounded Context:** Audit Logging & Event Store
**Domain Model:**
```csharp
public class AuditLog : Entity
{
public long Id { get; private set; }
public string EntityType { get; private set; }
public Guid EntityId { get; private set; }
public string Action { get; private set; }
public string Changes { get; private set; } // JSON
public Guid UserId { get; private set; }
public DateTime Timestamp { get; private set; }
public string IpAddress { get; private set; }
}
public class DomainEventRecord : Entity
{
public long Id { get; private set; }
public string EventType { get; private set; }
public Guid AggregateId { get; private set; }
public string EventData { get; private set; } // JSON
public DateTime OccurredOn { get; private set; }
public DateTime? ProcessedOn { get; private set; }
}
```
**API Endpoints:**
```
GET /api/audit-logs # List audit logs
GET /api/audit-logs/{entityType}/{entityId} # Get entity audit logs
POST /api/audit-logs/{id}/rollback # Rollback changes
GET /api/events # List domain events
GET /api/events/{aggregateId} # Get aggregate events
```
**Consumed Events:**
```csharp
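// Listens to ALL domain events from all services.
// Caveat: this assumes every event derives from DomainEvent. The message is
// consumed as DomainEvent, so GetType().Name and Serialize() below see only the
// base type; recording the concrete event name and payload needs extra handling.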
public class UniversalEventConsumer : IConsumer<DomainEvent>
{
public async Task Consume(ConsumeContext<DomainEvent> context)
{
var eventRecord = new DomainEventRecord
{
EventType = context.Message.GetType().Name,
AggregateId = context.Message.AggregateId,
EventData = JsonSerializer.Serialize(context.Message),
OccurredOn = context.Message.OccurredOn
};
await _eventStoreRepository.AddAsync(eventRecord);
}
}
```
---
### 2.6 AI Service (MCP Server)
**Bounded Context:** AI Integration & MCP Protocol
**Domain Model:**
```csharp
public class AITask : AggregateRoot
{
public AITaskId Id { get; private set; }
public string Prompt { get; private set; }
public string Response { get; private set; }
public AITaskStatus Status { get; private set; }
public Guid CreatedBy { get; private set; }
public DateTime CreatedAt { get; private set; }
public void Complete(string response);
public void Fail(string errorMessage);
}
public class MCPResource : Entity
{
public string ResourceId { get; private set; }
public string Type { get; private set; } // projects.search, issues.search
public string Schema { get; private set; } // JSON Schema
}
```
**API Endpoints:**
```
POST /api/ai/tasks # Create AI task
GET /api/ai/tasks/{id} # Get AI task
GET /api/ai/tasks # List AI tasks
GET /api/mcp/resources # List MCP resources
GET /api/mcp/resources/{resourceId} # Get resource data
POST /api/mcp/tools/{toolName} # Execute MCP tool
GET /api/mcp/tools # List MCP tools
```
**MCP Resources:**
```json
{
"resources": [
{
"uri": "colaflow://projects.search",
"name": "Search Projects",
"description": "Search and list projects",
"mimeType": "application/json"
},
{
"uri": "colaflow://issues.search",
"name": "Search Issues",
"description": "Search tasks and issues",
"mimeType": "application/json"
}
]
}
```
**MCP Tools:**
```json
{
"tools": [
{
"name": "create_task",
"description": "Create a new task",
"inputSchema": {
"type": "object",
"properties": {
"title": { "type": "string" },
"description": { "type": "string" },
"priority": { "type": "string", "enum": ["Low", "Medium", "High", "Urgent"] }
},
"required": ["title"]
}
},
{
"name": "update_task_status",
"description": "Update task status with diff preview",
"inputSchema": {
"type": "object",
"properties": {
"task_id": { "type": "string" },
"new_status": { "type": "string" }
},
"required": ["task_id", "new_status"]
}
}
]
}
```
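Before executing a tool call, the server has to check the arguments against the tool's `inputSchema`. The sketch below hand-checks the two constraints that `create_task` declares (the required `title` and the `priority` enum) using `System.Text.Json`. It is not a general JSON Schema validator, and the `CreateTaskInput` helper is a hypothetical name rather than part of any MCP SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Usage: one valid call, and one that omits title and uses an unknown priority.
Console.WriteLine(string.Join("; ", CreateTaskInput.Validate("""{"title":"Fix login","priority":"High"}""")));
Console.WriteLine(string.Join("; ", CreateTaskInput.Validate("""{"priority":"Critical"}""")));

// Hypothetical helper for illustration only.
public static class CreateTaskInput
{
    private static readonly string[] AllowedPriorities = { "Low", "Medium", "High", "Urgent" };

    // Returns a list of problems; an empty list means the input is acceptable.
    public static List<string> Validate(string json)
    {
        var errors = new List<string>();
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;

        if (root.ValueKind != JsonValueKind.Object)
        {
            errors.Add("input must be a JSON object");
            return errors;
        }

        // "required": ["title"]
        if (!root.TryGetProperty("title", out var title) || title.ValueKind != JsonValueKind.String)
            errors.Add("title is required and must be a string");

        // "priority" is optional, but when present it must be one of the enum values.
        if (root.TryGetProperty("priority", out var priority))
        {
            if (priority.ValueKind != JsonValueKind.String ||
                Array.IndexOf(AllowedPriorities, priority.GetString()) < 0)
                errors.Add("priority must be one of: " + string.Join(", ", AllowedPriorities));
        }

        return errors;
    }
}
```

Returning all problems at once, rather than throwing on the first, gives an AI client enough detail to correct its call in a single retry.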
---
## 3. Service Communication Patterns
### 3.1 Synchronous Communication (gRPC)
**When to use gRPC:**
- Real-time queries (e.g., "Get User by ID")
- Validation requests (e.g., "Check if project exists")
- Low-latency requirements
**Example: Project Service → User Service**
```csharp
// Project Service - gRPC Client
// Requires: using Grpc.Core; (RpcException, StatusCode)
public class UserServiceClient
{
    private readonly UserService.UserServiceClient _grpcClient;

    public UserServiceClient(UserService.UserServiceClient grpcClient)
    {
        _grpcClient = grpcClient;
    }

    // Returns null when the User Service answers with StatusCode.NotFound
    // (assumed server behaviour); any other gRPC failure propagates to the caller.
    public async Task<UserDto?> GetUserAsync(Guid userId, CancellationToken ct = default)
    {
        var request = new GetUserRequest { UserId = userId.ToString() };
        try
        {
            var response = await _grpcClient.GetUserAsync(request, cancellationToken: ct);
            return new UserDto
            {
                Id = Guid.Parse(response.Id),
                Email = response.Email,
                FirstName = response.FirstName,
                LastName = response.LastName
            };
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound)
        {
            return null;
        }
    }
}

// Used in Command Handler
public class AssignTaskCommandHandler : IRequestHandler<AssignTaskCommand, TaskDto>
{
    private readonly ITaskRepository _taskRepository;
    private readonly UserServiceClient _userServiceClient;
    private readonly IUnitOfWork _unitOfWork; // interface name assumed
    private readonly IMapper _mapper;         // e.g. AutoMapper (assumed)
    // Constructor injection omitted for brevity.

    public async Task<TaskDto> Handle(AssignTaskCommand request, CancellationToken ct)
    {
        // Validate user exists via gRPC
        var user = await _userServiceClient.GetUserAsync(request.AssigneeId, ct);
        if (user == null)
            throw new NotFoundException("User not found");

        // Assign task
        var task = await _taskRepository.GetByIdAsync(request.TaskId);
        task.AssignTo(request.AssigneeId);
        await _unitOfWork.CommitAsync(ct);
        return _mapper.Map<TaskDto>(task);
    }
}
```
**gRPC Client Registration:**
```csharp
// Program.cs
builder.Services.AddGrpcClient<UserService.UserServiceClient>(options =>
{
options.Address = new Uri("https://user-service:5003");
})
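.ConfigurePrimaryHttpMessageHandler(() =>
{
// DEVELOPMENT ONLY: this handler disables TLS certificate validation.
// In production, remove it and have the services present trusted certificates.
return new HttpClientHandler
{
ServerCertificateCustomValidationCallback =
HttpClientHandler.DangerousAcceptAnyServerCertificateValidator
};
});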
```
### 3.2 Asynchronous Communication (RabbitMQ + MassTransit)
**When to use Async Messaging:**
- Event notifications (e.g., "Task Created")
- Cross-service workflows (e.g., Saga orchestration)
- Decoupled communication
**Example: Task Created Event Flow**
```csharp
// Project Service - Publisher
public class CreateTaskCommandHandler : IRequestHandler<CreateTaskCommand, TaskDto>
{
private readonly IPublishEndpoint _publishEndpoint;
public async Task<TaskDto> Handle(CreateTaskCommand request, CancellationToken ct)
{
// Create task
var task = Task.Create(request.Title, request.Description);
await _taskRepository.AddAsync(task, ct);
await _unitOfWork.CommitAsync(ct);
// Publish event
await _publishEndpoint.Publish(new TaskCreatedEvent
{
TaskId = task.Id,
Title = task.Title,
AssigneeId = task.AssigneeId,
ProjectId = task.ProjectId
}, ct);
return _mapper.Map<TaskDto>(task);
}
}
// Notification Service - Consumer
public class TaskCreatedEventConsumer : IConsumer<TaskCreatedEvent>
{
private readonly INotificationRepository _notificationRepository;
private readonly IHubContext<NotificationHub> _hubContext;
public async Task Consume(ConsumeContext<TaskCreatedEvent> context)
{
var evt = context.Message;
// Create notification
var notification = Notification.Create(
evt.AssigneeId,
"New Task Assigned",
$"You have been assigned to task: {evt.Title}"
);
await _notificationRepository.AddAsync(notification);
// Send SignalR notification
await _hubContext.Clients.User(evt.AssigneeId.ToString())
.SendAsync("TaskCreated", new { evt.TaskId, evt.Title });
}
}
// Audit Service - Consumer
public class TaskCreatedEventConsumer : IConsumer<TaskCreatedEvent>
{
private readonly IEventStoreRepository _eventStoreRepository;
public async Task Consume(ConsumeContext<TaskCreatedEvent> context)
{
// Store event in event store
var eventRecord = new DomainEventRecord
{
EventType = nameof(TaskCreatedEvent),
AggregateId = context.Message.TaskId,
EventData = JsonSerializer.Serialize(context.Message),
OccurredOn = DateTime.UtcNow
};
await _eventStoreRepository.AddAsync(eventRecord);
}
}
```
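The fan-out in this example (one published event, two independent consumers) can be reproduced without a broker. The sketch below is a toy in-process bus, assuming nothing beyond the .NET base class library. `InMemoryBus` and the `TaskCreated` record are hypothetical names for illustration; the sketch has none of the persistence, retry, or routing behaviour that RabbitMQ and MassTransit provide.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var bus = new InMemoryBus();
var log = new List<string>();

// Two independent subscribers, standing in for the Notification and Audit services.
bus.Subscribe<TaskCreated>(evt => { log.Add($"notify:{evt.Title}"); return Task.CompletedTask; });
bus.Subscribe<TaskCreated>(evt => { log.Add($"audit:{evt.Title}"); return Task.CompletedTask; });

// The publisher knows nothing about who is listening.
await bus.PublishAsync(new TaskCreated(Guid.NewGuid(), "Write docs"));
Console.WriteLine(string.Join(", ", log)); // notify:Write docs, audit:Write docs

// Hypothetical event shape for this sketch only.
public record TaskCreated(Guid TaskId, string Title);

// Toy in-process bus: no broker, no persistence, no retries.
public sealed class InMemoryBus
{
    private readonly Dictionary<Type, List<Func<object, Task>>> _handlers = new();

    public void Subscribe<T>(Func<T, Task> handler)
    {
        if (!_handlers.TryGetValue(typeof(T), out var list))
            _handlers[typeof(T)] = list = new List<Func<object, Task>>();
        list.Add(message => handler((T)message));
    }

    public async Task PublishAsync<T>(T message) where T : notnull
    {
        if (!_handlers.TryGetValue(typeof(T), out var list)) return;
        foreach (var handler in list)
            await handler(message); // sequential here; a real broker delivers to each queue independently
    }
}
```

Adding a third consumer requires no change to the publisher, which is the decoupling that motivates asynchronous messaging here.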
**MassTransit Configuration:**
```csharp
// Program.cs
builder.Services.AddMassTransit(config =>
{
// Register consumers
config.AddConsumer<TaskCreatedEventConsumer>();
config.AddConsumer<TaskAssignedEventConsumer>();
config.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq://rabbitmq:5672", h =>
{
h.Username("guest");
h.Password("guest");
});
// Configure endpoints
cfg.ReceiveEndpoint("notification-service", e =>
{
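e.ConfigureConsumer<TaskCreatedEventConsumer>(context);
e.ConfigureConsumer<TaskAssignedEventConsumer>(context); // registered above, so bind it to the endpoint too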
});
});
});
```
---
## 4. Distributed Transactions - Saga Pattern
### 4.1 Saga Orchestration with MassTransit
**Use Case: Create Project with Default Workflow**
**Requirements:**
1. Project Service: Create project
2. Workflow Service: Create default workflow
3. Notification Service: Send notification
4. If any step fails → compensate (rollback)
**Saga State Machine:**
```csharp
// Saga State
public class CreateProjectSagaState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
// Saga data
public Guid ProjectId { get; set; }
public string ProjectName { get; set; }
public Guid OwnerId { get; set; }
public Guid? WorkflowId { get; set; }
// Timestamps
public DateTime CreatedAt { get; set; }
public DateTime? CompletedAt { get; set; }
}
// Saga Definition
public class CreateProjectSaga : MassTransitStateMachine<CreateProjectSagaState>
{
public State CreatingProject { get; private set; }
public State CreatingWorkflow { get; private set; }
public State SendingNotification { get; private set; }
public State Completed { get; private set; }
public State Failed { get; private set; }
// Events
public Event<CreateProjectCommand> CreateProject { get; private set; }
public Event<ProjectCreated> ProjectCreated { get; private set; }
public Event<CreateWorkflowCommand> CreateWorkflow { get; private set; }
public Event<WorkflowCreated> WorkflowCreated { get; private set; }
public Event<ProjectCreationFailed> ProjectFailed { get; private set; }
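public Event<WorkflowCreationFailed> WorkflowFailed { get; private set; }
public Event<NotificationSent> NotificationSent { get; private set; } // used in step 4; message type name assumed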
public CreateProjectSaga()
{
InstanceState(x => x.CurrentState);
// Step 1: Create Project
Initially(
When(CreateProject)
.Then(context =>
{
context.Saga.ProjectName = context.Message.Name;
context.Saga.OwnerId = context.Message.OwnerId;
context.Saga.CreatedAt = DateTime.UtcNow;
})
.TransitionTo(CreatingProject)
.Publish(context => new CreateProjectInternalCommand
{
CorrelationId = context.Saga.CorrelationId,
Name = context.Message.Name,
Description = context.Message.Description,
Key = context.Message.Key,
OwnerId = context.Message.OwnerId
})
);
// Step 2: Project Created → Create Workflow
During(CreatingProject,
When(ProjectCreated)
.Then(context =>
{
context.Saga.ProjectId = context.Message.ProjectId;
})
.TransitionTo(CreatingWorkflow)
.PublishAsync(context => context.Init<CreateWorkflowCommand>(new
{
CorrelationId = context.Saga.CorrelationId,
ProjectId = context.Message.ProjectId,
Name = $"{context.Saga.ProjectName} Workflow"
}))
);
// Step 3: Workflow Created → Send Notification
During(CreatingWorkflow,
When(WorkflowCreated)
.Then(context =>
{
context.Saga.WorkflowId = context.Message.WorkflowId;
})
.TransitionTo(SendingNotification)
.PublishAsync(context => context.Init<SendNotificationCommand>(new
{
RecipientId = context.Saga.OwnerId,
Title = "Project Created",
Message = $"Project '{context.Saga.ProjectName}' has been created successfully."
}))
);
// Step 4: Notification Sent → Complete
During(SendingNotification,
When(NotificationSent)
.Then(context =>
{
context.Saga.CompletedAt = DateTime.UtcNow;
})
.TransitionTo(Completed)
.Finalize()
);
// Compensation: Project Creation Failed
During(CreatingProject,
When(ProjectFailed)
.Then(context =>
{
// Log failure
Console.WriteLine($"Project creation failed: {context.Message.Reason}");
})
.TransitionTo(Failed)
.Finalize()
);
// Compensation: Workflow Creation Failed → Delete Project
During(CreatingWorkflow,
When(WorkflowFailed)
.Then(context =>
{
Console.WriteLine($"Workflow creation failed: {context.Message.Reason}");
})
.PublishAsync(context => context.Init<DeleteProjectCommand>(new
{
ProjectId = context.Saga.ProjectId,
Reason = "Workflow creation failed"
}))
.TransitionTo(Failed)
.Finalize()
);
SetCompletedWhenFinalized();
}
}
// Saga Registration
builder.Services.AddMassTransit(config =>
{
config.AddSagaStateMachine<CreateProjectSaga, CreateProjectSagaState>()
.EntityFrameworkRepository(r =>
{
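r.ConcurrencyMode = ConcurrencyMode.Pessimistic;
// Pessimistic mode issues a row-lock query; with PostgreSQL, also configure the
// Postgres lock statement provider described in the MassTransit EF Core saga docs.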
r.AddDbContext<DbContext, SagaDbContext>((provider, builder) =>
{
builder.UseNpgsql(connectionString);
});
});
config.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq://rabbitmq:5672");
cfg.ConfigureEndpoints(context);
});
});
```
**Saga Database Table:**
```sql
CREATE TABLE create_project_saga_state (
correlation_id UUID PRIMARY KEY,
current_state VARCHAR(100) NOT NULL,
project_id UUID,
project_name VARCHAR(200),
owner_id UUID,
workflow_id UUID,
created_at TIMESTAMP NOT NULL,
completed_at TIMESTAMP
);
```
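Stripped of messaging, the saga's control flow reduces to "run steps in order, and on failure undo the completed ones in reverse". The sketch below shows only that core, assuming in-process delegates in place of commands and events. `SagaStep` and `SagaRunner` are hypothetical names, and the sketch does not handle a compensation that itself fails.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var trace = new List<string>();

var steps = new List<SagaStep>
{
    new("CreateProject",
        () => { trace.Add("project created"); return Task.CompletedTask; },
        () => { trace.Add("project deleted"); return Task.CompletedTask; }),
    new("CreateWorkflow",
        () => throw new InvalidOperationException("workflow service unavailable"),
        () => Task.CompletedTask),
    new("SendNotification",
        () => { trace.Add("notification sent"); return Task.CompletedTask; },
        () => Task.CompletedTask),
};

var succeeded = await SagaRunner.RunAsync(steps);
Console.WriteLine($"succeeded={succeeded}; trace={string.Join(" -> ", trace)}");
// succeeded=False; trace=project created -> project deleted

// One forward action plus the action that undoes it.
public record SagaStep(string Name, Func<Task> Execute, Func<Task> Compensate);

public static class SagaRunner
{
    // Runs steps in order; on the first failure, compensates the steps that
    // already completed, newest first, and reports failure.
    public static async Task<bool> RunAsync(IEnumerable<SagaStep> steps)
    {
        var completed = new Stack<SagaStep>();
        foreach (var step in steps)
        {
            try
            {
                await step.Execute();
                completed.Push(step);
            }
            catch (Exception)
            {
                while (completed.Count > 0)
                    await completed.Pop().Compensate();
                return false;
            }
        }
        return true;
    }
}
```

In the MassTransit version each step and compensation is a message handled by another service, and the persisted saga state plays the role of the `completed` stack across process restarts.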
### 4.2 Outbox Pattern (Reliable Messaging)
**Problem:** Ensure domain events are published even if RabbitMQ is down.
**Solution:** Store events in database, then publish asynchronously.
```csharp
// Outbox Message Entity
public class OutboxMessage
{
public Guid Id { get; set; }
public string Type { get; set; }
public string Content { get; set; } // JSON
public DateTime OccurredOn { get; set; }
public DateTime? ProcessedOn { get; set; }
public string Error { get; set; }
}
// Save to Outbox in same transaction
public async Task<int> CommitAsync(CancellationToken cancellationToken = default)
{
var domainEvents = ChangeTracker
.Entries<AggregateRoot>()
.SelectMany(x => x.Entity.DomainEvents)
.ToList();
// Store events in outbox
foreach (var domainEvent in domainEvents)
{
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
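Type = domainEvent.GetType().AssemblyQualifiedName!, // must be resolvable by Type.GetType later
Content = JsonSerializer.Serialize(domainEvent, domainEvent.GetType()), // include derived-type properties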
OccurredOn = DateTime.UtcNow
};
OutboxMessages.Add(outboxMessage);
}
// Save changes (domain entities + outbox messages in same transaction)
var result = await base.SaveChangesAsync(cancellationToken);
return result;
}
// Background Service: Process Outbox
public class OutboxProcessor : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
using var scope = _serviceProvider.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var publishEndpoint = scope.ServiceProvider.GetRequiredService<IPublishEndpoint>();
// Get unprocessed messages
var messages = await dbContext.OutboxMessages
.Where(m => m.ProcessedOn == null)
.OrderBy(m => m.OccurredOn)
.Take(100)
.ToListAsync(stoppingToken);
foreach (var message in messages)
{
try
{
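// Deserialize and publish using the concrete event type
var eventType = Type.GetType(message.Type)
?? throw new InvalidOperationException($"Unknown event type '{message.Type}'");
var domainEvent = JsonSerializer.Deserialize(message.Content, eventType)!;
await publishEndpoint.Publish(domainEvent, eventType, stoppingToken);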
// Mark as processed
message.ProcessedOn = DateTime.UtcNow;
}
catch (Exception ex)
{
message.Error = ex.Message;
}
}
await dbContext.SaveChangesAsync(stoppingToken);
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
}
```
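The outbox only works if an event survives the trip through the `Type` and `Content` columns. The sketch below walks through that round trip with `System.Text.Json`, assuming events are records deriving from a common base. `DomainEventBase` and `TaskAssigned` are hypothetical types for illustration. It shows why the stored type name must be one that `Type.GetType` can resolve, and why serialization should use the runtime type.

```csharp
using System;
using System.Text.Json;

DomainEventBase original = new TaskAssigned(Guid.NewGuid(), Guid.NewGuid());

// Store: record a resolvable type name and serialize using the runtime type,
// otherwise only the base-class properties would be written.
string typeName = original.GetType().AssemblyQualifiedName!;
string content = JsonSerializer.Serialize(original, original.GetType());

// Relay: resolve the type again and rebuild the concrete event.
Type? resolved = Type.GetType(typeName);
if (resolved is null)
    throw new InvalidOperationException($"Unknown event type: {typeName}");
var restored = (DomainEventBase)JsonSerializer.Deserialize(content, resolved)!;

Console.WriteLine(restored.GetType().Name); // TaskAssigned

// Hypothetical event types for this sketch only.
public abstract record DomainEventBase
{
    public DateTime OccurredOn { get; init; } = DateTime.UtcNow;
}

public sealed record TaskAssigned(Guid TaskId, Guid AssigneeId) : DomainEventBase;
```

A short type name such as `"TaskAssigned"` generally cannot be resolved by `Type.GetType` once event types live in a namespace or a separate assembly, so an explicit name-to-type registry is a reasonable alternative to assembly-qualified names.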
---
## 5. API Gateway (YARP)
### 5.1 YARP Configuration
**Why YARP:**
- Native .NET 9 support
- High performance reverse proxy
- Dynamic configuration
- Built-in load balancing
- Request/response transformation
**appsettings.json:**
```json
{
"ReverseProxy": {
"Routes": {
"project-route": {
"ClusterId": "project-cluster",
"AuthorizationPolicy": "authenticated",
"Match": {
"Path": "/api/projects/{**catch-all}"
},
"Transforms": [
{
"X-Forwarded": "Append"
}
]
},
"workflow-route": {
"ClusterId": "workflow-cluster",
"AuthorizationPolicy": "authenticated",
"Match": {
"Path": "/api/workflows/{**catch-all}"
}
},
"user-route": {
"ClusterId": "user-cluster",
"Match": {
"Path": "/api/users/{**catch-all}"
}
},
"auth-route": {
"ClusterId": "user-cluster",
"Match": {
"Path": "/api/auth/{**catch-all}"
}
},
"notification-route": {
"ClusterId": "notification-cluster",
"AuthorizationPolicy": "authenticated",
"Match": {
"Path": "/api/notifications/{**catch-all}"
}
},
"audit-route": {
"ClusterId": "audit-cluster",
"AuthorizationPolicy": "admin",
"Match": {
"Path": "/api/audit-logs/{**catch-all}"
}
},
"ai-route": {
"ClusterId": "ai-cluster",
"AuthorizationPolicy": "authenticated",
"Match": {
"Path": "/api/ai/{**catch-all}"
}
},
"mcp-route": {
"ClusterId": "ai-cluster",
"AuthorizationPolicy": "mcp-client",
"Match": {
"Path": "/api/mcp/{**catch-all}"
}
}
},
"Clusters": {
"project-cluster": {
"LoadBalancingPolicy": "RoundRobin",
"Destinations": {
"project-service-1": {
"Address": "http://project-service:5001"
}
}
},
"workflow-cluster": {
"LoadBalancingPolicy": "RoundRobin",
"Destinations": {
"workflow-service-1": {
"Address": "http://workflow-service:5002"
}
}
},
"user-cluster": {
"LoadBalancingPolicy": "RoundRobin",
"Destinations": {
"user-service-1": {
"Address": "http://user-service:5003"
}
}
},
"notification-cluster": {
"LoadBalancingPolicy": "RoundRobin",
"Destinations": {
"notification-service-1": {
"Address": "http://notification-service:5004"
}
}
},
"audit-cluster": {
"LoadBalancingPolicy": "RoundRobin",
"Destinations": {
"audit-service-1": {
"Address": "http://audit-service:5005"
}
}
},
"ai-cluster": {
"LoadBalancingPolicy": "RoundRobin",
"Destinations": {
"ai-service-1": {
"Address": "http://ai-service:5006"
}
}
}
}
}
}
```
### 5.2 API Gateway Code
```csharp
// Program.cs
var builder = WebApplication.CreateBuilder(args);
// Add YARP
builder.Services.AddReverseProxy()
.LoadFromConfig(builder.Configuration.GetSection("ReverseProxy"));
// Add Authentication
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
.AddJwtBearer(options =>
{
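options.Authority = "http://user-service:5003";
// The authority uses plain HTTP inside the cluster, so HTTPS metadata enforcement must be disabled explicitly
options.RequireHttpsMetadata = false;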
options.TokenValidationParameters = new TokenValidationParameters
{
ValidateIssuer = true,
ValidateAudience = true,
ValidateLifetime = true,
ValidIssuer = "ColaFlow",
ValidAudience = "ColaFlow"
};
});
// Add Authorization Policies
builder.Services.AddAuthorization(options =>
{
options.AddPolicy("authenticated", policy => policy.RequireAuthenticatedUser());
options.AddPolicy("admin", policy => policy.RequireRole("Admin"));
options.AddPolicy("mcp-client", policy => policy.RequireClaim("client_type", "mcp"));
});
// Add Rate Limiting
builder.Services.AddRateLimiter(options =>
{
options.GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, string>(context =>
{
return RateLimitPartition.GetFixedWindowLimiter(
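// Anonymous callers are partitioned by client IP; the Host header would put them all in one bucket
partitionKey: context.User.Identity?.Name ?? context.Connection.RemoteIpAddress?.ToString() ?? "anonymous",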
factory: partition => new FixedWindowRateLimiterOptions
{
AutoReplenishment = true,
PermitLimit = 100,
QueueLimit = 0,
Window = TimeSpan.FromMinutes(1)
});
});
});
// Add Distributed Tracing
builder.Services.AddOpenTelemetry()
.WithTracing(tracing => tracing
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
// Jaeger ingests OTLP natively; the dedicated Jaeger exporter package is deprecated
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri("http://jaeger:4317");
}));
var app = builder.Build();
app.UseAuthentication();
// Rate limiting runs after authentication so the limiter can partition by context.User
app.UseRateLimiter();
app.UseAuthorization();
// Map YARP
app.MapReverseProxy();
app.Run();
```
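The gateway's global limiter gives each partition 100 permits per one-minute window with no queueing, so once a window is exhausted further requests are rejected immediately. The sketch below shows the same fixed-window behavior in isolation using `System.Threading.RateLimiting` directly. The `FixedWindowDemo` class and the small permit limit are illustrative assumptions, and a console project would need a reference to the `System.Threading.RateLimiting` package.

```csharp
using System;
using System.Threading.RateLimiting;

public static class FixedWindowDemo
{
    // Returns, for each attempt in order, whether a permit was granted within a single window.
    public static bool[] Attempt(int permitLimit, int attempts)
    {
        using var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions
        {
            PermitLimit = permitLimit,
            Window = TimeSpan.FromMinutes(1),
            QueueLimit = 0,             // no queueing: excess requests fail immediately
            AutoReplenishment = false   // keep the demo deterministic (no background timer)
        });

        var results = new bool[attempts];
        for (var i = 0; i < attempts; i++)
        {
            // Disposing a fixed-window lease does not hand the permit back.
            using var lease = limiter.AttemptAcquire();
            results[i] = lease.IsAcquired;
        }
        return results;
    }
}
```

With a limit of 2 and three attempts, the first two leases are acquired and the third is not. In the gateway, a rejected request receives HTTP 503 by default; setting `options.RejectionStatusCode = StatusCodes.Status429TooManyRequests` inside `AddRateLimiter` is a common adjustment.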
---
## 6. Service Discovery (Consul)
### 6.1 Consul Configuration
**Why Consul:**
- Service registry and health checking
- Dynamic service discovery
- Key/value store for configuration
- Production-ready and battle-tested
**Service Registration:**
```csharp
// Program.cs - Each Service
builder.Services.AddConsul(builder.Configuration);
public static class ConsulExtensions
{
public static IServiceCollection AddConsul(this IServiceCollection services, IConfiguration configuration)
{
var consulConfig = configuration.GetSection("Consul").Get<ConsulConfig>()
?? throw new InvalidOperationException("Missing 'Consul' configuration section");
services.AddSingleton<IConsulClient>(sp => new ConsulClient(config =>
{
config.Address = new Uri(consulConfig.Address);
}));
services.AddHostedService<ConsulHostedService>();
return services;
}
}
public class ConsulHostedService : IHostedService
{
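private readonly IConsulClient _consulClient;
private readonly IConfiguration _configuration;
private string _registrationId;
public ConsulHostedService(IConsulClient consulClient, IConfiguration configuration)
{
_consulClient = consulClient;
_configuration = configuration;
}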
public async Task StartAsync(CancellationToken cancellationToken)
{
var serviceConfig = _configuration.GetSection("Service").Get<ServiceConfig>();
_registrationId = $"{serviceConfig.Name}-{serviceConfig.Id}";
var registration = new AgentServiceRegistration
{
ID = _registrationId,
Name = serviceConfig.Name,
Address = serviceConfig.Address,
Port = serviceConfig.Port,
Tags = new[] { "colaflow", serviceConfig.Name },
Check = new AgentServiceCheck
{
HTTP = $"http://{serviceConfig.Address}:{serviceConfig.Port}/health",
Interval = TimeSpan.FromSeconds(10),
Timeout = TimeSpan.FromSeconds(5),
DeregisterCriticalServiceAfter = TimeSpan.FromMinutes(1)
}
};
await _consulClient.Agent.ServiceRegister(registration, cancellationToken);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _consulClient.Agent.ServiceDeregister(_registrationId, cancellationToken);
}
}
```
**appsettings.json:**
```json
{
"Consul": {
"Address": "http://consul:8500"
},
"Service": {
"Name": "project-service",
"Id": "project-service-1",
"Address": "project-service",
"Port": 5001
}
}
```
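The registration above points Consul's HTTP check at `/health` on each service, so every service has to expose that endpoint or Consul will mark it critical and eventually deregister it. A minimal sketch using ASP.NET Core's built-in health checks, assuming no custom checks (database, RabbitMQ) are registered yet:

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

// Registers the health check services; individual checks can be chained onto this call later.
builder.Services.AddHealthChecks();

var app = builder.Build();

// Responds with 200 "Healthy" while all registered checks pass.
app.MapHealthChecks("/health");

app.Run();
```

The path must match the `HTTP` value used in `AgentServiceCheck`; if one changes, the other has to change with it.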
**Service Discovery Client:**
```csharp
public class ServiceDiscoveryClient
{
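private readonly IConsulClient _consulClient;
public ServiceDiscoveryClient(IConsulClient consulClient)
{
_consulClient = consulClient;
}
public async Task<string> GetServiceAddressAsync(string serviceName)
{
// passingOnly: true returns only instances whose health checks are passing
var services = await _consulClient.Health.Service(serviceName, "", true);
if (!services.Response.Any())
throw new InvalidOperationException($"Service '{serviceName}' has no healthy instances");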
var service = services.Response.First();
return $"http://{service.Service.Address}:{service.Service.Port}";
}
}
// Usage
var userServiceAddress = await _serviceDiscovery.GetServiceAddressAsync("user-service");
var grpcClient = new UserService.UserServiceClient(
GrpcChannel.ForAddress(userServiceAddress)
);
```
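`GetServiceAddressAsync` returns a single address per call. When several healthy instances are registered, the caller may want to spread requests across them. One possible approach is a small thread-safe round-robin selector applied to the list Consul returns. `RoundRobinSelector` is a hypothetical helper, not part of the Consul client library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public sealed class RoundRobinSelector
{
    // Starts at -1 so the first Increment yields index 0.
    private int _counter = -1;

    // Picks the next instance in rotation; safe to call from multiple threads.
    public T Pick<T>(IReadOnlyList<T> instances)
    {
        if (instances.Count == 0)
            throw new InvalidOperationException("No healthy instances available.");

        // The cast to uint keeps the index non-negative after the counter wraps around.
        var index = (uint)Interlocked.Increment(ref _counter) % (uint)instances.Count;
        return instances[(int)index];
    }
}
```

A single selector instance would be kept per service name (for example, registered as a singleton) and called with `services.Response` in place of taking the first entry.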
---
## 7. Distributed Tracing (OpenTelemetry + Jaeger)
### 7.1 OpenTelemetry Configuration
```csharp
// Program.cs - Each Service
builder.Services.AddOpenTelemetry()
.WithTracing(tracing =>
{
tracing
.AddAspNetCoreInstrumentation(options =>
{
options.RecordException = true;
options.Filter = (httpContext) =>
!httpContext.Request.Path.StartsWithSegments("/health");
})
.AddHttpClientInstrumentation()
.AddGrpcClientInstrumentation()
.AddEntityFrameworkCoreInstrumentation(options =>
{
options.SetDbStatementForText = true;
})
.AddSource("MassTransit")
.SetResourceBuilder(ResourceBuilder.CreateDefault()
.AddService("project-service")
.AddAttributes(new Dictionary<string, object>
{
["environment"] = "production",
["version"] = "1.0.0"
}))
// Jaeger ingests OTLP natively; the dedicated Jaeger exporter package is deprecated
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri("http://jaeger:4317");
});
})
.WithMetrics(metrics =>
{
metrics
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
.AddRuntimeInstrumentation()
.AddPrometheusExporter();
});
```
### 7.2 Custom Instrumentation
```csharp
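public class CreateTaskCommandHandler : IRequestHandler<CreateTaskCommand, TaskDto>
{
// One ActivitySource per service, shared by all handler instances
private static readonly ActivitySource _activitySource = new("ColaFlow.ProjectService");
public async Task<TaskDto> Handle(CreateTaskCommand request, CancellationToken ct)
{
// Internal span: the inbound request span is already created by the ASP.NET Core instrumentation
using var activity = _activitySource.StartActivity("CreateTask", ActivityKind.Internal);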
activity?.SetTag("task.title", request.Title);
activity?.SetTag("task.priority", request.Priority);
try
{
// Business logic ("Task" is the domain entity here; alias it if it clashes with System.Threading.Tasks.Task)
var task = Task.Create(request.Title, request.Description);
await _taskRepository.AddAsync(task, ct);
activity?.SetTag("task.id", task.Id);
activity?.SetStatus(ActivityStatusCode.Ok);
return _mapper.Map<TaskDto>(task);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
activity?.RecordException(ex);
throw;
}
}
}
```
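A custom `ActivitySource` only produces spans when something is listening to it. With OpenTelemetry, that means adding `.AddSource("ColaFlow.ProjectService")` to the tracing builder next to the existing `.AddSource("MassTransit")`; otherwise `StartActivity` returns `null` and the tags above are silently skipped. The sketch below demonstrates the underlying behavior with a plain `ActivityListener` from `System.Diagnostics`. `TracingDemo` is a hypothetical helper for illustration only.

```csharp
using System.Diagnostics;

public static class TracingDemo
{
    public static readonly ActivitySource Source = new("ColaFlow.ProjectService");

    // True when a span was actually created, i.e. some listener is sampling this source.
    public static bool StartsActivity()
    {
        using var activity = Source.StartActivity("CreateTask", ActivityKind.Internal);
        return activity is not null;
    }

    // Registers a listener for one source name; dispose the result to stop listening.
    public static ActivityListener ListenTo(string sourceName)
    {
        var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == sourceName,
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllData
        };
        ActivitySource.AddActivityListener(listener);
        return listener;
    }
}
```

Before `ListenTo("ColaFlow.ProjectService")` is called, `StartsActivity()` returns `false`; while the listener is registered, it returns `true`. This is why the handler uses `activity?.` on every call.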
---
## 8. Project Structure
### 8.1 Overall Structure
```
product-master/
├── services/
│ ├── project-service/
│ │ ├── src/
│ │ │ ├── ColaFlow.ProjectService.Domain/
│ │ │ ├── ColaFlow.ProjectService.Application/
│ │ │ ├── ColaFlow.ProjectService.Infrastructure/
│ │ │ └── ColaFlow.ProjectService.API/
│ │ ├── tests/
│ │ ├── Dockerfile
│ │ └── ColaFlow.ProjectService.sln
│ ├── workflow-service/
│ │ ├── src/
│ │ ├── tests/
│ │ ├── Dockerfile
│ │ └── ColaFlow.WorkflowService.sln
│ ├── user-service/
│ ├── notification-service/
│ ├── audit-service/
│ └── ai-service/
├── gateway/
│ └── api-gateway/
│ ├── src/
│ │ └── ColaFlow.ApiGateway/
│ │ ├── Program.cs
│ │ ├── appsettings.json
│ │ └── Middleware/
│ ├── Dockerfile
│ └── ColaFlow.ApiGateway.sln
├── shared/
│ ├── ColaFlow.Shared.Contracts/
│ │ ├── DTOs/
│ │ ├── Events/
│ │ └── Protos/
│ │ ├── project.proto
│ │ ├── workflow.proto
│ │ └── user.proto
│ ├── ColaFlow.Shared.Messaging/
│ │ ├── MassTransit/
│ │ └── EventBus/
│ └── ColaFlow.Shared.Common/
│ ├── Extensions/
│ ├── Utilities/
│ └── Constants/
├── infrastructure/
│ ├── docker/
│ │ ├── docker-compose.microservices.yml
│ │ ├── docker-compose.infrastructure.yml
│ │ └── .env
│ ├── k8s/
│ │ ├── namespaces/
│ │ │ └── colaflow-namespace.yaml
│ │ ├── services/
│ │ │ ├── project-service.yaml
│ │ │ ├── workflow-service.yaml
│ │ │ ├── user-service.yaml
│ │ │ ├── notification-service.yaml
│ │ │ ├── audit-service.yaml
│ │ │ ├── ai-service.yaml
│ │ │ └── api-gateway.yaml
│ │ ├── deployments/
│ │ │ ├── project-service-deployment.yaml
│ │ │ ├── workflow-service-deployment.yaml
│ │ │ └── ... (one per service)
│ │ ├── configmaps/
│ │ │ └── appsettings-configmap.yaml
│ │ ├── secrets/
│ │ │ └── database-secrets.yaml
│ │ ├── ingress/
│ │ │ └── ingress.yaml
│ │ └── infrastructure/
│ │ ├── postgres-statefulset.yaml
│ │ ├── rabbitmq-deployment.yaml
│ │ ├── redis-deployment.yaml
│ │ ├── consul-deployment.yaml
│ │ └── jaeger-deployment.yaml
│ └── helm/
│ └── colaflow/
│ ├── Chart.yaml
│ ├── values.yaml
│ ├── values-dev.yaml
│ ├── values-prod.yaml
│ └── templates/
│ ├── services/
│ ├── deployments/
│ ├── configmaps/
│ ├── secrets/
│ └── ingress/
├── colaflow-web/
├── scripts/
│ ├── build-all.sh
│ ├── deploy-k8s.sh
│ └── generate-protos.sh
└── docs/
├── Microservices-Architecture.md
├── Service-Development-Guide.md
└── Operational-Runbook.md
```
### 8.2 Service Structure (Example: Project Service)
```
project-service/
├── src/
│ ├── ColaFlow.ProjectService.Domain/
│ │ ├── Aggregates/
│ │ │ └── ProjectAggregate/
│ │ │ ├── Project.cs
│ │ │ ├── Epic.cs
│ │ │ ├── Story.cs
│ │ │ └── Task.cs
│ │ ├── ValueObjects/
│ │ ├── Events/
│ │ ├── Interfaces/
│ │ └── Exceptions/
│ ├── ColaFlow.ProjectService.Application/
│ │ ├── Commands/
│ │ │ ├── CreateProject/
│ │ │ ├── UpdateProject/
│ │ │ ├── CreateTask/
│ │ │ └── UpdateTaskStatus/
│ │ ├── Queries/
│ │ │ ├── GetProject/
│ │ │ ├── GetKanbanBoard/
│ │ │ └── SearchTasks/
│ │ ├── DTOs/
│ │ ├── Mappings/
│ │ └── Services/
│ │ └── Clients/
│ │ ├── UserServiceClient.cs
│ │ └── WorkflowServiceClient.cs
│ ├── ColaFlow.ProjectService.Infrastructure/
│ │ ├── Persistence/
│ │ │ ├── ProjectDbContext.cs
│ │ │ ├── Repositories/
│ │ │ └── Configurations/
│ │ ├── Messaging/
│ │ │ ├── EventPublisher.cs
│ │ │ └── Consumers/
│ │ ├── gRPC/
│ │ │ └── ProjectGrpcService.cs
│ │ └── Caching/
│ └── ColaFlow.ProjectService.API/
│ ├── Controllers/
│ │ ├── ProjectsController.cs
│ │ ├── EpicsController.cs
│ │ ├── StoriesController.cs
│ │ └── TasksController.cs
│ ├── Middleware/
│ ├── Program.cs
│ ├── appsettings.json
│ └── Dockerfile
├── tests/
│ ├── Domain.Tests/
│ ├── Application.Tests/
│ └── Integration.Tests/
└── ColaFlow.ProjectService.sln
```
---
## 9. Code Examples
### 9.1 gRPC Service Implementation
**protos/project.proto:**
```protobuf
syntax = "proto3";
package colaflow.project;
service ProjectService {
rpc GetProject (GetProjectRequest) returns (ProjectResponse);
rpc GetTasksByAssignee (GetTasksByAssigneeRequest) returns (TaskListResponse);
rpc ValidateProjectExists (ValidateProjectRequest) returns (ValidationResponse);
}
message GetProjectRequest {
string project_id = 1;
}
message ProjectResponse {
string id = 1;
string name = 2;
string key = 3;
string status = 4;
}
message GetTasksByAssigneeRequest {
string assignee_id = 1;
int32 page = 2;
int32 page_size = 3;
}
message TaskListResponse {
repeated TaskDto tasks = 1;
int32 total_count = 2;
}
message TaskDto {
string id = 1;
string title = 2;
string status = 3;
string priority = 4;
}
message ValidateProjectRequest {
string project_id = 1;
}
message ValidationResponse {
bool exists = 1;
string message = 2;
}
```
**Server Implementation:**
```csharp
// ColaFlow.ProjectService.Infrastructure/gRPC/ProjectGrpcService.cs
public class ProjectGrpcService : ProjectService.ProjectServiceBase
{
private readonly IMediator _mediator;
private readonly IMapper _mapper;
public ProjectGrpcService(IMediator mediator, IMapper mapper)
{
_mediator = mediator;
_mapper = mapper;
}
public override async Task<ProjectResponse> GetProject(
GetProjectRequest request,
ServerCallContext context)
{
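// Reject malformed IDs with InvalidArgument instead of letting FormatException surface as Unknown
if (!Guid.TryParse(request.ProjectId, out var projectId))
throw new RpcException(new Status(StatusCode.InvalidArgument, "project_id must be a GUID"));
var query = new GetProjectByIdQuery(projectId);
var project = await _mediator.Send(query, context.CancellationToken);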
return new ProjectResponse
{
Id = project.Id.ToString(),
Name = project.Name,
Key = project.Key,
Status = project.Status.ToString()
};
}
public override async Task<TaskListResponse> GetTasksByAssignee(
GetTasksByAssigneeRequest request,
ServerCallContext context)
{
var query = new GetTasksByAssigneeQuery(
Guid.Parse(request.AssigneeId),
request.Page,
request.PageSize
);
var result = await _mediator.Send(query, context.CancellationToken);
var response = new TaskListResponse
{
TotalCount = result.TotalCount
};
response.Tasks.AddRange(result.Items.Select(task => new TaskDto
{
Id = task.Id.ToString(),
Title = task.Title,
Status = task.Status.ToString(),
Priority = task.Priority.ToString()
}));
return response;
}
public override async Task<ValidationResponse> ValidateProjectExists(
ValidateProjectRequest request,
ServerCallContext context)
{
try
{
var query = new GetProjectByIdQuery(Guid.Parse(request.ProjectId));
// Only existence matters here, so the result is discarded
await _mediator.Send(query, context.CancellationToken);
return new ValidationResponse
{
Exists = true,
Message = "Project exists"
};
}
catch (NotFoundException)
{
return new ValidationResponse
{
Exists = false,
Message = "Project not found"
};
}
}
}
// Program.cs
builder.Services.AddGrpc();
app.MapGrpcService<ProjectGrpcService>();
```
**Client Usage:**
```csharp
// Workflow Service - gRPC Client
public class ProjectServiceClient
{
private readonly ProjectService.ProjectServiceClient _grpcClient;
public ProjectServiceClient(ProjectService.ProjectServiceClient grpcClient)
{
_grpcClient = grpcClient;
}
public async Task<bool> ValidateProjectExistsAsync(Guid projectId)
{
var request = new ValidateProjectRequest
{
ProjectId = projectId.ToString()
};
var response = await _grpcClient.ValidateProjectExistsAsync(request);
return response.Exists;
}
}
// Program.cs - Workflow Service
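builder.Services.AddGrpcClient<ProjectService.ProjectServiceClient>((sp, options) =>
{
// The options callback is synchronous and cannot await, so the discovery lookup blocks here.
// Assumes ServiceDiscoveryClient is registered in the container.
var serviceDiscovery = sp.GetRequiredService<ServiceDiscoveryClient>();
var projectServiceAddress = serviceDiscovery
.GetServiceAddressAsync("project-service").GetAwaiter().GetResult();
options.Address = new Uri(projectServiceAddress);
})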
.ConfigurePrimaryHttpMessageHandler(() =>
{
return new SocketsHttpHandler
{
PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
KeepAlivePingDelay = TimeSpan.FromSeconds(60),
KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
EnableMultipleHttp2Connections = true
};
});
```
### 9.2 Saga Pattern Example (Complete)
See **Section 4.1** for the complete Saga implementation.
### 9.3 API Gateway Middleware
```csharp
// Correlation ID Middleware
public class CorrelationIdMiddleware
{
private readonly RequestDelegate _next;
public CorrelationIdMiddleware(RequestDelegate next)
{
_next = next;
}
public async Task InvokeAsync(HttpContext context)
{
var correlationId = context.Request.Headers["X-Correlation-ID"].FirstOrDefault()
?? Guid.NewGuid().ToString();
context.Request.Headers["X-Correlation-ID"] = correlationId;
context.Response.Headers["X-Correlation-ID"] = correlationId;
// Add to activity for distributed tracing
Activity.Current?.SetTag("correlation_id", correlationId);
await _next(context);
}
}
// Usage
app.UseMiddleware<CorrelationIdMiddleware>();
```
---
## 10. Docker Compose (Local Development)
The Docker Compose configuration for local development has not been added to this document yet.
---
**Status:** In progress. The Docker Compose, Kubernetes, Helm chart, and migration plan sections are still to be written.