Reputation: 6321
This is a request-response model (like HTTP) but over sockets/websockets. We know which response corresponds to which request by comparing the request IDs.
The workflow is as following:
Error
, ContractDetails
(_itemObservable
) and ContractDetailsEnd
(_itemEndObservable
).OnNext(...)
to ContractDetails
. When there is nothing else left to push, push a final message to ContractDetailsEnd
which essentially does .OnCompleted
.In the first snippet below I use a List<TItemArgs>
, CancellationTokenSource
and a TaskCompletionSource<TItemArgs>
. That's completely unnecessary if everything was Rx style. It would have definitely been less lines of code too.
The second snippet is my personal attempt to make it look more Rx like. It has some issues that I want to resolve:
.Subscribe
could handle errors.Timeout
didn't work out for me – if the request is not matched within a few seconds, it should return a Result<IEnumerable<TItemArgs>>.FromError(new TimeoutError(...))
_errorSubject
should also return an error such as Result<IEnumerable<TItemArgs>>.FromError(new RemoteError(...))
but .Merge(errorMessages.Any(_ => false))
is not working.In my attempt I use ReplaySubject
in opposed to AsyncSubject
because I believe AsyncSubject is only useful when I'm only interested in the last value of the sequence and want to avoid getting all previous values, which is not in my case. In my case I want to return all values, so ReplaySubject would be more suitable as it keeps track of all the previous values and wants all subscribers to receive the same values, regardless of when they subscribed.
public async ValueTask<Result<IEnumerable<TItemArgs>>> ExecuteAsync(Action<int> action)
{
var requestId = _client.GetNextRequestId();
var data = new List<TItemArgs>();
var cts = new CancellationTokenSource(_timeout);
var tcs = new TaskCompletionSource<IEnumerable<TItemArgs>>();
cts.Token.Register(() =>
{
tcs.TrySetCanceled();
}, false);
void OnError(ErrorData msg)
{
tcs.SetException(new IBClientException(msg.RequestId, msg.Code, msg.Message, msg.AdvancedOrderRejectJson));
}
void OnDetails(TItemArgs item)
{
data.Add(item);
}
void OnDetailsEnd(TItemListEndArgs item)
{
tcs.TrySetResult(data);
}
var disposable = new CompositeDisposable();
_client.Error
.Where(e => HasRequestId && e.RequestId == requestId)
.Subscribe(OnError)
.DisposeWith(disposable);
_itemObservable
.Where(item => MatchRequest(item, _itemRequestIdExtractor, requestId))
.Subscribe(OnDetails)
.DisposeWith(disposable);
_itemEndObservable
.Where(item => MatchRequest(item, _itemListEndRequestIdExtractor, requestId))
.Subscribe(OnDetailsEnd)
.DisposeWith(disposable);
action(requestId);
try
{
await tcs.Task.ContinueWith(x =>
{
disposable.Dispose();
cts.Dispose();
}, TaskContinuationOptions.RunContinuationsAsynchronously);
return Result<IEnumerable<TItemArgs>>.FromSuccess(tcs.Task.Result);
}
catch (Exception e)
{
return Result<IEnumerable<TItemArgs>>.FromError(new RemoteError(e.Message, null));
}
}
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
var client = new IBClient();
var result = await client.GetContractDetailsAsync();
if (result.Success)
{
foreach (var item in result.Data!)
{
Console.WriteLine($"RequestId: {item.RequestId} | Data: {item.ContractDetails}");
}
}
public sealed class IBClient
{
private int _nextValidId;
private readonly Subject<ErrorData> _errorSubject = new();
public IObservable<ErrorData> Error => _errorSubject.AsObservable();
private readonly Subject<ContractDetailsData> _contractDetailsSubject = new();
public IObservable<ContractDetailsData> ContractDetails => _contractDetailsSubject.AsObservable();
private readonly Subject<RequestEndData> _contractDetailsEndSubject = new();
public IObservable<RequestEndData> ContractDetailsEnd => _contractDetailsEndSubject.AsObservable();
public ValueTask<Result<IEnumerable<ContractDetailsData>>> GetContractDetailsAsync()
{
return new PendingRequest<ContractDetailsData, RequestEndData>(
this,
ContractDetails,
ContractDetailsEnd,
e => e.RequestId,
e => e.RequestId)
.ExecuteAsync(reqId => TestCall(reqId));
}
private void TestCall(int requestId)
{
_contractDetailsSubject.OnNext(new ContractDetailsData(requestId, "hey from test call"));
_contractDetailsSubject.OnNext(new ContractDetailsData(requestId, "hey two"));
// TODO: Errors doesn't seem to work
// _errorSubject.OnNext(new ErrorData(requestId, 123, "Error happened", ""));
_contractDetailsEndSubject.OnNext(new RequestEndData(requestId));
// There shouldn't be matched.
_contractDetailsSubject.OnNext(new ContractDetailsData(123, "fake ones, so we know it works"));
_contractDetailsEndSubject.OnNext(new RequestEndData(123));
}
public int GetNextRequestId()
{
return Interlocked.Increment(ref _nextValidId);
}
}
public sealed class PendingRequest<TItemArgs, TItemListEndArgs>
{
private readonly TimeSpan _timeout = TimeSpan.FromSeconds(2);
private readonly IBClient _client;
private readonly IObservable<TItemArgs> _itemObservable;
private readonly IObservable<TItemListEndArgs> _itemEndObservable;
private readonly Func<TItemArgs, int>? _itemRequestIdExtractor;
private readonly Func<TItemListEndArgs, int>? _itemListEndRequestIdExtractor;
public PendingRequest(
IBClient client,
IObservable<TItemArgs> itemObservable,
IObservable<TItemListEndArgs> itemEndObservable,
Func<TItemArgs, int>? itemRequestIdExtractor = null,
Func<TItemListEndArgs, int>? itemListEndRequestIdExtractor = null)
{
_client = client;
_itemObservable = itemObservable;
_itemEndObservable = itemEndObservable;
_itemRequestIdExtractor = itemRequestIdExtractor;
_itemListEndRequestIdExtractor = itemListEndRequestIdExtractor;
}
private bool HasRequestId => _itemRequestIdExtractor != null && _itemListEndRequestIdExtractor != null;
public async ValueTask<Result<IEnumerable<TItemArgs>>> ExecuteAsync(Action<int> action, IScheduler? scheduler = null)
{
scheduler ??= ImmediateScheduler.Instance;
var requestId = _client.GetNextRequestId();
var results = new ReplaySubject<TItemArgs>();
try
{
var errorMessages = _client.Error
.Where(e => HasRequestId && e.RequestId == requestId);
using (_itemObservable
.Where(item => MatchRequest(item, _itemRequestIdExtractor, requestId))
.ObserveOn(scheduler)
.Subscribe(results))
using (_itemEndObservable
.Any(item => MatchRequest(item, _itemListEndRequestIdExtractor, requestId))
.Merge(errorMessages.Any(_ => false)) // TODO: ???
.ObserveOn(scheduler)
.Subscribe(_ => results.OnCompleted()))
{
action(requestId);
// Don't want an Exception thrown if there result list is empty
await results.DefaultIfEmpty();
return Result<IEnumerable<TItemArgs>>.FromSuccess(results.ToEnumerable());
}
}
catch (Exception ex)
{
return Result<IEnumerable<TItemArgs>>.FromError(new RemoteError(ex.Message, null));
}
}
private bool MatchRequest<T>(T item, Func<T, int>? idExtractor, int id)
{
return !HasRequestId || (idExtractor != null && idExtractor(item) == id);
}
}
public sealed class ContractDetailsData
{
public ContractDetailsData(int requestId, string contractDetails)
{
RequestId = requestId;
ContractDetails = contractDetails;
}
public int RequestId { get; }
public string ContractDetails { get; }
}
public sealed class ErrorData
{
public ErrorData(int requestId, int code, string message, string advancedOrderRejectJson)
{
RequestId = requestId;
Code = code;
Message = message;
AdvancedOrderRejectJson = advancedOrderRejectJson;
}
public int RequestId { get; }
public int Code { get; }
public string Message { get; }
public string AdvancedOrderRejectJson { get; }
}
public sealed class RequestEndData
{
public RequestEndData(int requestId)
{
RequestId = requestId;
}
public int RequestId { get; }
}
public class IBClientException : Exception
{
public IBClientException(int requestId, int errorCode, string message, string advancedOrderRejectJson)
: base(message)
{
RequestId = requestId;
ErrorCode = errorCode;
AdvancedOrderRejectJson = advancedOrderRejectJson;
}
public IBClientException(string err)
: base(err)
{
}
public IBClientException(Exception e)
{
Exception = e;
}
public int RequestId { get; }
public int ErrorCode { get; }
public string? AdvancedOrderRejectJson { get; }
public Exception? Exception { get; }
}
public abstract record Error(int? Code, string Message, object? Data);
public record RemoteError : Error
{
public RemoteError(string message, object? data) : base(null, message, data)
{
}
public RemoteError(int? code, string message, object? data) : base(code, message, data)
{
}
}
public record Result<T>(bool Success, T? Data, Error? Error)
{
public Result(T data) : this(true, data, default)
{
}
public Result(Error error) : this(false, default, error)
{
}
public static Result<T> FromSuccess(T data)
{
return new Result<T>(data);
}
public static Result<T> FromError<TError>(TError error) where TError : Error
{
return new Result<T>(error);
}
}
public static class DisposableExtensions
{
public static T DisposeWith<T>(this T disposable, ICollection<IDisposable> collection)
where T : IDisposable
{
ArgumentNullException.ThrowIfNull(disposable);
ArgumentNullException.ThrowIfNull(collection);
collection.Add(disposable);
return disposable;
}
}
Upvotes: 0
Views: 140
Reputation: 117064
Here's a good start:
IObservable<TItemArgs> observable =
Observable
.Merge(
_itemObservable
.Where(item => MatchRequest(item, _itemRequestIdExtractor, requestId))
.Select(item => Notification.CreateOnNext(item))
.Take(1),
_itemEndObservable
.Any(item => MatchRequest(item, _itemListEndRequestIdExtractor, requestId))
.Select(item => Notification.CreateOnCompleted<TItemArgs>()))
.Dematerialize();
And here's a final observable without the ReplaySubject
.
private static async Task<Result<IEnumerable<Details>>> Test(IScheduler scheduler) =>
await
Observable
.Defer(() =>
{
var client = new IBClient();
var requestId = 1;
return Observable.Create<Result<IEnumerable<Details>>>(o =>
{
IDisposable subscription =
Observable
.Merge(
client.Details.Where(x => x.RequestId == requestId).Select(x => Notification.CreateOnNext(x)),
client.DetailsEnd.Any(x => x.RequestId == requestId).Select(x => Notification.CreateOnCompleted<Details>()),
client.Error.Where(x => x.RequestId == requestId).Select(x => Notification.CreateOnError<Details>(new Exception($"Code: {x.Code}, Message: {x.Message}"))))
.Dematerialize()
.Synchronize()
.ToArray()
.Select(items => Result<IEnumerable<Details>>.FromSuccess(items))
.Catch<Result<IEnumerable<Details>>, Exception>(ex => Observable.Return(Result<IEnumerable<Details>>.FromError(new Error(null, ex.Message, null))))
.Timeout(TimeSpan.FromSeconds(2))
.ObserveOn(scheduler)
.Subscribe(o);
client.Emit(2, 4);
client.Emit(requestId, 42);
client.EmitEnd(2);
client.Emit(requestId, 62);
client.Emit(requestId, 123);
//client.EmitError(requestId, 123, "Something bad happened");
client.EmitEnd(requestId);
client.Emit(requestId, 1);
return subscription;
});
});
Upvotes: 1