Implemented comprehensive real-time notification system using SignalR to notify AI agents and users about PendingChange status updates.

Key Features Implemented:
- McpNotificationHub with Subscribe/Unsubscribe methods
- Real-time notifications for all PendingChange lifecycle events
- Tenant-based isolation for multi-tenancy security
- Notification DTOs for structured message formats
- Domain event handlers for automatic notification sending
- Comprehensive unit tests for notification service and handlers
- Client integration guide with examples for TypeScript, React, and Python

Components Created:
1. SignalR Hub:
   - McpNotificationHub.cs - Central hub for MCP notifications
2. Notification DTOs:
   - PendingChangeNotification.cs (base class)
   - PendingChangeCreatedNotification.cs
   - PendingChangeApprovedNotification.cs
   - PendingChangeRejectedNotification.cs
   - PendingChangeAppliedNotification.cs
   - PendingChangeExpiredNotification.cs
3. Notification Service:
   - IMcpNotificationService.cs (interface)
   - McpNotificationService.cs (implementation using SignalR)
4. Event Handlers (send notifications):
   - PendingChangeCreatedNotificationHandler.cs
   - PendingChangeApprovedNotificationHandler.cs
   - PendingChangeRejectedNotificationHandler.cs
   - PendingChangeAppliedNotificationHandler.cs
   - PendingChangeExpiredNotificationHandler.cs
5. Tests:
   - McpNotificationServiceTests.cs - Unit tests for notification service
   - PendingChangeCreatedNotificationHandlerTests.cs
   - PendingChangeApprovedNotificationHandlerTests.cs
6. Documentation:
   - signalr-mcp-client-guide.md - Comprehensive client integration guide

Technical Details:
- Hub endpoint: /hubs/mcp-notifications
- Authentication: JWT token via query string (?access_token=xxx)
- Tenant isolation: Automatic group joining based on tenant ID
- Group subscriptions: Per-pending-change and per-tenant groups
- Notification delivery: < 1 second (real-time)
- Fallback strategy: Polling if WebSocket unavailable

Architecture Benefits:
- Decoupled design using domain events
- Notification failures don't break main flow
- Scalable (supports Redis backplane for multi-instance)
- Type-safe notification payloads
- Tenant isolation built-in

Story: Phase 3 - Tools & Diff Preview
Priority: P0 CRITICAL
Story Points: 3
Completion: 100%

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

# SignalR MCP Notifications Client Integration Guide

## Overview

This guide explains how to integrate with ColaFlow's MCP SignalR Hub to receive real-time notifications about PendingChange status updates.

## Hub Endpoint

```
wss://your-colaflow-instance.com/hubs/mcp-notifications
```

## Authentication

The hub requires JWT authentication. Pass the JWT token in the query string:

```
wss://your-colaflow-instance.com/hubs/mcp-notifications?access_token=YOUR_JWT_TOKEN
```

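For clients that assemble the WebSocket URL by hand, the token should be URL-encoded before it is appended. The following is a minimal sketch; `buildHubUrl` is a hypothetical helper name used for illustration and is not part of ColaFlow or SignalR.

```typescript
// Hypothetical helper: appends the JWT as the `access_token` query parameter.
function buildHubUrl(baseUrl: string, jwtToken: string): string {
  // Use "&" if the base URL already carries a query string, "?" otherwise.
  const separator = baseUrl.indexOf("?") === -1 ? "?" : "&";
  return baseUrl + separator + "access_token=" + encodeURIComponent(jwtToken);
}

const hubUrl = buildHubUrl(
  "wss://your-colaflow-instance.com/hubs/mcp-notifications",
  "YOUR_JWT_TOKEN"
);
```

The official SignalR JavaScript client does not need this helper: it accepts the token through the `accessTokenFactory` option, as the examples in this guide show.
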
## Notification Types

The hub sends the following notification types:

1. **PendingChangeCreated** - A new PendingChange was created
2. **PendingChangeApproved** - A PendingChange was approved
3. **PendingChangeRejected** - A PendingChange was rejected
4. **PendingChangeApplied** - A PendingChange was successfully applied
5. **PendingChangeExpired** - A PendingChange expired (timeout)

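When all five events should flow into one place (a log, a queue, a UI store), they can be registered in a loop instead of one by one. This is a sketch under assumptions: `registerAll` and `NotificationSource` are hypothetical names, and the only thing required of the source object is an `on(eventName, handler)` method, which a SignalR `HubConnection` provides.

```typescript
// The five event names listed above.
const NOTIFICATION_EVENTS = [
  "PendingChangeCreated",
  "PendingChangeApproved",
  "PendingChangeRejected",
  "PendingChangeApplied",
  "PendingChangeExpired",
] as const;

type NotificationEvent = (typeof NOTIFICATION_EVENTS)[number];

// Minimal shape of the object handlers are registered on (hypothetical name).
interface NotificationSource {
  on(eventName: string, handler: (payload: unknown) => void): void;
}

// Hypothetical helper: routes every notification type to a single callback,
// passing along the event name so the callback can tell them apart.
function registerAll(
  source: NotificationSource,
  onNotification: (eventName: NotificationEvent, payload: unknown) => void
): void {
  for (const eventName of NOTIFICATION_EVENTS) {
    source.on(eventName, (payload) => onNotification(eventName, payload));
  }
}
```
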
## Client Implementation Examples

### JavaScript/TypeScript (SignalR Client)

```typescript
import * as signalR from "@microsoft/signalr";

// Create connection
const connection = new signalR.HubConnectionBuilder()
  .withUrl("https://your-colaflow-instance.com/hubs/mcp-notifications", {
    accessTokenFactory: () => localStorage.getItem("jwt_token") || "",
  })
  .withAutomaticReconnect() // Auto-reconnect on disconnect
  .configureLogging(signalR.LogLevel.Information)
  .build();

// Subscribe to PendingChangeCreated
connection.on("PendingChangeCreated", (notification) => {
  console.log("New PendingChange created:", notification);
  console.log("Summary:", notification.summary);
  console.log("Entity Type:", notification.entityType);
  console.log("Operation:", notification.operation);
});

// Subscribe to PendingChangeApproved
connection.on("PendingChangeApproved", (notification) => {
  console.log("PendingChange approved:", notification);
  console.log("Approved by:", notification.approvedBy);
  console.log("Entity ID:", notification.entityId);
});

// Subscribe to PendingChangeRejected
connection.on("PendingChangeRejected", (notification) => {
  console.log("PendingChange rejected:", notification);
  console.log("Reason:", notification.reason);
  console.log("Rejected by:", notification.rejectedBy);
});

// Subscribe to PendingChangeApplied
connection.on("PendingChangeApplied", (notification) => {
  console.log("PendingChange applied:", notification);
  console.log("Result:", notification.result);
});

// Subscribe to PendingChangeExpired
connection.on("PendingChangeExpired", (notification) => {
  console.log("PendingChange expired:", notification);
});

// Start connection
async function start() {
  try {
    await connection.start();
    console.log("SignalR Connected");
  } catch (err) {
    console.error("SignalR Connection Error:", err);
    setTimeout(start, 5000); // Retry after 5 seconds
  }
}

// Handle disconnection. With automatic reconnect enabled, this runs
// once the built-in retry attempts have been exhausted.
connection.onclose(async () => {
  console.log("SignalR Disconnected");
  await start(); // Reconnect
});

// Start the connection
start();
```

### React Hook

```typescript
import { useEffect, useState } from "react";
import * as signalR from "@microsoft/signalr";

export function useMcpNotifications(jwtToken: string) {
  const [connection, setConnection] = useState<signalR.HubConnection | null>(null);
  const [isConnected, setIsConnected] = useState(false);

  useEffect(() => {
    const newConnection = new signalR.HubConnectionBuilder()
      .withUrl("https://your-colaflow-instance.com/hubs/mcp-notifications", {
        accessTokenFactory: () => jwtToken,
      })
      .withAutomaticReconnect()
      .build();

    setConnection(newConnection);

    return () => {
      newConnection.stop();
    };
  }, [jwtToken]);

  useEffect(() => {
    if (connection) {
      connection
        .start()
        .then(() => {
          console.log("SignalR Connected");
          setIsConnected(true);
        })
        .catch((err) => {
          console.error("SignalR Connection Error:", err);
          setIsConnected(false);
        });

      connection.onclose(() => {
        console.log("SignalR Disconnected");
        setIsConnected(false);
      });
    }
  }, [connection]);

  return { connection, isConnected };
}

// Usage in a component
function MyComponent() {
  const jwtToken = localStorage.getItem("jwt_token") || "";
  const { connection, isConnected } = useMcpNotifications(jwtToken);

  useEffect(() => {
    if (!connection) return;

    // Subscribe to notifications
    connection.on("PendingChangeCreated", (notification) => {
      console.log("New PendingChange:", notification);
      // Update UI, show toast, etc.
    });

    connection.on("PendingChangeApproved", (notification) => {
      console.log("PendingChange approved:", notification);
      // Update UI, show success message, etc.
    });

    connection.on("PendingChangeRejected", (notification) => {
      console.log("PendingChange rejected:", notification);
      // Update UI, show error message, etc.
    });

    // Cleanup
    return () => {
      connection.off("PendingChangeCreated");
      connection.off("PendingChangeApproved");
      connection.off("PendingChangeRejected");
    };
  }, [connection]);

  return (
    <div>
      <p>Connection Status: {isConnected ? "Connected" : "Disconnected"}</p>
    </div>
  );
}
```

### Python (AI Agent)

```python
import threading
import time

from signalrcore.hub_connection_builder import HubConnectionBuilder


class McpNotificationClient:
    def __init__(self, hub_url: str, jwt_token: str):
        self.hub_url = hub_url
        self.jwt_token = jwt_token
        self.connection = None
        self.connected = threading.Event()  # set once the handshake completes

    # signalrcore passes each handler the list of arguments sent by the
    # server, so the notification payload is the first element.
    def on_pending_change_created(self, args):
        notification = args[0]
        print(f"PendingChange created: {notification}")
        print(f"Summary: {notification['summary']}")

    def on_pending_change_approved(self, args):
        notification = args[0]
        print(f"PendingChange approved: {notification}")
        print(f"Entity ID: {notification['entityId']}")
        # Continue AI workflow...

    def on_pending_change_rejected(self, args):
        notification = args[0]
        print(f"PendingChange rejected: {notification}")
        print(f"Reason: {notification['reason']}")
        # Adjust AI behavior...

    def on_pending_change_applied(self, args):
        notification = args[0]
        print(f"PendingChange applied: {notification}")
        print(f"Result: {notification['result']}")

    def on_pending_change_expired(self, args):
        print(f"PendingChange expired: {args[0]}")

    def _on_open(self):
        print("SignalR Connected")
        self.connected.set()

    def start(self):
        self.connection = (
            HubConnectionBuilder()
            .with_url(
                self.hub_url,
                options={"access_token_factory": lambda: self.jwt_token},
            )
            .with_automatic_reconnect({
                "type": "interval",
                "intervals": [0, 2, 10, 30],
            })
            .build()
        )

        # Register event handlers
        self.connection.on("PendingChangeCreated", self.on_pending_change_created)
        self.connection.on("PendingChangeApproved", self.on_pending_change_approved)
        self.connection.on("PendingChangeRejected", self.on_pending_change_rejected)
        self.connection.on("PendingChangeApplied", self.on_pending_change_applied)
        self.connection.on("PendingChangeExpired", self.on_pending_change_expired)
        self.connection.on_open(self._on_open)
        self.connection.on_close(self.connected.clear)

        # Start connection (returns before the handshake has finished)
        self.connection.start()

    def subscribe_to_pending_change(self, pending_change_id: str):
        """Subscribe to notifications for a specific pending change"""
        self.connection.send("SubscribeToPendingChange", [pending_change_id])
        print(f"Subscribed to PendingChange {pending_change_id}")

    def unsubscribe_from_pending_change(self, pending_change_id: str):
        """Unsubscribe from notifications for a specific pending change"""
        self.connection.send("UnsubscribeFromPendingChange", [pending_change_id])
        print(f"Unsubscribed from PendingChange {pending_change_id}")

    def stop(self):
        if self.connection:
            self.connection.stop()


# Usage
def main():
    client = McpNotificationClient(
        hub_url="https://your-colaflow-instance.com/hubs/mcp-notifications",
        jwt_token="YOUR_JWT_TOKEN",
    )

    client.start()
    # Wait for the on_open callback before invoking hub methods
    if not client.connected.wait(timeout=10):
        raise RuntimeError("Could not connect to the MCP notification hub")

    # Subscribe to a specific pending change
    client.subscribe_to_pending_change("123e4567-e89b-12d3-a456-426614174000")

    # Keep running
    time.sleep(3600)  # Run for 1 hour

    client.stop()


if __name__ == "__main__":
    main()
```

## Hub Methods

### SubscribeToPendingChange

Subscribe to receive notifications for a specific pending change.

```typescript
await connection.invoke("SubscribeToPendingChange", pendingChangeId);
```

**Parameters:**
- `pendingChangeId` (Guid/string): The ID of the pending change to subscribe to

### UnsubscribeFromPendingChange

Unsubscribe from receiving notifications for a specific pending change.

```typescript
await connection.invoke("UnsubscribeFromPendingChange", pendingChangeId);
```

**Parameters:**
- `pendingChangeId` (Guid/string): The ID of the pending change to unsubscribe from

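Because both hub methods take a GUID, a client can reject malformed IDs before making the round trip. The sketch below assumes the canonical hyphenated GUID layout used in this guide's examples; `isGuid`, `HubInvoker`, and `subscribeToPendingChange` are hypothetical names, and how the server itself reacts to a malformed ID is not specified here.

```typescript
// Matches the canonical 8-4-4-4-12 hexadecimal GUID layout.
const GUID_PATTERN =
  /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

function isGuid(value: string): boolean {
  return GUID_PATTERN.test(value);
}

// Minimal shape needed here; a SignalR HubConnection provides `invoke`.
interface HubInvoker {
  invoke(methodName: string, ...args: unknown[]): Promise<unknown>;
}

// Hypothetical wrapper: fails fast on malformed IDs, otherwise calls the hub.
function subscribeToPendingChange(
  hub: HubInvoker,
  pendingChangeId: string
): Promise<unknown> {
  if (!isGuid(pendingChangeId)) {
    return Promise.reject(
      new Error("Invalid pending change ID: " + pendingChangeId)
    );
  }
  return hub.invoke("SubscribeToPendingChange", pendingChangeId);
}
```
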
## Notification Payloads

### PendingChangeCreatedNotification

```json
{
  "notificationType": "PendingChangeCreated",
  "pendingChangeId": "123e4567-e89b-12d3-a456-426614174000",
  "toolName": "create_epic",
  "entityType": "Epic",
  "operation": "CREATE",
  "summary": "Create Epic: Implement User Authentication",
  "tenantId": "456e7890-e89b-12d3-a456-426614174000",
  "timestamp": "2025-01-15T10:30:00Z"
}
```

### PendingChangeApprovedNotification

```json
{
  "notificationType": "PendingChangeApproved",
  "pendingChangeId": "123e4567-e89b-12d3-a456-426614174000",
  "toolName": "create_epic",
  "entityType": "Epic",
  "operation": "CREATE",
  "entityId": "789e0123-e89b-12d3-a456-426614174000",
  "approvedBy": "234e5678-e89b-12d3-a456-426614174000",
  "executionResult": "Epic created: 789e0123-e89b-12d3-a456-426614174000 - Implement User Authentication",
  "tenantId": "456e7890-e89b-12d3-a456-426614174000",
  "timestamp": "2025-01-15T10:31:00Z"
}
```

### PendingChangeRejectedNotification

```json
{
  "notificationType": "PendingChangeRejected",
  "pendingChangeId": "123e4567-e89b-12d3-a456-426614174000",
  "toolName": "create_epic",
  "reason": "Epic name is too vague, please provide more details",
  "rejectedBy": "234e5678-e89b-12d3-a456-426614174000",
  "tenantId": "456e7890-e89b-12d3-a456-426614174000",
  "timestamp": "2025-01-15T10:31:00Z"
}
```

### PendingChangeAppliedNotification

```json
{
  "notificationType": "PendingChangeApplied",
  "pendingChangeId": "123e4567-e89b-12d3-a456-426614174000",
  "toolName": "create_epic",
  "result": "Epic created: 789e0123-e89b-12d3-a456-426614174000 - Implement User Authentication",
  "appliedAt": "2025-01-15T10:31:05Z",
  "tenantId": "456e7890-e89b-12d3-a456-426614174000",
  "timestamp": "2025-01-15T10:31:05Z"
}
```

### PendingChangeExpiredNotification

```json
{
  "notificationType": "PendingChangeExpired",
  "pendingChangeId": "123e4567-e89b-12d3-a456-426614174000",
  "toolName": "create_epic",
  "expiredAt": "2025-01-15T22:30:00Z",
  "tenantId": "456e7890-e89b-12d3-a456-426614174000",
  "timestamp": "2025-01-15T22:30:00Z"
}
```

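The sample payloads above can be modeled on the client as a discriminated union keyed on `notificationType`, so that TypeScript narrows the available fields in each branch. This is a sketch derived only from the JSON samples: the interface names are hypothetical, and treating every field as a required string is an assumption, since the server may omit or null some of them.

```typescript
// Fields that appear in every sample payload.
interface BaseNotification {
  pendingChangeId: string;
  toolName: string;
  tenantId: string;
  timestamp: string; // ISO 8601 timestamp, as in the samples
}

interface CreatedNotification extends BaseNotification {
  notificationType: "PendingChangeCreated";
  entityType: string;
  operation: string;
  summary: string;
}

interface ApprovedNotification extends BaseNotification {
  notificationType: "PendingChangeApproved";
  entityType: string;
  operation: string;
  entityId: string;
  approvedBy: string;
  executionResult: string;
}

interface RejectedNotification extends BaseNotification {
  notificationType: "PendingChangeRejected";
  reason: string;
  rejectedBy: string;
}

interface AppliedNotification extends BaseNotification {
  notificationType: "PendingChangeApplied";
  result: string;
  appliedAt: string;
}

interface ExpiredNotification extends BaseNotification {
  notificationType: "PendingChangeExpired";
  expiredAt: string;
}

type McpNotification =
  | CreatedNotification
  | ApprovedNotification
  | RejectedNotification
  | AppliedNotification
  | ExpiredNotification;

// The switch narrows `n`, so each branch can only read fields that exist
// on that notification type.
function describeNotification(n: McpNotification): string {
  switch (n.notificationType) {
    case "PendingChangeCreated":
      return "Created: " + n.summary;
    case "PendingChangeApproved":
      return "Approved by " + n.approvedBy;
    case "PendingChangeRejected":
      return "Rejected: " + n.reason;
    case "PendingChangeApplied":
      return "Applied: " + n.result;
    case "PendingChangeExpired":
      return "Expired at " + n.expiredAt;
  }
}
```
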
## Tenant Isolation

All notifications are automatically isolated by tenant. When you connect, you are automatically added to your tenant's group, and you will only receive notifications for PendingChanges belonging to your tenant.

## Connection Lifecycle

1. **Connect**: Establish WebSocket connection with JWT token
2. **Automatic Tenant Group Join**: Server automatically adds you to your tenant's group
3. **Subscribe**: Optionally subscribe to specific pending changes
4. **Receive Notifications**: Get real-time updates
5. **Unsubscribe**: Optionally unsubscribe from specific pending changes
6. **Disconnect**: Close connection when done

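A reconnected SignalR client gets a new connection ID, and SignalR does not carry group membership over to it. The tenant group is rejoined by the server on connect, but this sketch assumes that per-pending-change subscriptions from step 3 have to be re-issued by the client. `SubscriptionTracker` and `resubscribeAll` are hypothetical names.

```typescript
// Remembers which pending changes the client has subscribed to.
class SubscriptionTracker {
  private ids: { [id: string]: true } = {};

  add(id: string): void {
    this.ids[id] = true;
  }

  remove(id: string): void {
    delete this.ids[id];
  }

  list(): string[] {
    return Object.keys(this.ids);
  }
}

// Re-issues SubscribeToPendingChange for every tracked ID. `invoke` is
// expected to forward to the hub, e.g. (m, id) => connection.invoke(m, id).
function resubscribeAll(
  tracker: SubscriptionTracker,
  invoke: (methodName: string, pendingChangeId: string) => Promise<unknown>
): Promise<unknown[]> {
  return Promise.all(
    tracker.list().map((id) => invoke("SubscribeToPendingChange", id))
  );
}
```

A natural place to call `resubscribeAll` is the connection's `onreconnected` callback, with `tracker.add` and `tracker.remove` called alongside each subscribe and unsubscribe.
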
## Error Handling

```typescript
connection.onreconnecting((error) => {
  console.warn("SignalR Reconnecting:", error);
});

connection.onreconnected((connectionId) => {
  console.log("SignalR Reconnected:", connectionId);
});

connection.onclose((error) => {
  console.error("SignalR Connection Closed:", error);
  // Implement custom reconnection logic if needed
});
```

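Called without arguments, `withAutomaticReconnect()` retries after 0, 2, 10, and 30 seconds and then gives up, at which point `onclose` fires. The SignalR JavaScript client also accepts a custom retry policy object in place of the defaults. The sketch below declares local copies of the relevant shapes so it stands alone; the policy itself (`cappedBackoff`, with a 30-second cap and a five-minute budget) is a hypothetical example, not a ColaFlow recommendation.

```typescript
// Local copies of the shapes a custom retry policy works with.
interface RetryContext {
  previousRetryCount: number; // retries already attempted
  elapsedMilliseconds: number; // time spent reconnecting so far
}

interface RetryPolicy {
  // Return the delay before the next attempt, or null to stop retrying.
  nextRetryDelayInMilliseconds(context: RetryContext): number | null;
}

// Hypothetical policy: exponential backoff starting at 1 s, capped at 30 s,
// giving up once five minutes have elapsed.
const cappedBackoff: RetryPolicy = {
  nextRetryDelayInMilliseconds(context) {
    if (context.elapsedMilliseconds >= 5 * 60 * 1000) {
      return null;
    }
    return Math.min(1000 * Math.pow(2, context.previousRetryCount), 30000);
  },
};
```

Such a policy would be passed as `.withAutomaticReconnect(cappedBackoff)` when building the connection.
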
## Best Practices

1. **Use Automatic Reconnect**: Always enable `.withAutomaticReconnect()` to handle temporary disconnections
2. **Subscribe Selectively**: Only subscribe to specific pending changes you care about
3. **Clean Up**: Always unsubscribe and close connections when done
4. **Error Handling**: Implement proper error handling for network failures
5. **Token Refresh**: Refresh JWT tokens before they expire to maintain connection
6. **Logging**: Enable appropriate logging level for debugging

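For the token refresh practice, the client needs to know how long it can wait before requesting a new token. The sketch below computes that delay from the token's expiry time, which would typically come from the JWT `exp` claim (seconds since the Unix epoch). `msUntilRefresh` and the 60-second default leeway are assumptions for illustration; how ColaFlow issues refreshed tokens is outside the scope of this guide.

```typescript
// Returns how many milliseconds to wait before refreshing the token.
// A result of 0 means the token should be refreshed immediately.
function msUntilRefresh(
  expiresAtEpochSeconds: number,
  nowMs: number,
  leewaySeconds: number = 60
): number {
  // Refresh slightly early so the token never expires mid-request.
  const refreshAtMs = (expiresAtEpochSeconds - leewaySeconds) * 1000;
  return Math.max(0, refreshAtMs - nowMs);
}
```

Because the SignalR JavaScript client calls `accessTokenFactory` whenever it needs a token, a factory that reads the latest stored token picks up the refreshed value on the next connection or reconnection attempt.
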
## Fallback Strategy

If SignalR/WebSocket is not available or fails, you can fall back to polling:

```typescript
// Fallback to polling if SignalR fails
function pollPendingChange(pendingChangeId: string, jwtToken: string) {
  const interval = setInterval(async () => {
    try {
      const response = await fetch(
        `https://api.colaflow.com/api/mcp/pending-changes/${pendingChangeId}`,
        {
          headers: {
            Authorization: `Bearer ${jwtToken}`,
          },
        }
      );
      if (!response.ok) {
        throw new Error(`Unexpected HTTP status ${response.status}`);
      }
      const pendingChange = await response.json();

      // Stop polling once the pending change has been decided
      if (pendingChange.status === "Approved" || pendingChange.status === "Rejected") {
        clearInterval(interval);
        console.log("PendingChange status updated:", pendingChange.status);
      }
    } catch (error) {
      console.error("Polling error:", error);
    }
  }, 2000); // Poll every 2 seconds

  return interval; // Lets the caller cancel polling with clearInterval
}
```

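A fixed 2-second interval keeps hitting the API at the same rate no matter how long a pending change stays undecided. One common alternative is to lengthen the interval gradually. The sketch below only computes the delay schedule; `pollDelays` and its default values are hypothetical and would need tuning against the real API's rate limits.

```typescript
// Builds a list of polling delays that grow by `factor` each attempt
// and never exceed `capMs`.
function pollDelays(
  maxAttempts: number,
  baseMs: number = 2000,
  factor: number = 1.5,
  capMs: number = 10000
): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const rawDelay = baseMs * Math.pow(factor, attempt);
    delays.push(Math.min(Math.round(rawDelay), capMs));
  }
  return delays;
}

// Five attempts with the defaults: 2000, 3000, 4500, 6750, 10000
const schedule = pollDelays(5);
```
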
## Performance Considerations

- **Connection Reuse**: Reuse a single SignalR connection for multiple subscriptions
- **Selective Subscriptions**: Only subscribe to pending changes you need to track
- **Bandwidth**: SignalR uses WebSocket when it is available, which avoids the repeated request overhead of polling
- **Scalability**: The server uses SignalR Groups for efficient message routing

## Troubleshooting

### Connection Fails

- Verify JWT token is valid and not expired
- Check CORS settings if connecting from a browser
- Ensure WebSocket is not blocked by firewall/proxy

### Not Receiving Notifications

- Verify you're connected (check `connection.state === HubConnectionState.Connected`)
- Check tenant ID matches the PendingChange's tenant
- Verify you've subscribed to the pending change (if using selective subscriptions)

### Notifications Delayed

- Check network latency
- Verify server is not under heavy load
- Check if automatic reconnect is triggering frequently (network instability)

## Support

For issues or questions, please contact the ColaFlow development team or open an issue in the repository.