Skip to content

Commit

Permalink
Add a mechanism to respond with binary data in ack message
Browse files Browse the repository at this point in the history
  • Loading branch information
Y-Sindo committed Jan 2, 2025
1 parent 2df6718 commit edd412c
Show file tree
Hide file tree
Showing 11 changed files with 456 additions and 331 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public virtual Task HandlePingAsync(PingMessage pingMessage)

public void HandleAck(AckMessage ackMessage)
{
_ackHandler.TriggerAck(ackMessage.AckId, (AckStatus)ackMessage.Status);
_ackHandler.TriggerAck(ackMessage.AckId, (AckStatus)ackMessage.Status, ackMessage.Payload);
}

public virtual Task WriteAsync(ServiceMessage serviceMessage)
Expand Down Expand Up @@ -249,6 +249,29 @@ public async Task<bool> WriteAckableMessageAsync(ServiceMessage serviceMessage,
return AckHandler.HandleAckStatus(ackableMessage, status);
}

/// <summary>
/// <see cref="WriteAckableMessageAsync(ServiceMessage, CancellationToken)"/> only checks <see cref="AckMessage.Status"/> as the response,
/// while this method checks <see cref="AckMessage.Payload"/> and deserialize it to <typeparamref name="T"/>.
/// </summary>
public async Task<T> InvokeAsync<T>(ServiceMessage serviceMessage, CancellationToken cancellationToken = default) where T : IMessagePackSerializable<T>, new()
{
if (serviceMessage is not IAckableMessage ackableMessage)
{
throw new ArgumentException($"{nameof(serviceMessage)} is not {nameof(IAckableMessage)}");
}

var task = _ackHandler.CreateSingleAck<T>(out var id, null, cancellationToken);
ackableMessage.AckId = id;

// Sending regular messages completes as soon as the data leaves the outbound pipe,
// whereas ackable ones complete upon full roundtrip of the message and the ack (or timeout).
// Therefore sending them over different connections creates a possibility for processing them out of original order.
// By sending both message types over the same connection we ensure that they are sent (and processed) in their original order.
await WriteMessageAsync(serviceMessage);

return await task;
}

public virtual Task OfflineAsync(GracefulShutdownMode mode, CancellationToken token)
{
_terminated = true;
Expand Down
67 changes: 49 additions & 18 deletions src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
using System;
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using Microsoft.Azure.SignalR.Common;
using Microsoft.Azure.SignalR.Protocol;

Expand Down Expand Up @@ -35,7 +40,7 @@ public Task<AckStatus> CreateSingleAck(out int id, TimeSpan? ackTimeout = defaul
{
return Task.FromResult(AckStatus.Ok);
}
var info = (IAckInfo<AckStatus>)_acks.GetOrAdd(id, _ => new SingleAckInfo(ackTimeout ?? _defaultAckTimeout));
var info = (IAckInfo<AckStatus>)_acks.GetOrAdd(id, _ => new SingleAckInfo<AckStatus>(ackTimeout ?? _defaultAckTimeout));
if (info is MultiAckInfo)
{
throw new InvalidOperationException();
Expand All @@ -44,6 +49,18 @@ public Task<AckStatus> CreateSingleAck(out int id, TimeSpan? ackTimeout = defaul
return info.Task;
}

public Task<T> CreateSingleAck<T>(out int id, TimeSpan? ackTimeout = default, CancellationToken cancellationToken = default) where T : IMessagePackSerializable, new()
{
id = NextId();
if (_disposed)
{
return Task.FromResult(new T());
}
var info = (IAckInfo<IMessagePackSerializable>)_acks.GetOrAdd(id, _ => new MessagePackPayloadSingleAckInfo<T>(ackTimeout ?? _defaultAckTimeout));
cancellationToken.Register(info.Cancel);
return info.Task.ContinueWith(task => (T)task.Result);
}

public static bool HandleAckStatus(IAckableMessage message, AckStatus status)
{
return status switch
Expand All @@ -63,27 +80,30 @@ public Task<AckStatus> CreateMultiAck(out int id, TimeSpan? ackTimeout = default
return Task.FromResult(AckStatus.Ok);
}
var info = (IAckInfo<AckStatus>)_acks.GetOrAdd(id, _ => new MultiAckInfo(ackTimeout ?? _defaultAckTimeout));
if (info is SingleAckInfo)
if (info is SingleAckInfo<AckStatus>)
{
throw new InvalidOperationException();
}
return info.Task;
}

public void TriggerAck(int id, AckStatus status = AckStatus.Ok)
public void TriggerAck(int id, AckStatus status = AckStatus.Ok, ReadOnlySequence<byte>? payload = default)
{
if (_acks.TryGetValue(id, out var info))
{
switch (info)
if (info is IAckInfo<AckStatus> statusAck)
{
case IAckInfo<AckStatus> ackInfo:
if (ackInfo.Ack(status))
{
_acks.TryRemove(id, out _);
}
break;
default:
throw new InvalidCastException($"Expected: IAckInfo<{typeof(IAckInfo<AckStatus>).Name}>, actual type: {info.GetType().Name}");
if (statusAck.Ack(status))
{
_acks.TryRemove(id, out _);
}
}
else if (info is )

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (6.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (6.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (6.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (6.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (6.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (6.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (6.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (6.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_ubuntu (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (8.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (6.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (6.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (6.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_osx (6.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_windows (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_windows (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_windows (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_windows (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_windows (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_windows (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_windows (7.0.x)

Pattern missing

Check failure on line 101 in src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

View workflow job for this annotation

GitHub Actions / build_windows (7.0.x)

Pattern missing
{
}
else
{
throw new InvalidCastException($"Expected: IAckInfo<{typeof(IAckInfo<AckStatus>).Name}>, actual type: {info.GetType().Name}");
}
}
}
Expand Down Expand Up @@ -125,7 +145,7 @@ private void CheckAcks()
{
if (_acks.TryRemove(id, out _))
{
if (ack is SingleAckInfo singleAckInfo)
if (ack is SingleAckInfo<AckStatus> singleAckInfo)
{
singleAckInfo.Ack(AckStatus.Timeout);
}
Expand Down Expand Up @@ -183,9 +203,9 @@ public interface IMultiAckInfo
bool SetExpectedCount(int expectedCount);
}

private sealed class SingleAckInfo : IAckInfo<AckStatus>
private class SingleAckInfo<T> : IAckInfo<T>
{
public readonly TaskCompletionSource<AckStatus> _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
public readonly TaskCompletionSource<T> _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

public DateTime TimeoutAt { get; }

Expand All @@ -194,14 +214,25 @@ public SingleAckInfo(TimeSpan timeout)
TimeoutAt = DateTime.UtcNow + timeout;
}

public bool Ack(AckStatus status = AckStatus.Ok) =>
public bool Ack(T status) =>
_tcs.TrySetResult(status);

public Task<AckStatus> Task => _tcs.Task;
public Task<T> Task => _tcs.Task;

public void Cancel() => _tcs.TrySetCanceled();
}

private sealed class MessagePackPayloadSingleAckInfo<T> : SingleAckInfo<IMessagePackSerializable> where T : IMessagePackSerializable, new()
{
public MessagePackPayloadSingleAckInfo(TimeSpan timeout) : base(timeout) { }
public bool Ack(ReadOnlySequence<byte> payload)
{
var reader = new MessagePackReader(payload);
var result = reader.Deserialize<T>(string.Empty);
return _tcs.TrySetResult(result);
}
}

private sealed class MultiAckInfo : IAckInfo<AckStatus>, IMultiAckInfo
{
public readonly TaskCompletionSource<AckStatus> _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Azure.SignalR.Protocol;

namespace Microsoft.Azure.SignalR.Management
{
Expand All @@ -14,5 +17,7 @@ public abstract class GroupManager : IGroupManager
public abstract Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default);

public abstract Task RemoveFromAllGroupsAsync(string connectionId, CancellationToken cancellationToken = default);

public virtual IAsyncEnumerable<GroupMember> ListConnectionsInGroup(string groupName, int? max, CancellationToken token) => throw new NotImplementedException();
}
}
Loading

0 comments on commit edd412c

Please sign in to comment.