Greg Brown
Greg Brown

Reputation: 1299

Checking to see when a gRPC stream is empty or isn't streaming data

I am creating a stub that connects to a server that streams data at a specific interval, and I then upload that data to a TSDB. I have implemented batching to optimize the upload, but if the amount of data streamed in one interval doesn't align with the batch size, some data will not be uploaded until the next interval, which I don't want. Is there a way for a gRPC stub to check whether the stream is empty?

class DialInClient(object):
    """Dial-in gRPC client that subscribes to a streaming telemetry RPC.

    Args:
        host: Hostname or address of the gRPC server.
        port: Server port; an int or a string is accepted.
        timeout: Value passed as ``timeout`` to the ``CreateSubs`` call
            (converted to float).
        user: Sent as the ``username`` entry of the call metadata.
        password: Sent as the ``password`` entry of the call metadata.
    """

    def __init__(self, host, port, timeout=100000000, user='root', password='lablab'):
        self._host = host
        self._port = port
        self._timeout = float(timeout)
        self._channel = None
        self._cisco_ems_stub = None
        self._connected = False
        self._metadata = [('username', user), ('password', password)]

    def _target(self):
        """Return the ``host:port`` string used to open the channel."""
        # format() instead of str.join so that an integer port does not
        # raise TypeError.
        return '{}:{}'.format(self._host, self._port)

    def subscribe(self, sub_id):
        """Yield every segment streamed back for subscription *sub_id*.

        ``connect()`` must have been called first so that the stub exists.
        """
        sub_args = CreateSubsArgs(ReqId=1, encode=3, subidstr=sub_id)
        stream = self._cisco_ems_stub.CreateSubs(
            sub_args, timeout=self._timeout, metadata=self._metadata)
        for segment in stream:
            yield segment

    def connect(self):
        """Open an insecure channel and create the stub once it is ready.

        Raises:
            DeviceFailedToConnect: The channel was not ready within 10 seconds.
        """
        self._channel = grpc.insecure_channel(self._target())
        try:
            grpc.channel_ready_future(self._channel).result(timeout=10)
        except grpc.FutureTimeoutError as e:
            raise DeviceFailedToConnect from e
        else:
            self._cisco_ems_stub = gRPCConfigOperStub(self._channel)
            # Only flag the client as connected once the stub is usable.
            self._connected = True

If I set a low timeout, the whole channel disconnects. Instead, I want to add some sort of timeout to the streaming for loop: if I don't receive another segment within 1 second, yield None to tell the other part of my program that this is the end of the interval and that it should upload without waiting for a full batch.

Upvotes: 2

Views: 1956

Answers (1)

Richard Belleville
Richard Belleville

Reputation: 1620

No such mechanism exists natively within gRPC, but the threading library should allow you to send off batches before they're full. I've included a modified version of the Python gRPC hello world example to give you an idea of how that might be done.

from __future__ import print_function                                                                                                        

import grpc                                                                                                                                  

import helloworld_pb2
import helloworld_pb2_grpc                                                                                                                   

import threading
from six.moves import queue
import time 

# Length of one batching window, in seconds (compared against time.time() deltas).
BATCH_PERIOD = 10.0

def collect_responses(resp_queue, finished):
    """Stream SayHello responses from localhost:50051 into *resp_queue*.

    Args:
        resp_queue: Queue that receives every streamed response.
        finished: Event that is set when this function stops producing,
            whether the stream ended normally or an error was raised.
    """
    try:
        with grpc.insecure_channel('localhost:50051') as channel:
            stub = helloworld_pb2_grpc.GreeterStub(channel)
            request = helloworld_pb2.HelloRequest(name='you', num_greetings="100")
            for response in stub.SayHello(request):
                resp_queue.put(response)
    finally:
        # Always signal completion; otherwise a failed RPC would leave the
        # consumer waiting on an event that is never set.
        finished.set()

def is_batch_end(batch_start):
    """Return True once the batch window opened at *batch_start* has elapsed.

    Args:
        batch_start: ``time.time()`` timestamp at which the batch was opened.
    """
    # "Ended" means at least BATCH_PERIOD seconds have passed; this is the
    # same boundary at which get_remaining_time() reaches zero.
    return time.time() - batch_start >= BATCH_PERIOD

def get_remaining_time(time_start):
    """Return the seconds left in the batch window opened at *time_start*.

    The result is zero or negative once the window has elapsed.
    """
    deadline = time_start + BATCH_PERIOD
    return deadline - time.time()

def batch_responses(resp_queue, finished):
    """Group queued responses into time-boxed batches and print each batch.

    A batch closes when its BATCH_PERIOD window has elapsed, or early once
    the producer has signalled completion and the queue is empty. The
    function returns after the final batch has been printed.

    Args:
        resp_queue: Queue the producer puts responses on; each item must
            expose a ``message`` attribute.
        finished: Event the producer sets when it will put no more items.
    """
    # Upper bound on a single blocking wait, so that `finished` is noticed
    # promptly even while the queue stays empty.
    poll_interval = 0.5
    batch_num = 0
    while True:
        batch_resps = []
        batch_start = time.time()
        remaining_time = get_remaining_time(batch_start)
        while remaining_time > 0.0:
            try:
                # A bounded wait is what lets the batch deadline fire; an
                # unbounded get() would block until the next item arrives.
                batch_resps.append(
                    resp_queue.get(timeout=min(remaining_time, poll_interval)))
            except queue.Empty:
                if finished.is_set():
                    # Producer is done and nothing is waiting: close early.
                    break
            remaining_time = get_remaining_time(batch_start)
        print("Batch {} ({}):".format(batch_num + 1, len(batch_resps)))
        for resp in batch_resps:
            print("  '{}'".format(resp.message))
        batch_num += 1
        # Check the queue again before stopping: an item may have been queued
        # between the last empty poll and the finished flag being set.
        if finished.is_set() and resp_queue.empty():
            return

def run():
    """Start the collector thread and batch its output on the calling thread.

    A queue and a completion event are shared between collect_responses
    (run in a background thread) and batch_responses (run here); the
    collector thread is joined after batch_responses comes back.
    """
    done = threading.Event()
    responses = queue.Queue()
    worker = threading.Thread(
        target=collect_responses,
        args=(responses, done),
    )
    worker.start()
    batch_responses(responses, done)
    worker.join()

# Only start the demo when this file is executed as a script, not on import.
if __name__ == "__main__":
    run()

Upvotes: 1

Related Questions