Reputation: 6331
The code below downloads historical OHLCV data from Binance between a start date and an end date. Since Binance only allows downloading 1000 candles per request, I wrote DownloadAsync
the way it is. Any recommendations on the code are appreciated as well.
The actual question is about making DownloadAsync
multithreaded in order to speed up the process; imagine downloading candles from 2018 to 2021 on a 5m interval. I would prefer using System.Reactive
, but other solutions are welcome too, since it's hard to convert the code to a multithreaded version.
The code below can be tested.
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Linq;
using System.Text.RegularExpressions;
using System.Reactive.Linq;
using System.Threading;
namespace DownloadCandleDataTest
{
/// <summary>
/// Downloads historical OHLCV candles from an <see cref="Exchange"/> by paging
/// through a date range one request at a time.
/// </summary>
public class DataProvider
{
    // Number of candles requested per call to Exchange.GetCandlesAsync.
    private const int CandlesPerRequest = 100;

    private readonly Exchange _exchange;

    /// <summary>Creates a provider that downloads through the given exchange.</summary>
    /// <param name="exchange">Exchange wrapper used for every request; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="exchange"/> is null.</exception>
    public DataProvider(Exchange exchange)
    {
        _exchange = exchange ?? throw new ArgumentNullException(nameof(exchange));
    }

    /// <summary>
    /// Downloads all candles for <paramref name="pair"/> between <paramref name="startDate"/>
    /// and <paramref name="endDate"/>, requesting them page by page.
    /// </summary>
    /// <param name="pair">Symbol to download, e.g. "TRXUSDT".</param>
    /// <param name="interval">Candle interval.</param>
    /// <param name="startDate">Start of the requested range.</param>
    /// <param name="endDate">End of the requested range.</param>
    /// <param name="startupCandleCount">
    /// Currently unused by this method. NOTE(review): intended meaning is not visible here — confirm with callers.
    /// </param>
    /// <returns>The downloaded candles in the order they were received; empty when nothing was returned.</returns>
    /// <exception cref="InvalidOperationException">A request failed (the exchange wrapper returned null).</exception>
    public async Task<List<OHLCV>> DownloadAsync(string pair, KlineInterval interval, DateTime startDate, DateTime endDate, int startupCandleCount)
    {
        var allCandles = new List<OHLCV>();
        var cursor = startDate;
        DateTime? lastTimestamp = null;

        while (cursor < endDate)
        {
            var page = await _exchange.GetCandlesAsync(pair, interval, cursor, endDate, CandlesPerRequest).ConfigureAwait(false);
            if (page == null)
            {
                // GetCandlesAsync signals a failed request with null; fail loudly instead of dereferencing it.
                throw new InvalidOperationException($"Failed to download candles for {pair} starting at {cursor:O}.");
            }

            // Each follow-up request starts at the timestamp of the last candle already stored,
            // so drop anything that is not strictly newer to avoid duplicates.
            if (lastTimestamp.HasValue)
            {
                var newest = lastTimestamp.Value;
                page.RemoveAll(c => c.Timestamp <= newest);
            }

            // No new candles means the range is exhausted; without this the cursor
            // would never advance and the loop would never end.
            if (page.Count == 0)
            {
                break;
            }

            var first = page[0];
            var last = page[page.Count - 1];
            Console.WriteLine($"First: {first.Timestamp} | Last: {last.Timestamp}");

            allCandles.AddRange(page);
            lastTimestamp = last.Timestamp;
            cursor = last.Timestamp;
        }

        ReportDuplicates(allCandles);
        return allCandles;
    }

    // Writes every timestamp that occurs more than once to the console (diagnostic aid).
    private static void ReportDuplicates(List<OHLCV> candles)
    {
        var groups = candles.GroupBy(g => g.Timestamp).Where(e => e.Skip(1).Any());
        foreach (var group in groups)
        {
            Console.WriteLine(group.Key);
            foreach (var ohclv in group)
            {
                Console.WriteLine("\t" + ohclv.Timestamp);
            }
        }
    }
}
/// <summary>
/// Console entry point: downloads one day of five-minute TRXUSDT candles and waits for a key press.
/// </summary>
class Program
{
    /// <summary>
    /// Small Rx demo: starts a three-second job with <c>Observable.Start</c>, prints a message
    /// from the calling thread, then blocks until the job has finished.
    /// </summary>
    public static void StartBackgroundWork()
    {
        Console.WriteLine("Shows use of Start to start on a background thread:");

        var backgroundWork = Observable
            .Start(() =>
            {
                // This starts on a background thread.
                Console.WriteLine("From background thread. Does not block main thread.");
                Console.WriteLine("Calculating...");
                Thread.Sleep(3000);
                Console.WriteLine("Background work completed.");
            })
            .Finally(() => Console.WriteLine("Main thread completed."));

        Console.WriteLine("\r\n\t In Main Thread...\r\n");

        // Block here until the background operation completes.
        backgroundWork.Wait();
    }

    /// <summary>Runs a sample download and keeps the console open until Enter is pressed.</summary>
    static async Task Main(string[] args)
    {
        using var exchange = new Exchange();
        var provider = new DataProvider(exchange);

        var rangeStart = new DateTime(2019, 1, 1);
        var rangeEnd = new DateTime(2019, 1, 2);

        await provider
            .DownloadAsync("TRXUSDT", KlineInterval.FiveMinutes, rangeStart, rangeEnd, 100)
            .ConfigureAwait(false);

        Console.ReadLine();
    }
}
/// <summary>
/// A single candle: timestamp, open/high/low/close prices and volume.
/// </summary>
public class OHLCV
{
/// <summary>Candle timestamp; <c>ToCandle</c> fills it from the kline's <c>OpenTime</c>.</summary>
public DateTime Timestamp { get; set; }
/// <summary>Open price.</summary>
public decimal Open { get; set; }
/// <summary>High price.</summary>
public decimal High { get; set; }
/// <summary>Low price.</summary>
public decimal Low { get; set; }
/// <summary>Close price.</summary>
public decimal Close { get; set; }
/// <summary>Volume; <c>ToCandle</c> fills it from the kline's <c>BaseVolume</c>.</summary>
public decimal Volume { get; set; }
}
/// <summary>
/// Conversion helpers between Binance.Net models and this project's candle type.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Copies the open time, the four prices and the base volume of a Binance kline
    /// into a new <see cref="OHLCV"/>.
    /// </summary>
    /// <param name="candle">Kline to convert.</param>
    /// <returns>A new candle holding the copied values.</returns>
    public static OHLCV ToCandle(this IBinanceKline candle) =>
        new OHLCV
        {
            Timestamp = candle.OpenTime,
            Open = candle.Open,
            High = candle.High,
            Low = candle.Low,
            Close = candle.Close,
            Volume = candle.BaseVolume,
        };
}
/// <summary>
/// Thin wrapper around a Binance client that returns klines as <see cref="OHLCV"/> candles.
/// Owns the client and disposes it in <see cref="Dispose"/>.
/// </summary>
public class Exchange : IDisposable
{
    private readonly IBinanceClient _client;

    /// <summary>Creates the wrapper together with a new <c>BinanceClient</c>.</summary>
    public Exchange() => _client = new BinanceClient();

    /// <summary>
    /// Requests klines for <paramref name="pair"/> and converts them to candles.
    /// </summary>
    /// <param name="pair">Symbol to query.</param>
    /// <param name="interval">Candle interval.</param>
    /// <param name="startTime">Optional start time, passed through to the client.</param>
    /// <param name="endTime">Optional end time, passed through to the client.</param>
    /// <param name="limit">Optional maximum number of klines, passed through to the client.</param>
    /// <returns>
    /// The converted candles, or <c>null</c> when the call did not succeed or carried no data.
    /// </returns>
    public async Task<List<OHLCV>> GetCandlesAsync(string pair, KlineInterval interval, DateTime? startTime = null, DateTime? endTime = null, int? limit = null)
    {
        var response = await _client.Spot.Market
            .GetKlinesAsync(pair, interval, startTime, endTime, limit)
            .ConfigureAwait(false);

        // Failed calls are reported to the caller as null.
        if (!response.Success)
        {
            return null;
        }

        var klines = response.Data;
        return klines == null
            ? null
            : klines.Select(kline => kline.ToCandle()).ToList();
    }

    /// <summary>Disposes the underlying client, if one exists.</summary>
    public void Dispose() => _client?.Dispose();
}
}
Upvotes: 0
Views: 436
Reputation: 117084
You're seriously overthinking this.
Since you're getting evenly spaced candles and you know how many you get per call to GetKlinesAsync
, you can compute all of the required start dates up front.
// Request parameters.
string pair = "TRXUSDT";
KlineInterval interval = KlineInterval.FiveMinutes;
DateTime startDate = new DateTime(2019, 1, 1);
DateTime endDate = new DateTime(2019, 1, 2);
double gap = 5.0; // minutes per candle; must match `interval`, used only to compute start dates.
int limit = 100;  // candles requested per call

// One start date per request: each is `gap * limit` minutes after the previous one,
// continuing for as long as the date does not pass `endDate`.
IObservable<DateTime> startDates = Observable.Generate(
    startDate,
    current => current <= endDate,
    current => current.AddMinutes(gap * limit),
    current => current);
Now it's quite easy to generate your query:
// For every start date: create a client, request one page of klines, and dispose the client
// when that request's observable terminates. Unsuccessful responses are filtered out and the
// remaining klines are flattened into a single stream of candles.
IObservable<OHLCV> query =
    startDates
        .SelectMany(s =>
            Observable.Using(
                () => new BinanceClient(),
                bc => Observable.FromAsync(ct =>
                    bc.Spot.Market.GetKlinesAsync(pair, interval, s, endDate, limit, ct))))
        .Where(rs => rs.Success)
        .SelectMany(rs => rs.Data.Select(x => x.ToCandle()));
Since this is querying in parallel it is possible, even likely, that you'll get your results out of order, so you need to do a .ToArray()
on the query to be able to deal with all of the data produced at the end, rather than as each candle comes in.
// Collect every candle into one array once the query completes, then sort it by timestamp.
IObservable<OHLCV[]> sortedCandles =
    query
        .ToArray()
        .Select(all => all.OrderBy(candle => candle.Timestamp).ToArray());

IDisposable subscription = sortedCandles.Subscribe(cs =>
{
    /* candles downloaded using multiple threads */
    /* and sorted in `Timestamp` order */
});
This produces all of the candles, in order, using multiple threads, without any duplicates.
If you want it as the DownloadAsync
method then you do this:
/// <summary>
/// Downloads the candles for <paramref name="pair"/> between <paramref name="startDate"/> and
/// <paramref name="endDate"/> by issuing one request per computed start date, several at a time,
/// and returns them sorted by timestamp.
/// </summary>
/// <param name="pair">Symbol to download, e.g. "TRXUSDT".</param>
/// <param name="interval">Candle interval passed to the exchange.</param>
/// <param name="gap">Minutes per candle; must correspond to <paramref name="interval"/> and be positive.</param>
/// <param name="startDate">Start of the requested range.</param>
/// <param name="endDate">End of the requested range.</param>
/// <param name="limit">Candles requested per call; must be positive.</param>
/// <returns>
/// All candles from the successful requests, ordered by timestamp. Responses whose
/// <c>Success</c> flag is false are skipped, so the list can have gaps if requests fail.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="gap"/> or <paramref name="limit"/> is not positive.
/// </exception>
public async Task<List<OHLCV>> DownloadAsync(string pair, KlineInterval interval, double gap, DateTime startDate, DateTime endDate, int limit)
{
    // A non-positive step would make the start-date generator below never reach endDate.
    if (!(gap > 0))
    {
        throw new ArgumentOutOfRangeException(nameof(gap), gap, "The gap must be a positive number of minutes.");
    }

    if (limit <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(limit), limit, "The limit must be positive.");
    }

    var step = TimeSpan.FromMinutes(gap * limit);
    if (step <= TimeSpan.Zero)
    {
        throw new ArgumentOutOfRangeException(nameof(gap), gap, "The gap is too small to advance the start date.");
    }

    // Upper bound on simultaneous requests, so that long ranges do not start
    // thousands of HTTP calls at once.
    const int maxConcurrentRequests = 8;

    IObservable<DateTime> startDates =
        Observable
            .Generate(startDate, x => x <= endDate, x => x + step, x => x);

    IObservable<OHLCV> query =
        startDates
            .Select(s =>
                Observable
                    .Using(
                        () => new BinanceClient(),
                        bc => Observable.FromAsync(ct =>
                            bc.Spot.Market.GetKlinesAsync(pair, interval, s, endDate, limit, ct))))
            .Merge(maxConcurrentRequests)
            .Where(rs => rs.Success)
            .SelectMany(rs => rs.Data.Select(x => x.ToCandle()));

    // Pages can complete in any order, so gather everything first and sort once at the end.
    return await query.ToArray().Select(xs => xs.OrderBy(x => x.Timestamp).ToList());
}
Note that the signature has changed slightly.
Upvotes: 1
Reputation: 4680
The key to making more web requests in parallel is to create many tasks and await them all with Task.WhenAll()
instead of awaiting each one inside the loop(s).
If you await each one in a loop, they will be processed sequentially (though the UI thread will not be blocked while the web request is made).
Upvotes: 0