LorneCash
LorneCash

Reputation: 1614

How to convert this WebSocket pattern to a Task I can await/cancel/continuewith/

I have 3rd party WebSocket that I need to consume where the request is sent in one method and the response is given in another. How can I convert this type of pattern to a more typical async/await TPL Task that will support cancellation (via token), continue with and all the other goodies.

This is what I came up with so far, although I'm not sure if it works. I can't test it until Monday.

So here are my questions:

  1. Will this work?

  2. I've been reading about TaskCompletionSource. Is there a better way to do any of this possibly with TaskCompletionSource?

  3. I really don't like the lock because I know it has the potential to block for a long time but but I'm not sure how to do it any better because if I don't lock a second call to AsyncGetPositions could clear any positions already returned.

  4. Even with the lock I know that if there is a timeout or cancellation that creates a problem so maybe I just remove the cancellation token. The only other thing I can think of would be to create multiple clients that are all authenticated and ready to process a request and then manage them like a thread pool for these types of request but I'm not going to be doing that anytime soon so other than that... idk.

    private object GetPositionsLock = new object();
    private IEnumerable<Position> Positions { get; } = new List<Position>();
    private Task PositionsReturned { get; set; }
    public async Task<List<Position>> AsyncGetPositions(CancellationToken token)
    {
        try
        {
            lock (GetPositionsLock)
            {
                Positions.Clear();
                IbWebSocket.reqPositions();
                PositionsReturned = new Task(null, token, TaskCreationOptions.None);
                PositionsReturned.GetAwaiter().GetResult();
                return token.IsCancellationRequested ? null : Positions.ToList().Where(x => x.Shares != 0).ToList();
            }
        }
        catch (TimeoutException ex)
        {
            //LogTimeout
            throw;
        }
        catch (Exception ex)
        {
            //LogError
            throw;
        }
    }
    
    ///  <summary>
    ///         Provides a position to the reqPositions() method.  When the last position has been received positionEnd() is called.
    ///  </summary>
    ///  <param name="contract"></param>
    ///  <param name="value"></param>
    ///  <param name="account"></param>
    public void position(string account, Contract contract, double value)
    {
        try
        {
            Positions.Concat(new Position(account, contract, value));
        }
        catch (Exception ex)
        {
            //LogError
            throw;
        }
    }
    
    /// <summary>
    ///     Indicates all the positions have been transmitted.
    /// </summary>
    public void positionEnd()
    {
        PositionsReturned = Task.CompletedTask;
    }
    

Upvotes: 0

Views: 178

Answers (1)

Stephen Cleary
Stephen Cleary

Reputation: 457207

Will this work?

No. You shouldn't use the Task constructor, use lock with async code, or mix blocking with asynchronous code.

I've been reading about TaskCompletionSource. Is there a better way to do any of this possibly with TaskCompletionSource?

Yes, that's the type to use for this scenario.

I really don't like the lock because I know it has the potential to block for a long time but but I'm not sure how to do it any better because if I don't lock a second call to AsyncGetPositions could clear any positions already returned.

I recommend getting this working first, and then handling the additional requirement of reentrancy. Each of those are hard enough on their own.


What you want to do is have a TaskCompletionSource<T> and complete it when positionEnd is invoked. For simplicity, start without reentrancy concerns and without the CancellationToken. Once you fully understand TaskCompletionSource<T>, then you can add complexity:

private List<Position> Positions { get; } = new();
private TaskCompletionSource<List<Position>> PositionsReturned { get; set; }
public Task<List<Position>> AsyncGetPositions()
{
  Positions.Clear();
  PositionsReturned = new();
  IbWebSocket.reqPositions();
  return PositionsReturned.Task;
}

public void position(string account, Contract contract, double value)
{
  Positions.Add(new Position(account, contract, value));
}

public void positionEnd()
{
  PositionsReturned.TrySetResult(Positions);
}

Upvotes: 2

Related Questions