Lukas Kolletzki

Reputation: 2246

Async waiting for another request without blocking

I have a websocket application that is implemented as an OWIN middleware. When a request comes in, a new instance of the websocket handler is started, which then waits for incoming messages in a loop like this:

var buffer = new byte[1024*64];
Tuple<ArraySegment<byte>, WebSocketMessageType> received;
do
{
    received = await _webSocket.ReceiveMessage(buffer, _cancellationToken.Token);
    if (received.Item1.Count > 0 && someConditionForCallingDoSomething)
    {
        await DoSomething(received.Item1);
    }
    else if(isAnswer)
    {
        QueueAnswer(received.Item1);
    }
} while (received.Item2 != WebSocketMessageType.Close);

The Task returned from _webSocket.ReceiveMessage will complete when data is available.

DoSomething processes its data and then sends a message over the websocket connection. It should then wait for a reply on the same connection. After processing that reply, it should do some more work and return (completing its task). Maybe this little diagram makes it easier to follow:

_______________
| DoSomething |
|-------------|
|    Work  ---------> Send WS message
|             |
|     ??      |
|             |
|  More Work <------- Receive WS message
|   |         |
|   V         |
|  return;    |
|_____________|

Do some work with the data
          |
          V
     Send a message
          |
          V
   Wait for an answer
          |
          V
    Process answer
          |
          V
        finish

What I have tried to wait for an answer:

var answerCancel = new CancellationTokenSource();
answerCancel.CancelAfter(30 * 1000);

var answer = await Task.Run(async () => 
    {
        string tmpAnswer = null;

        while (!_concurrentAnswerDict.TryGetValue(someKey, out tmpAnswer)) {
            await Task.Delay(150, answerCancel.Token);
        }

        return tmpAnswer;
    }, answerCancel.Token);

But this seems to block until the task is cancelled. When I debug the program, I see the call of QueueAnswer after 30 seconds. I thought, Task.Run will run the function in a new thread, but it seems like it does not. From the view that Task.Run is blocking, it seems logical to me that it does not work, because I await the execution of DoSomething, therefore receiving new messages will be blocked too.

My question is: How do I implement behaviour like this? How do I make DoSomething wait for another websocket message before completing?

Thank you in advance for every hint

Lukas

Upvotes: 2

Views: 1650

Answers (2)

user12933372

Reputation:

You should use SignalR and receive the response on the client side. Or, better yet, send the request on the client side and receive it on the client side.

Using await will block, as you are sending a request to the server and waiting for a response.

Upvotes: 0

Stephen Cleary

Reputation: 457207

First off, I recommend using SignalR, since it handles a lot of this hard stuff for you. But if you want to do it yourself, read on...

Also, I'm assuming that "do work" and "answer" messages can arrive on the same web socket in any order, and that you're using _concurrentAnswerDict to coordinate an outgoing "question" message from DoSomething with incoming "answer" messages.

In that case, you're going to need a "websocket reader" task that is independent of DoSomething; you can't have your reader await DoSomething because that will prevent reading answers. I think that's the main problem you're having.

This is one of the rare cases where it may be acceptable to not await a task. Assuming DoSomething will catch its own exceptions and handle logging and whatnot, then we can treat it as an independent "main" and ignore the task it returns:

var buffer = new byte[1024*64];
Tuple<ArraySegment<byte>, WebSocketMessageType> received;
do
{
  received = await _webSocket.ReceiveMessage(buffer, _cancellationToken.Token);
  if (received.Item1.Count > 0 && someConditionForCallingDoSomething)
  {
    // Deliberately not awaited, so the loop can keep reading (including answers).
    var _ = DoSomething(received.Item1);
  }
  else if(isAnswer)
  {
    QueueAnswer(received.Item1);
  }
} while (received.Item2 != WebSocketMessageType.Close);

And that should allow QueueAnswer to run while DoSomething is not yet completed.
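
The "catch its own exceptions" assumption above can also be enforced from the outside. The following is only a minimal sketch: Detached, Run and the onError callback are hypothetical names introduced for illustration, not part of the original code or of any library. It starts a handler without awaiting it and routes any exception to a callback, so a failure does not vanish along with the discarded task.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not from the original post): runs an async handler
// "detached" and reports failures through a callback.
public static class Detached
{
    // The returned task completes when the handler finishes. It does not fault
    // when the handler throws, because the exception goes to onError instead.
    public static async Task Run(Func<Task> handler, Action<Exception> onError)
    {
        try
        {
            // handler() is invoked inside the try block, so an exception thrown
            // synchronously before the first await is caught as well.
            await handler().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            onError(ex);
        }
    }
}
```

In the reading loop this might be used as `var _ = Detached.Run(() => DoSomething(received.Item1), ex => Log(ex));`, where Log stands in for whatever logging the application already has.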

"I thought, Task.Run will run the function in a new thread, but it seems like it does not. From the view that Task.Run is blocking, it seems logical to me that it does not work, because I await the execution of DoSomething, therefore receiving new messages will be blocked too."

Task.Run is running in another thread. But DoSomething is (asynchronously) waiting for it to complete, and the reading loop is (asynchronously) waiting for DoSomething to complete before it reads the next message.

Other notes:

while (!_concurrentAnswerDict.TryGetValue(someKey, out tmpAnswer)) {
  await Task.Delay(150, answerCancel.Token);
}

This seems quite odd to me. I would recommend using a dictionary of keys to TaskCompletionSource<Answer> instead of Answer. Then, QueueAnswer would call TaskCompletionSource<Answer>.SetResult, and this code would just await TaskCompletionSource<Answer>.Task (along with Task.Delay if a timeout is desired).
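
A rough sketch of that idea follows. It assumes string keys and string answers, and the names PendingAnswers, Register, Complete and WaitAsync are made up for illustration. Complete plays the role of QueueAnswer, and the timeout is built from Task.WhenAny plus Task.Delay.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper (names and string types are assumptions): maps a key to
// a TaskCompletionSource that is completed when the matching answer arrives.
public sealed class PendingAnswers
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pending =
        new ConcurrentDictionary<string, TaskCompletionSource<string>>();

    // Call this BEFORE sending the question, so a fast answer cannot be missed.
    public Task<string> Register(string key)
    {
        // RunContinuationsAsynchronously keeps the waiter's continuation from
        // running inline on the reader loop when Complete is called.
        var tcs = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        if (!_pending.TryAdd(key, tcs))
            throw new InvalidOperationException("An answer is already pending for key " + key);
        return tcs.Task;
    }

    // Called by the reader loop. Returns false if nobody is waiting for this key.
    public bool Complete(string key, string answer)
    {
        TaskCompletionSource<string> tcs;
        return _pending.TryRemove(key, out tcs) && tcs.TrySetResult(answer);
    }

    // Awaits the registered answer; throws TimeoutException if it does not
    // arrive within the timeout, and forgets the pending entry in that case.
    public async Task<string> WaitAsync(string key, Task<string> answerTask, TimeSpan timeout)
    {
        var finished = await Task.WhenAny(answerTask, Task.Delay(timeout)).ConfigureAwait(false);
        if (finished != answerTask)
        {
            TaskCompletionSource<string> abandoned;
            _pending.TryRemove(key, out abandoned);
            throw new TimeoutException("No answer received for key " + key);
        }
        return await answerTask.ConfigureAwait(false);
    }
}
```

Inside DoSomething, the order would be: call Register(someKey), send the question, then await WaitAsync(someKey, answerTask, TimeSpan.FromSeconds(30)). Nothing polls, so no thread or timer loop is spent waiting, and the answer is observed as soon as the reader loop calls Complete.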

Upvotes: 2
