Reputation: 1
I am using the official binance API connector. It consisted mainly of two parts:
I first send two orders using my modify async function to send two orders at the same time. Then I will receive two order_updates. Inside each one of them I want to trigger a new order( new order has a new update that trigger another order, on and on...).
But since my first two orders was sent at the same time using async, then I will receive the two updates at about the same time. But sending a order usually takes 10ms. I want send order instantly without blocking. For example, first update trigger an order, after 1ms, another update was received (but first order was not finished). I want send another order instantly without waiting for the first one to be finished.
The following is my code, but it did not work. First two orders was sent successfully, but when update trigger another order, the new one seems never send (stuck at the mark endpoint).
class Strategy(Utils):
def __init__(self, async_client):
super().__init__()
self.async_client = async_client
self.order_queue = asyncio.Queue()
self.loop = asyncio.get_event_loop()
def order_trigger_sync(self, msg):
"""Callback function for WebSocket to enqueue updates."""
try:
if self.loop.is_closed():
print("Event loop is closed. Cannot enqueue message.")
return
self.order_queue.put_nowait(msg)
except Exception as e:
print(f"Error enqueuing message: {e}")
async def order_worker(self, worker_id):
"""Asynchronous worker to process orders from the queue."""
while True:
msg = await self.order_queue.get()
print(f'dd{msg}')
try:
if "e" in msg and msg['e'] == 'ORDER_TRADE_UPDATE':
to_send = {}
for token in self.tokens:
mid = self.book_series[token].get_current_mid()
to_send[token] = self.gen_signal(token, mid)
orders = self.to_orders(to_send)
print(f"Worker {worker_id}: Sending orders: {orders}")
await self.async_client.process_requests(orders)
except Exception as e:
logging.error(f"Worker {worker_id}: Error processing order: {e}")
finally:
self.order_queue.task_done()
async def start_workers(self, num_workers=3):
"""Start multiple workers to process the order queue."""
for i in range(num_workers):
asyncio.create_task(self.order_worker(worker_id=i))
async def start(self):
await self.start_workers()
await asyncio.sleep(10)
to_send = dict()
for token in self.tokens:
mid = self.book_series[token].get_current_mid()
to_send[token] = self.gen_signal(token, mid)
orders = self.to_orders(to_send)
await self.async_client.process_requests(orders)
async def main():
async with AsyncClient(show_header=True) as client:
strategy = Strategy(async_client=client)
# Start WebSocket connection and register callbacks
connect = Connection(
book_callback=strategy.book_callback, # Depth book updates
trade_callback=strategy.order_trigger_sync, # Triggered on order updates
)
await strategy.start()
if __name__ == "__main__":
asyncio.run(main())
My process_requests function is like this:
async def process_requests(self, reqs):
tasks = []
for action, content in reqs.items():
if len(content) != 0:
for _ in content:
tasks.append(self.action_mapper[action](**_))
if len(tasks) != 0:
print('mark')
done, pending = await asyncio.wait(tasks)
Upvotes: 0
Views: 25