Files
ColaFlow/colaflow-api/src/Modules/Mcp/ColaFlow.Modules.Mcp.Domain/Services/TaskLockService.cs
Yaojia Wang 63ff1a9914
Some checks failed
Code Coverage / Generate Coverage Report (push) Has been cancelled
Tests / Run Tests (9.0.x) (push) Has been cancelled
Tests / Docker Build Test (push) Has been cancelled
Tests / Test Summary (push) Has been cancelled
Clean up
2025-11-09 18:40:36 +01:00

298 lines
9.0 KiB
C#

using ColaFlow.Modules.Mcp.Domain.Entities;
using ColaFlow.Modules.Mcp.Domain.Repositories;
namespace ColaFlow.Modules.Mcp.Domain.Services;
/// <summary>
/// Domain service for managing task locks and concurrency control.
/// All persistence goes through the injected <see cref="ITaskLockRepository"/>; every mutation is
/// followed by a call to <c>SaveChangesAsync</c> on that repository.
/// </summary>
/// <param name="taskLockRepository">Repository used to read and persist <see cref="TaskLock"/> entities.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="taskLockRepository"/> is <c>null</c>.</exception>
public sealed class TaskLockService(ITaskLockRepository taskLockRepository)
{
    private readonly ITaskLockRepository _taskLockRepository =
        taskLockRepository ?? throw new ArgumentNullException(nameof(taskLockRepository));

    /// <summary>
    /// Try to acquire a lock for a resource.
    /// </summary>
    /// <param name="resourceType">Type name of the resource to lock.</param>
    /// <param name="resourceId">Identifier of the resource to lock.</param>
    /// <param name="lockHolderType">Type name of the party requesting the lock.</param>
    /// <param name="lockHolderId">Identifier of the party requesting the lock.</param>
    /// <param name="tenantId">Tenant the resource belongs to.</param>
    /// <param name="lockHolderName">Optional display name of the lock holder.</param>
    /// <param name="purpose">Optional description of why the lock is taken.</param>
    /// <param name="expirationMinutes">Lifetime handed to <c>TaskLock.Acquire</c>; defaults to 5.</param>
    /// <param name="cancellationToken">Token forwarded to every repository call.</param>
    /// <returns>
    /// The newly created lock, or <c>null</c> when the repository reports an active lock on the
    /// resource that has not expired yet (regardless of who holds it).
    /// </returns>
    public async Task<TaskLock?> TryAcquireLockAsync(
        string resourceType,
        Guid resourceId,
        string lockHolderType,
        Guid lockHolderId,
        Guid tenantId,
        string? lockHolderName = null,
        string? purpose = null,
        int expirationMinutes = 5,
        CancellationToken cancellationToken = default)
    {
        var existingLock = await _taskLockRepository.GetActiveLockForResourceAsync(
            tenantId,
            resourceType,
            resourceId,
            cancellationToken);

        if (existingLock is not null)
        {
            if (!existingLock.IsExpired())
            {
                // Resource is still locked; the caller has to retry later.
                return null;
            }

            // Stale lock: persist the expiry first so the new lock replaces it cleanly.
            await MarkExpiredAndSaveAsync(existingLock, cancellationToken);
        }

        // NOTE(review): the lookup above and the insert below are separate repository calls, so two
        // concurrent callers can both pass the check. Confirm that the underlying store enforces
        // a single active lock per resource.
        var newLock = TaskLock.Acquire(
            resourceType: resourceType,
            resourceId: resourceId,
            lockHolderType: lockHolderType,
            lockHolderId: lockHolderId,
            tenantId: tenantId,
            lockHolderName: lockHolderName,
            purpose: purpose,
            expirationMinutes: expirationMinutes);

        await _taskLockRepository.AddAsync(newLock, cancellationToken);
        await _taskLockRepository.SaveChangesAsync(cancellationToken);
        return newLock;
    }

    /// <summary>
    /// Release a lock by ID.
    /// </summary>
    /// <param name="lockId">Identifier of the lock to release.</param>
    /// <param name="lockHolderId">Identifier of the caller; must match the lock's holder.</param>
    /// <param name="cancellationToken">Token forwarded to every repository call.</param>
    /// <returns>
    /// <c>true</c> when the lock was released and saved; <c>false</c> when no lock with that ID
    /// exists or the lock is no longer valid.
    /// </returns>
    /// <exception cref="InvalidOperationException">Thrown when the lock is held by a different holder.</exception>
    public async Task<bool> ReleaseLockAsync(
        Guid lockId,
        Guid lockHolderId,
        CancellationToken cancellationToken = default)
    {
        var taskLock = await _taskLockRepository.GetByIdAsync(lockId, cancellationToken);
        return await ReleaseIfOwnedAsync(taskLock, lockHolderId, cancellationToken);
    }

    /// <summary>
    /// Release the active lock for a specific resource.
    /// </summary>
    /// <param name="resourceType">Type name of the locked resource.</param>
    /// <param name="resourceId">Identifier of the locked resource.</param>
    /// <param name="lockHolderId">Identifier of the caller; must match the lock's holder.</param>
    /// <param name="tenantId">Tenant the resource belongs to.</param>
    /// <param name="cancellationToken">Token forwarded to every repository call.</param>
    /// <returns>
    /// <c>true</c> when the lock was released and saved; <c>false</c> when the resource has no
    /// active lock or the lock is no longer valid.
    /// </returns>
    /// <exception cref="InvalidOperationException">Thrown when the lock is held by a different holder.</exception>
    public async Task<bool> ReleaseLockForResourceAsync(
        string resourceType,
        Guid resourceId,
        Guid lockHolderId,
        Guid tenantId,
        CancellationToken cancellationToken = default)
    {
        var taskLock = await _taskLockRepository.GetActiveLockForResourceAsync(
            tenantId,
            resourceType,
            resourceId,
            cancellationToken);
        return await ReleaseIfOwnedAsync(taskLock, lockHolderId, cancellationToken);
    }

    /// <summary>
    /// Check if a resource is currently locked.
    /// An expired lock found during the check is marked as expired and saved.
    /// </summary>
    /// <param name="tenantId">Tenant the resource belongs to.</param>
    /// <param name="resourceType">Type name of the resource.</param>
    /// <param name="resourceId">Identifier of the resource.</param>
    /// <param name="cancellationToken">Token forwarded to every repository call.</param>
    /// <returns><c>true</c> when a valid, unexpired lock exists; otherwise <c>false</c>.</returns>
    public async Task<bool> IsResourceLockedAsync(
        Guid tenantId,
        string resourceType,
        Guid resourceId,
        CancellationToken cancellationToken = default)
    {
        // Same rules as GetActiveLockAsync: "locked" means that lookup yields a lock.
        var activeLock = await GetActiveLockAsync(tenantId, resourceType, resourceId, cancellationToken);
        return activeLock is not null;
    }

    /// <summary>
    /// Check if a resource is locked by a specific holder.
    /// </summary>
    /// <param name="tenantId">Tenant the resource belongs to.</param>
    /// <param name="resourceType">Type name of the resource.</param>
    /// <param name="resourceId">Identifier of the resource.</param>
    /// <param name="lockHolderId">Identifier of the holder to test for.</param>
    /// <param name="cancellationToken">Token forwarded to the repository call.</param>
    /// <returns>
    /// <c>true</c> when the resource has an active lock that is still valid and is held by
    /// <paramref name="lockHolderId"/>; otherwise <c>false</c>. This method never writes.
    /// </returns>
    public async Task<bool> IsResourceLockedByAsync(
        Guid tenantId,
        string resourceType,
        Guid resourceId,
        Guid lockHolderId,
        CancellationToken cancellationToken = default)
    {
        var activeLock = await _taskLockRepository.GetActiveLockForResourceAsync(
            tenantId,
            resourceType,
            resourceId,
            cancellationToken);

        if (activeLock is null)
        {
            return false;
        }

        // Gate on IsValid() like the other queries do, so a lapsed lock that has not been
        // swept yet is not reported as still owned by its former holder.
        return activeLock.IsValid() && activeLock.IsHeldBy(lockHolderId);
    }

    /// <summary>
    /// Get the current lock for a resource (if any).
    /// An expired lock found during the lookup is marked as expired and saved.
    /// </summary>
    /// <param name="tenantId">Tenant the resource belongs to.</param>
    /// <param name="resourceType">Type name of the resource.</param>
    /// <param name="resourceId">Identifier of the resource.</param>
    /// <param name="cancellationToken">Token forwarded to every repository call.</param>
    /// <returns>The valid, unexpired lock, or <c>null</c> when there is none.</returns>
    public async Task<TaskLock?> GetActiveLockAsync(
        Guid tenantId,
        string resourceType,
        Guid resourceId,
        CancellationToken cancellationToken = default)
    {
        var activeLock = await _taskLockRepository.GetActiveLockForResourceAsync(
            tenantId,
            resourceType,
            resourceId,
            cancellationToken);

        if (activeLock is null)
        {
            return null;
        }

        if (activeLock.IsExpired())
        {
            await MarkExpiredAndSaveAsync(activeLock, cancellationToken);
            return null;
        }

        return activeLock.IsValid() ? activeLock : null;
    }

    /// <summary>
    /// Extend the expiration time of a lock.
    /// </summary>
    /// <param name="lockId">Identifier of the lock to extend.</param>
    /// <param name="lockHolderId">Identifier of the caller; must match the lock's holder.</param>
    /// <param name="additionalMinutes">Value handed to <c>TaskLock.ExtendExpiration</c>.</param>
    /// <param name="cancellationToken">Token forwarded to every repository call.</param>
    /// <returns>
    /// <c>true</c> when the lock was extended and saved; <c>false</c> when no lock with that ID
    /// exists or the lock is no longer valid.
    /// </returns>
    /// <exception cref="InvalidOperationException">Thrown when the lock is held by a different holder.</exception>
    public async Task<bool> ExtendLockAsync(
        Guid lockId,
        Guid lockHolderId,
        int additionalMinutes,
        CancellationToken cancellationToken = default)
    {
        var taskLock = await _taskLockRepository.GetByIdAsync(lockId, cancellationToken);
        if (taskLock is null)
        {
            return false;
        }

        // Ownership is verified before validity, so a foreign caller always gets the exception.
        if (taskLock.LockHolderId != lockHolderId)
        {
            throw new InvalidOperationException(
                "Cannot extend lock held by another user/agent");
        }

        if (!taskLock.IsValid())
        {
            return false;
        }

        taskLock.ExtendExpiration(additionalMinutes);
        await _taskLockRepository.UpdateAsync(taskLock, cancellationToken);
        await _taskLockRepository.SaveChangesAsync(cancellationToken);
        return true;
    }

    /// <summary>
    /// Process expired locks - marks every lock returned by the repository's expired query as expired.
    /// This should be called by a background job periodically.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to every repository call.</param>
    /// <returns>Number of locks that were marked as expired.</returns>
    public async Task<int> ProcessExpiredLocksAsync(CancellationToken cancellationToken = default)
    {
        var expiredLocks = await _taskLockRepository.GetExpiredAsync(cancellationToken);
        var count = 0;

        foreach (var taskLock in expiredLocks)
        {
            taskLock.MarkAsExpired();
            await _taskLockRepository.UpdateAsync(taskLock, cancellationToken);
            count++;
        }

        // All updates are flushed with a single save; skip the save when nothing changed.
        if (count > 0)
        {
            await _taskLockRepository.SaveChangesAsync(cancellationToken);
        }

        return count;
    }

    /// <summary>
    /// Release all valid locks held by a specific holder.
    /// Useful when an AI agent disconnects or a user logs out.
    /// </summary>
    /// <param name="tenantId">Tenant the holder belongs to.</param>
    /// <param name="lockHolderId">Identifier of the holder whose locks are released.</param>
    /// <param name="cancellationToken">Token forwarded to every repository call.</param>
    /// <returns>Number of locks that were released.</returns>
    public async Task<int> ReleaseAllLocksForHolderAsync(
        Guid tenantId,
        Guid lockHolderId,
        CancellationToken cancellationToken = default)
    {
        var locks = await _taskLockRepository.GetByLockHolderAsync(
            tenantId,
            lockHolderId,
            cancellationToken);
        var count = 0;

        // Locks that are no longer valid are left untouched.
        foreach (var taskLock in locks.Where(l => l.IsValid()))
        {
            taskLock.Release();
            await _taskLockRepository.UpdateAsync(taskLock, cancellationToken);
            count++;
        }

        // All updates are flushed with a single save; skip the save when nothing changed.
        if (count > 0)
        {
            await _taskLockRepository.SaveChangesAsync(cancellationToken);
        }

        return count;
    }

    /// <summary>
    /// Marks a single lock as expired and persists the change immediately.
    /// </summary>
    private async Task MarkExpiredAndSaveAsync(TaskLock staleLock, CancellationToken cancellationToken)
    {
        staleLock.MarkAsExpired();
        await _taskLockRepository.UpdateAsync(staleLock, cancellationToken);
        await _taskLockRepository.SaveChangesAsync(cancellationToken);
    }

    /// <summary>
    /// Releases an already loaded lock on behalf of <paramref name="lockHolderId"/> and persists it.
    /// Returns <c>false</c> for a missing or no-longer-valid lock; throws when the holder differs.
    /// </summary>
    private async Task<bool> ReleaseIfOwnedAsync(
        TaskLock? taskLock,
        Guid lockHolderId,
        CancellationToken cancellationToken)
    {
        if (taskLock is null)
        {
            return false;
        }

        // Ownership is verified before validity, so a foreign caller always gets the exception.
        if (taskLock.LockHolderId != lockHolderId)
        {
            throw new InvalidOperationException(
                "Cannot release lock held by another user/agent");
        }

        if (!taskLock.IsValid())
        {
            return false; // Lock already released or expired
        }

        taskLock.Release();
        await _taskLockRepository.UpdateAsync(taskLock, cancellationToken);
        await _taskLockRepository.SaveChangesAsync(cancellationToken);
        return true;
    }
}