Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public Task StopGetServersPing()
return Task.WhenAll(GetConnections().Select(s => s.StopGetServersPing()));
}

public IAsyncEnumerable<Page<GroupMember>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string continuationToken = null, ulong? tracingId = null, CancellationToken token = default)
public IAsyncEnumerable<Page<SignalRGroupConnection>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string continuationToken = null, ulong? tracingId = null, CancellationToken token = default)
{
throw new NotImplementedException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ internal interface IServiceMessageWriter

Task<bool> WriteAckableMessageAsync(ServiceMessage serviceMessage, CancellationToken cancellationToken = default);

IAsyncEnumerable<Page<GroupMember>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string? continuationToken = null, ulong? tracingId = null, CancellationToken token = default);
IAsyncEnumerable<Page<SignalRGroupConnection>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string? continuationToken = null, ulong? tracingId = null, CancellationToken token = default);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Text.Json.Serialization;

namespace Microsoft.Azure.SignalR;

#nullable enable

// TODO: make public later
internal sealed record SignalRGroupConnection
{
[JsonPropertyName("connectionId")]
public string ConnectionId { internal set; get; }

[JsonPropertyName("userId")]
public string? UserId { get; internal set; }

public SignalRGroupConnection(string connectionId, string? userId = default)
{
ConnectionId = connectionId;
UserId = userId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,27 @@

using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

using Azure;

using Microsoft.Azure.SignalR.Protocol;

#nullable enable

namespace Microsoft.Azure.SignalR;

internal class GroupMemberQueryResultPage : Page<GroupMember>
[JsonConverter(typeof(GroupMemberQueryResultPageConverter))]
internal class GroupMemberQueryResultPage : Page<SignalRGroupConnection>
{
private readonly IReadOnlyList<GroupMember> _value;
private readonly string? _continuationToken;

public GroupMemberQueryResultPage(IReadOnlyList<GroupMember> value, string? continuationToken)
public GroupMemberQueryResultPage(IReadOnlyList<SignalRGroupConnection> values, string? continuationToken)
{
_value = value;
_continuationToken = continuationToken;
Values = values ?? throw new ArgumentNullException(nameof(values));
ContinuationToken = continuationToken;
}

public override IReadOnlyList<GroupMember> Values => _value;
public override IReadOnlyList<SignalRGroupConnection> Values { get; }

public override string? ContinuationToken => _continuationToken;
public override string? ContinuationToken { get; }

public override Response GetRawResponse()
{
Expand All @@ -34,3 +32,58 @@ public override Response GetRawResponse()
throw new NotSupportedException();
}
}

internal class GroupMemberQueryResultPageConverter : JsonConverter<GroupMemberQueryResultPage>
{
public override GroupMemberQueryResultPage Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.StartObject)
{
throw new JsonException("Expected StartObject token");
}

IReadOnlyList<SignalRGroupConnection>? values = null;
string? continuationToken = null;

while (reader.Read())
{
if (reader.TokenType == JsonTokenType.EndObject)
{
break;
}

if (reader.TokenType != JsonTokenType.PropertyName)
{
throw new JsonException("Expected PropertyName token");
}

string propertyName = reader.GetString()!;
reader.Read();

switch (propertyName)
{
case "value":
values = JsonSerializer.Deserialize<List<SignalRGroupConnection>>(ref reader, options);
break;
case "nextlink":
continuationToken = JsonSerializer.Deserialize<string>(ref reader, options);
break;
default:
reader.Skip();
break;
}
}

return new GroupMemberQueryResultPage(values ?? new List<SignalRGroupConnection>(), continuationToken);
}

public override void Write(Utf8JsonWriter writer, GroupMemberQueryResultPage value, JsonSerializerOptions options)
{
writer.WriteStartObject();
writer.WritePropertyName("value");
JsonSerializer.Serialize(writer, value.Values, options);
writer.WritePropertyName("nextLink");
JsonSerializer.Serialize(writer, value.ContinuationToken, options);
writer.WriteEndObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,21 @@

using Azure;

using Microsoft.Azure.SignalR.Protocol;

namespace Microsoft.Azure.SignalR;

#nullable enable

internal class PagenableGroupMember : AsyncPageable<GroupMember>
internal class PageableGroupMember : AsyncPageable<SignalRGroupConnection>
{
// (string? continuationToken, int? pageSizeHint) => IAsyncEnumerable<Page<GroupMember>>
private readonly Func<string?, int?, IAsyncEnumerable<Page<GroupMember>>> _fetchPages;
private readonly Func<string?, int?, IAsyncEnumerable<Page<SignalRGroupConnection>>> _fetchPages;

public PagenableGroupMember(Func<string?, int?, IAsyncEnumerable<Page<GroupMember>>> fetchPages, CancellationToken cancellationToken = default): base(cancellationToken)
public PageableGroupMember(Func<string?, int?, IAsyncEnumerable<Page<SignalRGroupConnection>>> fetchPages, CancellationToken cancellationToken = default) : base(cancellationToken)
{
_fetchPages = fetchPages;
}

public override IAsyncEnumerable<Page<GroupMember>> AsPages(string? continuationToken = null, int? pageSizeHint = null)
public override IAsyncEnumerable<Page<SignalRGroupConnection>> AsPages(string? continuationToken = null, int? pageSizeHint = null)
{
return _fetchPages(continuationToken, pageSizeHint);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private async Task WriteSingleEndpointMessageAsync(HubServiceEndpoint endpoint,
}
}

public async IAsyncEnumerable<Page<GroupMember>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string continuationToken = null, ulong? tracingId = null, [EnumeratorCancellation] CancellationToken token = default)
public async IAsyncEnumerable<Page<SignalRGroupConnection>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string continuationToken = null, ulong? tracingId = null, [EnumeratorCancellation] CancellationToken token = default)
{
if (TargetEndpoints.Length == 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public Task<bool> WriteAckableMessageAsync(ServiceMessage serviceMessage, Cancel
return CreateMessageWriter(serviceMessage).WriteAckableMessageAsync(serviceMessage, cancellationToken);
}

public IAsyncEnumerable<Page<GroupMember>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string continuationToken = null, ulong? tracingId = null, CancellationToken token = default)
public IAsyncEnumerable<Page<SignalRGroupConnection>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string continuationToken = null, ulong? tracingId = null, CancellationToken token = default)
{
var targetEndpoints = _routerEndpoints.needRouter ? _router.GetEndpointsForGroup(groupName, _routerEndpoints.endpoints) : _routerEndpoints.endpoints;
var messageWriter = new MultiEndpointMessageWriter(targetEndpoints?.ToList(), _loggerFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public async Task<bool> WriteAckableMessageAsync(ServiceMessage serviceMessage,
return AckHandler.HandleAckStatus(ackableMessage, status);
}

public async IAsyncEnumerable<Page<GroupMember>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string continuationToken = null, ulong? tracingId = null, [EnumeratorCancellation] CancellationToken token = default)
public async IAsyncEnumerable<Page<SignalRGroupConnection>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string continuationToken = null, ulong? tracingId = null, [EnumeratorCancellation] CancellationToken token = default)
{
if (string.IsNullOrWhiteSpace(groupName))
{
Expand All @@ -273,7 +273,7 @@ public async IAsyncEnumerable<Page<GroupMember>> ListConnectionsInGroupAsync(str
do
{
var response = await InvokeAsync<GroupMemberQueryResponse>(message, token);
yield return new GroupMemberQueryResultPage(response.Members.ToList(), response.ContinuationToken);
yield return new GroupMemberQueryResultPage([.. response.Members.Select(m => new SignalRGroupConnection(m.ConnectionId, m.UserId))], response.ContinuationToken);

if (response.ContinuationToken == null)
{
Expand All @@ -282,6 +282,10 @@ public async IAsyncEnumerable<Page<GroupMember>> ListConnectionsInGroupAsync(str
if (message.Top != null)
{
message.Top -= response.Members.Count;
if (message.Top <= 0)
{
yield break;
}
}
message.ContinuationToken = response.ContinuationToken;
} while (true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using Azure;

using Microsoft.AspNetCore.SignalR;
using Microsoft.Azure.SignalR.Protocol;

namespace Microsoft.Azure.SignalR.Management
{
Expand Down Expand Up @@ -50,6 +49,6 @@ public abstract class GroupManager : IGroupManager
/// <param name="top">The maximum number of connections to return.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>An asynchronous enumerable of group members.</returns>
internal virtual AsyncPageable<GroupMember> ListConnectionsInGroup(string groupName, int? top = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
internal virtual AsyncPageable<SignalRGroupConnection> ListConnectionsInGroup(string groupName, int? top = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

using Azure;

using Microsoft.Azure.SignalR.Protocol;

namespace Microsoft.Azure.SignalR.Management
{
internal class GroupManagerAdapter : GroupManager
Expand All @@ -26,7 +24,7 @@ public GroupManagerAdapter(IServiceHubLifetimeManager lifetimeManager)

public override Task RemoveFromAllGroupsAsync(string connectionId, CancellationToken cancellationToken = default) => _lifetimeManager.RemoveFromAllGroupsAsync(connectionId, cancellationToken);

internal override AsyncPageable<GroupMember> ListConnectionsInGroup(string groupName, int? top = null, CancellationToken cancellationToken = default)
internal override AsyncPageable<SignalRGroupConnection> ListConnectionsInGroup(string groupName, int? top = null, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(groupName))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ internal interface IServiceHubLifetimeManager : IHubLifetimeManager, IUserGroupH

Task<bool> GroupExistsAsync(string groupName, CancellationToken cancellationToken);

AsyncPageable<GroupMember> ListConnectionsInGroup(string groupName, int? top = null, CancellationToken token = default);
AsyncPageable<SignalRGroupConnection> ListConnectionsInGroup(string groupName, int? top = null, CancellationToken token = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ private static bool FilterExpectedResponse(HttpResponseMessage response, string
response.IsSuccessStatusCode
|| (response.StatusCode == HttpStatusCode.NotFound && response.Headers.TryGetValues(Headers.MicrosoftErrorCode, out var errorCodes) && errorCodes.First().Equals(expectedErrorCode, StringComparison.OrdinalIgnoreCase));

public AsyncPageable<GroupMember> ListConnectionsInGroup(string groupName, int? top = null, CancellationToken token = default)
public AsyncPageable<SignalRGroupConnection> ListConnectionsInGroup(string groupName, int? top = null, CancellationToken token = default)
{
throw new NotImplementedException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public override async Task<T> InvokeConnectionAsync<T>(string connectionId, stri

using var cts = new CancellationTokenSource(DefaultInvocationTimeoutTimespan);
var cancellationTokenInUse = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, cancellationToken).Token;

var invocationId = _clientInvocationManager.Caller.GenerateInvocationId(connectionId);
var message = AppendMessageTracingId(new ClientInvocationMessage(invocationId, connectionId, _callerId, SerializeAllProtocols(methodName, args, invocationId)));
await WriteAsync(message);
Expand Down Expand Up @@ -310,7 +310,7 @@ protected override T AppendMessageTracingId<T>(T message)
return base.AppendMessageTracingId(message);
}

public AsyncPageable<GroupMember> ListConnectionsInGroup(string groupName, int? top = null, CancellationToken token = default)
public AsyncPageable<SignalRGroupConnection> ListConnectionsInGroup(string groupName, int? top = null, CancellationToken token = default)
{
if (string.IsNullOrEmpty(groupName))
{
Expand All @@ -323,6 +323,6 @@ public AsyncPageable<GroupMember> ListConnectionsInGroup(string groupName, int?
}

ulong? tracingId = _serviceManagerOptions.Value.EnableMessageTracing ? MessageWithTracingIdHelper.Generate() : null;
return new PagenableGroupMember((string? continuationToken, int? pageSize) => ServiceConnectionContainer.ListConnectionsInGroupAsync(groupName, top, pageSize, continuationToken, tracingId, token), token);
return new PageableGroupMember((string? continuationToken, int? pageSize) => ServiceConnectionContainer.ListConnectionsInGroupAsync(groupName, top, pageSize, continuationToken, tracingId, token), token);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void Dispose()
StopAsync().GetAwaiter().GetResult();
}

public IAsyncEnumerable<Page<GroupMember>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string? continuationToken = null, ulong? tracingId = null, CancellationToken token = default)
public IAsyncEnumerable<Page<SignalRGroupConnection>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string? continuationToken = null, ulong? tracingId = null, CancellationToken token = default)
{
if (_serviceConnection == null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Text.Json;
using Xunit;

namespace Microsoft.Azure.SignalR.Tests
{
public class GroupMemberQueryResultPageTests
{
[Fact]
public void CanDeserializeGroupMemberQueryResultPage()
{
// Arrange
var json = @"
{
""value"": [
{ ""connectionId"": ""conn1"", ""userId"": ""user1"" },
{ ""connectionId"": ""conn2"", ""userId"": null }
],
""nextLink"": ""token123""
}";

// Act
var result = JsonSerializer.Deserialize<GroupMemberQueryResultPage>(json);

// Assert
Assert.NotNull(result);
Assert.NotNull(result.Values);
Assert.Equal(2, result.Values.Count);
Assert.Equal("conn1", result.Values[0].ConnectionId);
Assert.Equal("user1", result.Values[0].UserId);
Assert.Equal("conn2", result.Values[1].ConnectionId);
Assert.Null(result.Values[1].UserId);
Assert.Equal("token123", result.ContinuationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

using Azure;

using Microsoft.Azure.SignalR.Protocol;
using Microsoft.Azure.SignalR.Tests.Common;
using Microsoft.Extensions.Logging;

Expand All @@ -31,10 +30,10 @@ public async Task ListConnectionsInGroup(int? top, int resultCount, int expected
for (var i = 0; i < 2; i++)
{
var endpoint = new TestHubServiceEndpoint();
var resultFromConnectioContainer = MockAsyncEnumerable<GroupMember>.From(new GroupMemberQueryResultPage([
new GroupMember { ConnectionId = "1" },
new GroupMember { ConnectionId = "2" },
new GroupMember { ConnectionId = "3" }],
var resultFromConnectioContainer = MockAsyncEnumerable<SignalRGroupConnection>.From(new GroupMemberQueryResultPage([
new SignalRGroupConnection("1"),
new SignalRGroupConnection("2"),
new SignalRGroupConnection("3")],
null)
);
var containerMock = new Mock<IServiceConnectionContainer>();
Expand All @@ -45,7 +44,7 @@ public async Task ListConnectionsInGroup(int? top, int resultCount, int expected
targetEndpoints.Add(endpoint);
}
var multiEndpointWriter = new MultiEndpointMessageWriter(targetEndpoints, Mock.Of<ILoggerFactory>());
var resultPages = new List<Page<GroupMember>>();
var resultPages = new List<Page<SignalRGroupConnection>>();
await foreach (var page in multiEndpointWriter.ListConnectionsInGroupAsync("group", top))
{
resultPages.Add(page);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ public Task CloseClientConnections(CancellationToken token)
throw new NotImplementedException();
}

public IAsyncEnumerable<GroupMember> ListConnectionsInGroupAsync(string groupName, int? top = null, ulong? tracingId = null)
public IAsyncEnumerable<SignalRGroupConnection> ListConnectionsInGroupAsync(string groupName, int? top = null, ulong? tracingId = null)
{
throw new NotImplementedException();
}

public IAsyncEnumerable<Page<GroupMember>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string continuationToken = null, ulong? tracingId = null, CancellationToken token = default)
public IAsyncEnumerable<Page<SignalRGroupConnection>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string continuationToken = null, ulong? tracingId = null, CancellationToken token = default)
{
throw new NotImplementedException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public Task OfflineAsync(GracefulShutdownMode mode, CancellationToken token)

public Task CloseClientConnections(CancellationToken token) => Task.CompletedTask;

public IAsyncEnumerable<Page<GroupMember>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string continuationToken = null, ulong? tracingId = null, CancellationToken token = default)
public IAsyncEnumerable<Page<SignalRGroupConnection>> ListConnectionsInGroupAsync(string groupName, int? top = null, int? maxPageSize = null, string continuationToken = null, ulong? tracingId = null, CancellationToken token = default)
{
throw new NotImplementedException();
}
Expand Down
Loading
Loading