Reputation: 1299
I am creating a stub that connects to a server that streams off data at a specific interval, then uploading it to a TSDB. I have implemented batching to optimize the upload, but if the amount of data streamed off in one interval doesn't aline to the batch size some data will not be uploaded until the next interval, which I don't want. Is there a way on a gRPC stub to check if the stream is empty?
class DialInClient(object):
def __init__(self, host, port, timeout=100000000, user='root', password='lablab'):
self._host = host
self._port = port
self._timeout = float(timeout)
self._channel = None
self._cisco_ems_stub = None
self._connected = False
self._metadata = [('username', user), ('password', password)]
def subscribe(self, sub_id):
sub_args = CreateSubsArgs(ReqId=1, encode=3, subidstr=sub_id)
stream = self._cisco_ems_stub.CreateSubs(sub_args, timeout=self._timeout, metadata=self._metadata)
for segment in stream:
yield segment
def connect(self):
self._channel = grpc.insecure_channel(':'.join([self._host,self._port]))
try:
grpc.channel_ready_future(self._channel).result(timeout=10)
self._connected = True
except grpc.FutureTimeoutError as e:
raise DeviceFailedToConnect from e
else:
self._cisco_ems_stub = gRPCConfigOperStub(self._channel)
If I set a low timeout, the whole channel disconnects, I want to add some sort of timeout in the for loop for streaming to see if I don't get another segment in 1 second yield None
to tell my other part that that is the end and to upload without a full batch size.
Upvotes: 2
Views: 1956
Reputation: 1620
No such mechanism exists natively within GRPC, but the threading
library should allow you to send off batches before they're full. I've included a modified version of the python GRPC hello world example to give you an idea of how that might be done.
from __future__ import print_function
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
import threading
from six.moves import queue
import time
# 10 second batches
BATCH_PERIOD = 10.0
def collect_responses(resp_queue, finished):
with grpc.insecure_channel('localhost:50051') as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
for i, response in enumerate(stub.SayHello(helloworld_pb2.HelloRequest(name='you', num_greetings="100"))):
resp_queue.put(response)
finished.set()
def is_batch_end(batch_start):
return time.time() - batch_start < BATCH_PERIOD
def get_remaining_time(time_start):
return (time_start + BATCH_PERIOD) - time.time()
def batch_responses(resp_queue, finished):
batch_num = 0
while True:
batch_resps = []
batch_start = time.time()
remaining_time = get_remaining_time(batch_start)
while remaining_time > 0.0 and not finished.is_set():
try:
batch_resps.append(resp_queue.get())
except queue.Empty:
pass
finally:
remaining_time = get_remaining_time(batch_start)
print("Batch {} ({}):".format(batch_num + 1, len(batch_resps)))
for resp in batch_resps:
print(" '{}'".format(resp.message))
batch_num += 1
def run():
resp_queue = queue.Queue()
finished = threading.Event()
client_thread = threading.Thread(target=collect_responses, args=(resp_queue, finished))
client_thread.start()
batch_responses(resp_queue, finished)
client_thread.join()
if __name__ == '__main__':
run()
Upvotes: 1