Reputation: 1
I'm using crochet to help me work with the Twisted architecture that the third-party package uses. When I have only one instance of CTraderOpenAPI, I get the expected result. However, when I try to have two instances, one for demo and another for live, I start getting a None response.
Did I miss something in my code below?
from crochet import run_in_reactor, setup
from ctrader_open_api import Auth, Client, EndPoints, Protobuf, TcpProtocol
from ctrader_open_api.messages.OpenApiMessages_pb2 import * # noqa: F403
from dotenv import load_dotenv
from google.protobuf.json_format import MessageToDict
from config import config
from constant import API_TRANSACTION_SOURCE_CTRADER
from Dto.api_transaction_dto import ApiTransactionDto
from logger.log import APITransactionLogger
# Load variables from a .env file into the process environment.
# NOTE(review): `config` is imported above, before this call runs; if `config`
# reads environment variables at import time it will not see values from
# .env -- confirm against the config module.
load_dotenv()
# Initialise Crochet. Per Crochet's documentation this starts the Twisted
# reactor in a background thread; it has to run before any @run_in_reactor
# function is called.
setup()
# Shared OAuth helper built from the application's credentials. It backs
# get_auth_uri / refresh_token / get_access_token on CTraderOpenAPI.
auth = Auth(
config.CTRADER_CLIENT_ID, config.CTRADER_CLIENT_SECRET, config.CTRADER_REDIRECT_URL
)
# Value passed as `timeout=` when blocking on a reactor-side result
# (presumably seconds -- see Crochet's EventualResult.wait).
timeout = 120
class CTraderOpenAPI:
    """Blocking wrapper around one cTrader Open API client connection.

    Creating an instance builds a ``Client`` for the demo or live host,
    starts it, and sends the application-auth request, waiting for the
    reply before the constructor returns.

    The Twisted reactor is run by Crochet in a background thread, so every
    call that touches the ``Client`` (creation, start, send, stop) is routed
    through ``@run_in_reactor``. Twisted objects are not thread-safe and must
    only be used from the reactor thread.
    """

    def __init__(self, host_type="demo"):
        """Connect to the chosen environment and authorize the application.

        Args:
            host_type: ``"demo"`` selects the demo host; any other value
                selects the live host.
        """
        # NOTE: despite its name, ``self.host_type`` stores the resolved
        # endpoint host, not the "demo"/"live" string passed in.
        if host_type == "demo":
            self.host_type = EndPoints.PROTOBUF_DEMO_HOST
        else:
            self.host_type = EndPoints.PROTOBUF_LIVE_HOST
        self.ctrader_client_id = config.CTRADER_CLIENT_ID
        self.ctrader_client_secret = config.CTRADER_CLIENT_SECRET
        self.redirect_uri = config.CTRADER_REDIRECT_URL
        self.init_client()
        self.authorize_app()

    def on_error(self, failure):
        """Print a failed request's failure object and its error message.

        This is attached as an errback and returns ``None``. In Twisted that
        marks the failure as handled and makes ``None`` the Deferred's
        result, so a caller blocking on the result receives ``None`` instead
        of an exception whenever the request fails or times out.
        """
        print("Error: ", failure)
        print(failure.getErrorMessage())

    def disconnected(self, client, reason):
        """Print the reason when the client reports a disconnection."""
        print("\nDisconnected: ", reason)

    def encodeResult(self, result):
        """Convert a response message into a plain ``dict``.

        Args:
            result: The message delivered by the client, or ``None``.

        Returns:
            ``None`` when ``result`` is ``None`` (for example after
            ``on_error`` handled a failure); otherwise the dict produced by
            ``MessageToDict`` for the extracted protobuf payload.
        """
        if result is None:
            return
        return MessageToDict(Protobuf.extract(result))

    @run_in_reactor
    def sendProtoOAApplicationAuthReq(self):
        """Send the application-auth request from the reactor thread.

        Because of ``@run_in_reactor`` the caller receives a Crochet
        ``EventualResult`` wrapping the Deferred returned here.
        """
        request = ProtoOAApplicationAuthReq()  # type: ignore # noqa: F405
        request.clientId = self.ctrader_client_id
        request.clientSecret = self.ctrader_client_secret
        deferred = self.client.send(request)
        deferred.addErrback(self.on_error)
        return deferred

    def authorize_app(self):
        """Authorize the application and return the decoded response.

        Blocks for up to the module-level ``timeout`` waiting for the reply.

        Returns:
            The response as a dict, or ``None`` if the request failed and
            ``on_error`` consumed the failure.
        """
        d = self.sendProtoOAApplicationAuthReq()
        result = d.wait(timeout=timeout)
        encoded_result = self.encodeResult(result)
        return encoded_result

    # ------------------------------------------------------------------------
    @run_in_reactor
    def _start_client(self):
        """Create the client and start its service on the reactor thread."""
        self.client = Client(
            self.host_type,
            EndPoints.PROTOBUF_PORT,
            TcpProtocol,
        )
        self.client.setDisconnectedCallback(self.disconnected)
        self.client.startService()

    @run_in_reactor
    def _stop_client(self):
        """Stop the client's service on the reactor thread."""
        return self.client.stopService()

    def init_client(self):
        """Create ``self.client`` and start connecting.

        The work is done on the reactor thread; this method waits for it to
        finish so ``self.client`` is set when it returns and any exception
        raised while building or starting the client propagates to the
        caller. Like ``authorize_app``, it must not be called from the
        reactor thread itself because it blocks on the result.
        """
        self._start_client().wait(timeout=timeout)

    def stop_service(self):
        """Ask the client to stop; does not wait for the shutdown to finish."""
        self._stop_client()

    def get_auth_uri(self):
        """Return the authorization URI from the shared ``auth`` helper."""
        return auth.getAuthUri()

    def refresh_token(self, refresh_token):
        """Exchange ``refresh_token`` via the shared ``auth`` helper."""
        return auth.refreshToken(refresh_token)

    def get_access_token(self, auth_code):
        """Exchange an authorization code for a token payload.

        Args:
            auth_code: The code obtained from the authorization redirect.

        Returns:
            The token payload returned by ``auth.getToken``.

        Raises:
            KeyError: If the payload has no ``"accessToken"`` entry; the
                payload itself is attached as the exception argument.
        """
        token = auth.getToken(auth_code)
        if "accessToken" not in token:
            raise KeyError(token)
        return token
# One module-level connection per cTrader environment. Each constructor call
# starts a client and waits for the application-auth reply before returning.
ctrader_client = CTraderOpenAPI(host_type="demo")
ctrader_client_live = CTraderOpenAPI(host_type="live")
I tried to use threading; however, I'm not sure I did it correctly since I'm new to Python.
Upvotes: 0
Views: 17