shda
shda

Reputation: 734

Applying conditional rate limiting decorator for async class method

I am using ccxt library for downloading OHLC data from various crypto exchanges. This is done internally using REST requests. These exchanges have different request rate limits, so I have to apply rate limiting to my methods. For rate limiting I use the library limiter. Here is what I have at the moment:

import ccxt.async_support as ccxt
from limiter import Limiter

limiter_binance = Limiter(rate=19, capacity=19, consume=1)
limiter_coinbase = Limiter(rate=3, capacity=3, consume=1)

class CCXTFeed:

    def __init__(self, exchange_name):
        self._exchange = ccxt.binance()
        self._exchange_name = exchange_name

    @limiter_binance 
    async def __FetchSymbolOHLCChunk(self, symbol, timeframe, startTime, limit=1000):
        data = await self._exchange.fetch_ohlcv(symbol, timeframe, since=startTime, limit=limit)
        return data

What I now want is to somehow apply limiter_coinbase if the class was instantiated with exchange_name="coinbase". I want to be able to choose applied rate limiter decorator based on the exchange I am currently working with. The exchange name is defined at runtime.

Upvotes: 0

Views: 208

Answers (1)

anom907 zat
anom907 zat

Reputation: 193

What you could do is create a custom decorator that then will redirect you to the correct decorator, for instance:


import ccxt.async_support as ccxt
from limiter import Limiter

limiter_binance = Limiter(rate=19, capacity=19, consume=1)
limiter_coinbase = Limiter(rate=3, capacity=3, consume=1)

def dispatch_limiter(func):
    async def mod_func(self, *args, **kwargs):
        if self._exchange_name == "coinbase":
            return await (limiter_coinbase(func)(self, *args, **kwargs)) # basically, because limiter_coinbase is intended to be a decorator, we first instantiate it with the function and then we call it as if it was the original function.
        elif self._exchange_name == "binance":
            return await (limiter_binance(func)(self, *args, **kwargs))
   return mod_func

class CCXTFeed:

    def __init__(self, exchange_name):
        self._exchange = ccxt.binance()
        self._exchange_name = exchange_name

    @dispatch_limiter
    async def __FetchSymbolOHLCChunk(self, symbol, timeframe, startTime, limit=1000):
        data = await self._exchange.fetch_ohlcv(symbol, timeframe, since=startTime, limit=limit)
        return data

Upvotes: 1

Related Questions