Reputation: 6311
I have an unknown number of subscriptions that I would like to dispose of all at once, because there may end up being a lot of them. Is there a mechanism in System.Reactive for disposing of them all at once? Perhaps wrapping them in Observable.Using(() => Disposable.Create... would work?
    client.Streams.PongStream.Subscribe(x =>
        Log.Information($"Pong received ({x.Message})"));

    client.Streams.FundingStream.Subscribe(response =>
    {
        var funding = response.Data;
        Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] " +
                        $"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} " +
                        $"index price {funding.IndexPrice}");
    });

    client.Streams.AggregateTradesStream.Subscribe(response =>
    {
        var trade = response.Data;
        Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] " +
                        $"price: {trade.Price} size: {trade.Quantity}");
    });

    client.Streams.TradesStream.Subscribe(response =>
    {
        var trade = response.Data;
        Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] " +
                        $"price: {trade.Price} size: {trade.Quantity}");
    });

    client.Streams.OrderBookPartialStream.Subscribe(response =>
    {
        var ob = response.Data;
        Log.Information($"Order book snapshot [{ob.Symbol}] " +
                        $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                        $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
        Task.Delay(500).Wait();
        //OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
    });

    client.Streams.OrderBookDiffStream.Subscribe(response =>
    {
        var ob = response.Data;
        Log.Information($"Order book diff [{ob.Symbol}] " +
                        $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                        $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
    });

    client.Streams.BookTickerStream.Subscribe(response =>
    {
        var ob = response.Data;
        Log.Information($"Book ticker [{ob.Symbol}] " +
                        $"Best ask price: {ob.BestAskPrice} " +
                        $"Best ask qty: {ob.BestAskQty} " +
                        $"Best bid price: {ob.BestBidPrice} " +
                        $"Best bid qty: {ob.BestBidQty}");
    });

    client.Streams.KlineStream.Subscribe(response =>
    {
        var ob = response.Data;
        Log.Information($"Kline [{ob.Symbol}] " +
                        $"Kline start time: {ob.Data.StartTime} " +
                        $"Kline close time: {ob.Data.CloseTime} " +
                        $"Interval: {ob.Data.Interval} " +
                        $"First trade ID: {ob.Data.FirstTradeId} " +
                        $"Last trade ID: {ob.Data.LastTradeId} " +
                        $"Open price: {ob.Data.OpenPrice} " +
                        $"Close price: {ob.Data.ClosePrice} " +
                        $"High price: {ob.Data.HighPrice} " +
                        $"Low price: {ob.Data.LowPrice} " +
                        $"Base asset volume: {ob.Data.BaseAssetVolume} " +
                        $"Number of trades: {ob.Data.NumberTrades} " +
                        $"Is this kline closed?: {ob.Data.IsClose} " +
                        $"Quote asset volume: {ob.Data.QuoteAssetVolume} " +
                        $"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} " +
                        $"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} " +
                        $"Ignore: {ob.Data.Ignore} ");
    });

    client.Streams.MiniTickerStream.Subscribe(response =>
    {
        var ob = response.Data;
        Log.Information($"Mini-ticker [{ob.Symbol}] " +
                        $"Open price: {ob.OpenPrice} " +
                        $"Close price: {ob.ClosePrice} " +
                        $"High price: {ob.HighPrice} " +
                        $"Low price: {ob.LowPrice} " +
                        $"Base asset volume: {ob.BaseAssetVolume} " +
                        $"Quote asset volume: {ob.QuoteAssetVolume}");
    });
Here is how the streams I am subscribing to are defined:
    public class BinanceClientStreams
    {
        internal readonly Subject<PongResponse> PongSubject = new Subject<PongResponse>();
        internal readonly Subject<TradeResponse> TradesSubject = new Subject<TradeResponse>();
        internal readonly Subject<AggregatedTradeResponse> TradeBinSubject = new Subject<AggregatedTradeResponse>();
        internal readonly Subject<OrderBookPartialResponse> OrderBookPartialSubject =
            new Subject<OrderBookPartialResponse>();
        internal readonly Subject<OrderBookDiffResponse> OrderBookDiffSubject = new Subject<OrderBookDiffResponse>();
        internal readonly Subject<FundingResponse> FundingSubject = new Subject<FundingResponse>();
        internal readonly Subject<BookTickerResponse> BookTickerSubject = new Subject<BookTickerResponse>();
        internal readonly Subject<KlineResponse> KlineSubject = new Subject<KlineResponse>();
        internal readonly Subject<MiniTickerResponse> MiniTickerSubject = new Subject<MiniTickerResponse>();

        // PUBLIC

        /// <summary>
        /// Response stream to every ping request
        /// </summary>
        public IObservable<PongResponse> PongStream => PongSubject.AsObservable();

        /// <summary>
        /// Trades stream - emits every executed trade on Binance
        /// </summary>
        public IObservable<TradeResponse> TradesStream => TradesSubject.AsObservable();

        /// <summary>
        /// Chunk of trades - emits grouped trades together
        /// </summary>
        public IObservable<AggregatedTradeResponse> AggregateTradesStream => TradeBinSubject.AsObservable();

        /// <summary>
        /// Partial order book stream - emits small snapshot of the order book
        /// </summary>
        public IObservable<OrderBookPartialResponse> OrderBookPartialStream => OrderBookPartialSubject.AsObservable();

        /// <summary>
        /// Order book difference stream - emits small snapshot of the order book
        /// </summary>
        public IObservable<OrderBookDiffResponse> OrderBookDiffStream => OrderBookDiffSubject.AsObservable();

        /// <summary>
        /// Mark price and funding rate stream - emits mark price and funding rate for a single symbol pushed every 3 seconds or every second
        /// </summary>
        public IObservable<FundingResponse> FundingStream => FundingSubject.AsObservable();

        /// <summary>
        /// The best bid or ask's price or quantity in real-time for a specified symbol
        /// </summary>
        public IObservable<BookTickerResponse> BookTickerStream => BookTickerSubject.AsObservable();

        /// <summary>
        /// The Kline/Candlestick subscription, provide symbol and chart intervals
        /// </summary>
        public IObservable<KlineResponse> KlineStream => KlineSubject.AsObservable();

        /// <summary>
        /// Mini-ticker specified symbol statistics for the previous 24hrs
        /// </summary>
        public IObservable<MiniTickerResponse> MiniTickerStream => MiniTickerSubject.AsObservable();
    }
Upvotes: 0
Views: 116
Reputation: 687
I think what you are looking for is a CompositeDisposable. Create an instance of that class and add all of your subscriptions to it.
    // CompositeDisposable lives in the System.Reactive.Disposables namespace.
    var compDisp = new CompositeDisposable();

    compDisp.Add(client.Streams.PongStream.Subscribe(x =>
        Log.Information($"Pong received ({x.Message})")));

    compDisp.Add(client.Streams.FundingStream.Subscribe(response =>
    {
        var funding = response.Data;
        Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] " +
                        $"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} " +
                        $"index price {funding.IndexPrice}");
    }));

    compDisp.Add(client.Streams.AggregateTradesStream.Subscribe(response =>
    {
        var trade = response.Data;
        Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] " +
                        $"price: {trade.Price} size: {trade.Quantity}");
    }));

    compDisp.Add(client.Streams.TradesStream.Subscribe(response =>
    {
        var trade = response.Data;
        Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] " +
                        $"price: {trade.Price} size: {trade.Quantity}");
    }));

    compDisp.Add(client.Streams.OrderBookPartialStream.Subscribe(response =>
    {
        var ob = response.Data;
        Log.Information($"Order book snapshot [{ob.Symbol}] " +
                        $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                        $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
        Task.Delay(500).Wait();
        //OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
    }));

    compDisp.Add(client.Streams.OrderBookDiffStream.Subscribe(response =>
    {
        var ob = response.Data;
        Log.Information($"Order book diff [{ob.Symbol}] " +
                        $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                        $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
    }));

    compDisp.Add(client.Streams.BookTickerStream.Subscribe(response =>
    {
        var ob = response.Data;
        Log.Information($"Book ticker [{ob.Symbol}] " +
                        $"Best ask price: {ob.BestAskPrice} " +
                        $"Best ask qty: {ob.BestAskQty} " +
                        $"Best bid price: {ob.BestBidPrice} " +
                        $"Best bid qty: {ob.BestBidQty}");
    }));

    compDisp.Add(client.Streams.KlineStream.Subscribe(response =>
    {
        var ob = response.Data;
        Log.Information($"Kline [{ob.Symbol}] " +
                        $"Kline start time: {ob.Data.StartTime} " +
                        $"Kline close time: {ob.Data.CloseTime} " +
                        $"Interval: {ob.Data.Interval} " +
                        $"First trade ID: {ob.Data.FirstTradeId} " +
                        $"Last trade ID: {ob.Data.LastTradeId} " +
                        $"Open price: {ob.Data.OpenPrice} " +
                        $"Close price: {ob.Data.ClosePrice} " +
                        $"High price: {ob.Data.HighPrice} " +
                        $"Low price: {ob.Data.LowPrice} " +
                        $"Base asset volume: {ob.Data.BaseAssetVolume} " +
                        $"Number of trades: {ob.Data.NumberTrades} " +
                        $"Is this kline closed?: {ob.Data.IsClose} " +
                        $"Quote asset volume: {ob.Data.QuoteAssetVolume} " +
                        $"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} " +
                        $"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} " +
                        $"Ignore: {ob.Data.Ignore} ");
    }));

    compDisp.Add(client.Streams.MiniTickerStream.Subscribe(response =>
    {
        var ob = response.Data;
        Log.Information($"Mini-ticker [{ob.Symbol}] " +
                        $"Open price: {ob.OpenPrice} " +
                        $"Close price: {ob.ClosePrice} " +
                        $"High price: {ob.HighPrice} " +
                        $"Low price: {ob.LowPrice} " +
                        $"Base asset volume: {ob.BaseAssetVolume} " +
                        $"Quote asset volume: {ob.QuoteAssetVolume}");
    }));
All subscriptions will be disposed once the compDisp instance is disposed. When that should happen depends, of course, on the context of your application.
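To make the effect of disposing the composite concrete, here is a minimal, self-contained sketch. It assumes only the System.Reactive package; the two Subject instances and the class name CompositeDisposableDemo are stand-ins I made up for illustration, not part of your client.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

public static class CompositeDisposableDemo
{
    // Returns the messages that were logged while the subscriptions were alive.
    public static List<string> Run()
    {
        var log = new List<string>();

        // Stand-ins for client.Streams.* (hypothetical, for illustration only).
        var trades = new Subject<int>();
        var pongs = new Subject<string>();

        // CompositeDisposable is a collection of IDisposable,
        // so a collection initializer calls Add for each subscription.
        var subscriptions = new CompositeDisposable
        {
            trades.Subscribe(x => log.Add($"trade {x}")),
            pongs.Subscribe(x => log.Add($"pong {x}"))
        };

        trades.OnNext(1);   // delivered
        pongs.OnNext("a");  // delivered

        // One call unsubscribes every handler that was added.
        subscriptions.Dispose();

        trades.OnNext(2);   // no longer delivered
        pongs.OnNext("b");  // no longer delivered

        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "trade 1, pong a"
    }
}
```

Note that a disposed CompositeDisposable immediately disposes anything added to it afterwards. If you want to drop the current subscriptions but keep reusing the same container, call Clear() instead of Dispose().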
Edit:
Depending on your application architecture, the WhenActivated extension method might also be of interest to you. It is defined on the IActivatableView and IActivatableViewModel interfaces and accepts a function that is called every time a view (model) gets activated (i.e. basically when it is displayed on screen). This function also receives a CompositeDisposable as a parameter, which is disposed every time the view (model) gets deactivated.
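As a rough sketch of that pattern, assuming the ReactiveUI package is referenced: TickerViewModel, its ticks parameter and WhenActivatedDemo are hypothetical names chosen for illustration. In a real application the view normally drives activation; the demo calls the activator by hand only to show the lifecycle.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using ReactiveUI;

// Hypothetical view model; the injected stream stands in for client.Streams.*.
public sealed class TickerViewModel : IActivatableViewModel
{
    public ViewModelActivator Activator { get; } = new ViewModelActivator();

    public List<int> Received { get; } = new List<int>();

    public TickerViewModel(IObservable<int> ticks)
    {
        // The block runs on every activation; the CompositeDisposable it
        // receives is disposed when the view model is deactivated.
        this.WhenActivated((CompositeDisposable disposables) =>
        {
            disposables.Add(ticks.Subscribe(x => Received.Add(x)));
        });
    }
}

public static class WhenActivatedDemo
{
    public static List<int> Run()
    {
        var ticks = new Subject<int>();
        var vm = new TickerViewModel(ticks);

        ticks.OnNext(1);           // before activation: nobody is subscribed
        vm.Activator.Activate();   // normally triggered by the view
        ticks.OnNext(2);           // subscription is live
        vm.Activator.Deactivate(); // disposes the CompositeDisposable
        ticks.OnNext(3);           // after deactivation: unsubscribed again

        return vm.Received;
    }
}
```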
Edit 2:
I just realized that the DisposeWith method, like the WhenActivated extension method, is in fact part of the ReactiveUI framework and not part of the Reactive Extensions on which that framework is based. So you cannot write things like myObservable.Subscribe(x => ...).DisposeWith(compDisp) without using that framework, but compDisp.Add(myObservable.Subscribe(x => ...)) should work. I have adjusted the code above accordingly.
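If you like the fluent style but do not want to pull in ReactiveUI, a small extension method of your own gives you the same shape. This is only a sketch: AddTo and the surrounding class names are hypothetical helpers I made up (deliberately not called DisposeWith, to avoid clashing with ReactiveUI should you add it later), and it relies on nothing beyond CompositeDisposable.Add.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

public static class DisposableExtensions
{
    // Hypothetical helper: registers the disposable with the composite and
    // returns it, so it can be chained at the end of a Subscribe call.
    public static T AddTo<T>(this T disposable, CompositeDisposable composite)
        where T : IDisposable
    {
        composite.Add(disposable);
        return disposable;
    }
}

public static class AddToDemo
{
    // Returns how many values the handler saw before and after disposal.
    public static int Run()
    {
        var compDisp = new CompositeDisposable();
        var source = new Subject<int>();
        var count = 0;

        source.Subscribe(_ => count++).AddTo(compDisp);

        source.OnNext(1);   // counted
        compDisp.Dispose(); // unsubscribes the handler
        source.OnNext(2);   // ignored

        return count;
    }
}
```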
Upvotes: 3