deathrace

Reputation: 1076

Azure PubSub with protobuf data not working

I am using a generic WebSocket client (System.Net.WebSockets) to connect to the Azure Web PubSub service and communicate between two parties. I followed the official documentation. I was able to communicate using JSON, but with protobuf I don't receive any data. Below is the sample code I am using:

EDIT: The client now receives a message (after I changed the parsed class type to 'DownstreamMessage'), but it says: "disconnectedMessage": { "reason": "Invalid payload. Invalid value : '': Invalid event name. Valid event name should be word in between 1 and 128 characters long." } I don't understand what's happening.

using System.Net.WebSockets;
using Google.Protobuf;

// Create a joining message
UpstreamMessage joiningMessage = new()
{
    JoinGroupMessage = new UpstreamMessage.Types.JoinGroupMessage()
    {
        Group = Group,
    }
};
var joinData = joiningMessage.ToByteArray();

// Initialize the WebSocket
ClientWebSocket webSocket = new();
webSocket.Options.AddSubProtocol("protobuf.webpubsub.azure.v1");
await webSocket.ConnectAsync(new Uri(connStr), CancellationToken.None);

// Set up a continuous message receiver
_ = Task.Run(async () =>
{
    while (webSocket.State == WebSocketState.Open)
    {
        var buffer = new byte[8192];
        var result = await webSocket.ReceiveAsync(
                                    new ArraySegment<byte>(buffer),
                                    CancellationToken.None);

        if (result.MessageType == WebSocketMessageType.Binary)
        {
            // Create a properly sized array containing only the actual message
            var messageBytes = new byte[result.Count];
            Array.Copy(buffer, 0, messageBytes, 0, result.Count);

            try
            {
                var msg = DownstreamMessage.Parser.ParseFrom(messageBytes);
                Console.WriteLine($"Received message: {msg}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error parsing message: {ex.Message}");
            }
        }
        else if (result.MessageType == WebSocketMessageType.Close)
        {
            await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
            break;
        }
    }
});

// Wait for connection establishment
await Task.Delay(2000);

// Send join request
await webSocket.SendAsync(joinData, WebSocketMessageType.Binary, true, CancellationToken.None);
Console.WriteLine("Join group message sent");

// Wait for join to be processed
await Task.Delay(1000);

// Create and send data message
UpstreamMessage dataMessage = new()
{
    // Set the EventMessage with proper target group
    EventMessage = new UpstreamMessage.Types.EventMessage()
    {
        Data = new MessageData()
        {
            TextData = "Hello from protobuf dheeraj"
        }
    }
};

await webSocket.SendAsync(dataMessage.ToByteArray(), WebSocketMessageType.Binary, true, CancellationToken.None);
Console.WriteLine("Data message sent");

Upvotes: 0

Views: 37

Answers (1)

Sampath

Reputation: 3639

The Invalid event name error indicates that the protobuf message structure is incorrect: the service received an event message whose event name was empty (the '' in the error text). In the sample below, EventMessage sets its Data field properly.
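For the raw WebSocket approach in the question, here is a minimal sketch of what the error text points at. It assumes UpstreamMessage and MessageData are the classes generated from the service's protobuf subprotocol definition (as in the question's code), and that this definition exposes an Event field on EventMessage and a SendToGroupMessage type, which is how I recall the subprotocol reference; please check against your generated code. The event name "message" is hypothetical.

```csharp
using Google.Protobuf;

// Assumption: UpstreamMessage and MessageData are generated from the service's
// protobuf subprotocol definition, as in the question's code.
const string Group = "protobuf-client-group";

// Custom event for the upstream event handler: the event name must be non-empty.
UpstreamMessage eventMessage = new()
{
    EventMessage = new UpstreamMessage.Types.EventMessage
    {
        Event = "message", // hypothetical event name
        Data = new MessageData { TextData = "Hello from protobuf" }
    }
};

// Message published to the other members of the group.
UpstreamMessage groupMessage = new()
{
    SendToGroupMessage = new UpstreamMessage.Types.SendToGroupMessage
    {
        Group = Group,
        Data = new MessageData { TextData = "Hello from protobuf" }
    }
};

byte[] eventBytes = eventMessage.ToByteArray();
byte[] groupBytes = groupMessage.ToByteArray();
Console.WriteLine($"event: {eventBytes.Length} bytes, group: {groupBytes.Length} bytes");
```

Either byte array can be sent the same way as in the question, with webSocket.SendAsync(bytes, WebSocketMessageType.Binary, true, CancellationToken.None). An event message is delivered to the server-side event handler, so for two clients talking through a group the SendToGroupMessage form is probably the one you want.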

Use BinaryData.FromBytes(message.ToByteArray()) to serialize messages, like below.

await serviceClient.SendToGroupAsync(Group, BinaryData.FromBytes(message.ToByteArray()), WebPubSubDataType.Binary);

Below is sample code for Azure Web PubSub with protobuf:

    private const string Group = "protobuf-client-group";
    // Client access URL of the Web PubSub service.
    private const string ServiceUri = "wss://";
    static async Task Main()
    {
        var serviceClient = new WebPubSubClient(new Uri(ServiceUri), new WebPubSubClientOptions
        {
            Protocol = new WebPubSubProtobufProtocol()
        });

        serviceClient.Connected += arg =>
        {
            Console.WriteLine($"Connected with connection id: {arg.ConnectionId}");
            return Task.CompletedTask;
        };

        serviceClient.Disconnected += arg =>
        {
            Console.WriteLine($"Disconnected from connection id: {arg.ConnectionId}");
            return Task.CompletedTask;
        };

        serviceClient.GroupMessageReceived += arg =>
        {
            Console.WriteLine($"Received Protobuf message: {arg.Message.Data}");
            return Task.CompletedTask;
        };
        await serviceClient.StartAsync();
        await serviceClient.JoinGroupAsync(Group);
        UpstreamMessage joiningMessage = new()
        {
            JoinGroupMessage = new Webpubsub.JoinGroupMessage()
            {
                Group = Group,
            }
        };
        await serviceClient.SendToGroupAsync(Group, BinaryData.FromBytes(joiningMessage.ToByteArray()), WebPubSubDataType.Binary);

        Console.WriteLine("Join group message sent");
        await Task.Delay(1000);

        while (true)
        {
            Console.WriteLine("Enter the message to send or just press enter to stop:");
            var message = Console.ReadLine();

            if (!string.IsNullOrEmpty(message))
            {
                UpstreamMessage dataMessage = new()
                {
                    EventMessage = new EventMessage()
                    {
                        Data = new MessageData()
                        {
                            TextData = message
                        }
                    }
                };
                await serviceClient.SendToGroupAsync(Group, BinaryData.FromBytes(dataMessage.ToByteArray()), WebPubSubDataType.Binary);
                Console.WriteLine("Data message sent");
            }
            else
            {
                await serviceClient.LeaveGroupAsync(Group);
                await serviceClient.StopAsync();
                break;
            }
        }
    }


Upvotes: 1
