Files
ColaFlow-Web/lib/signalr/ConnectionManager.ts
Yaojia Wang 048e7e7e6d fix(frontend): Fix SignalR 401 authentication error with dynamic token factory
Fixed SignalR connection failing with 401 Unauthorized error by using
a dynamic token factory instead of a static token value.

Changes:
- Updated accessTokenFactory to call tokenManager.getAccessToken() dynamically
- This ensures SignalR always uses the latest valid JWT token
- Fixes token expiration and refresh issues during connection lifecycle

Issue: SignalR negotiation was failing because it used a stale token
captured at connection creation time, instead of fetching the current
token from localStorage on each request.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-05 20:45:51 +01:00

172 lines
4.8 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import * as signalR from '@microsoft/signalr';
import { tokenManager } from '@/lib/api/client';
import { SIGNALR_CONFIG } from './config';
import { logger } from '@/lib/utils/logger';
export type ConnectionState =
| 'disconnected'
| 'connecting'
| 'connected'
| 'reconnecting';
export class SignalRConnectionManager {
private connection: signalR.HubConnection | null = null;
private hubUrl: string;
private reconnectAttempt = 0;
private stateListeners: Array<(state: ConnectionState) => void> = [];
constructor(hubUrl: string) {
this.hubUrl = hubUrl;
}
async start(): Promise<void> {
if (
this.connection &&
this.connection.state === signalR.HubConnectionState.Connected
) {
logger.debug('[SignalR] Already connected');
return;
}
const token = tokenManager.getAccessToken();
if (!token) {
logger.warn('[SignalR] No access token found, cannot connect');
return;
}
this.connection = new signalR.HubConnectionBuilder()
.withUrl(this.hubUrl, {
// Use dynamic token factory to always get the latest token
accessTokenFactory: () => tokenManager.getAccessToken() || '',
// 备用方案:使用 query stringWebSocket 升级需要)
// transport: signalR.HttpTransportType.WebSockets,
})
.configureLogging(
signalR.LogLevel[
SIGNALR_CONFIG.LOG_LEVEL as keyof typeof signalR.LogLevel
]
)
.withAutomaticReconnect(SIGNALR_CONFIG.RECONNECT_DELAYS)
.build();
this.setupConnectionHandlers();
try {
this.notifyStateChange('connecting');
await this.connection.start();
logger.info(`[SignalR] Connected to ${this.hubUrl}`);
this.notifyStateChange('connected');
this.reconnectAttempt = 0;
} catch (error) {
logger.error('[SignalR] Connection error', error);
this.notifyStateChange('disconnected');
this.scheduleReconnect();
}
}
async stop(): Promise<void> {
if (this.connection) {
await this.connection.stop();
this.connection = null;
this.notifyStateChange('disconnected');
logger.info('[SignalR] Disconnected');
}
}
on<T = unknown>(methodName: string, callback: (data: T) => void): void {
if (this.connection) {
this.connection.on(methodName, callback);
}
}
off<T = unknown>(methodName: string, callback?: (data: T) => void): void {
if (this.connection && callback) {
this.connection.off(methodName, callback);
} else if (this.connection) {
this.connection.off(methodName);
}
}
async invoke<T = unknown>(methodName: string, ...args: unknown[]): Promise<T> {
if (
!this.connection ||
this.connection.state !== signalR.HubConnectionState.Connected
) {
throw new Error('SignalR connection is not established');
}
return await this.connection.invoke<T>(methodName, ...args);
}
onStateChange(listener: (state: ConnectionState) => void): () => void {
this.stateListeners.push(listener);
// 返回 unsubscribe 函数
return () => {
this.stateListeners = this.stateListeners.filter((l) => l !== listener);
};
}
private setupConnectionHandlers(): void {
if (!this.connection) return;
this.connection.onclose((error) => {
logger.info('[SignalR] Connection closed', error);
this.notifyStateChange('disconnected');
this.scheduleReconnect();
});
this.connection.onreconnecting((error) => {
logger.info('[SignalR] Reconnecting...', error);
this.notifyStateChange('reconnecting');
});
this.connection.onreconnected((connectionId) => {
logger.info('[SignalR] Reconnected', connectionId);
this.notifyStateChange('connected');
this.reconnectAttempt = 0;
});
}
private scheduleReconnect(): void {
if (this.reconnectAttempt >= SIGNALR_CONFIG.RECONNECT_DELAYS.length) {
logger.error('[SignalR] Max reconnect attempts reached');
return;
}
const delay = SIGNALR_CONFIG.RECONNECT_DELAYS[this.reconnectAttempt];
this.reconnectAttempt++;
logger.info(
`[SignalR] Scheduling reconnect in ${delay}ms (attempt ${this.reconnectAttempt})`
);
setTimeout(() => {
this.start();
}, delay);
}
private notifyStateChange(state: ConnectionState): void {
this.stateListeners.forEach((listener) => listener(state));
}
get connectionId(): string | null {
return this.connection?.connectionId ?? null;
}
get state(): ConnectionState {
if (!this.connection) return 'disconnected';
switch (this.connection.state) {
case signalR.HubConnectionState.Connected:
return 'connected';
case signalR.HubConnectionState.Connecting:
return 'connecting';
case signalR.HubConnectionState.Reconnecting:
return 'reconnecting';
default:
return 'disconnected';
}
}
}