Kilian Hohm
Kilian Hohm

Reputation: 330

How can I receive data on client side before calling .end() on the server side for a gRPC stream

I am currently trying to setup a server stream with the gRPC Node.js API. For that I want to achieve that when I write on server side to the stream that the client immediately receives the data event.

At the moment I don't receive anything on client side if I only call write on server side. However as soon as I call the end function on the server the client receives all data events.

To test this I used an endless while loop for writing messages on server side. Then the client does not receive messages (data events). If instead I use a for loop and call end afterwards the client receives all the messages (data events) when end is called.

My .proto file:

syntax = "proto3";

message ControlMessage {
  enum Control {
    Undefined = 0;
    Start = 1;
    Stop = 2;
  }
  Control control = 1;
}

message ImageMessage {
  enum ImageType {
    Raw = 0;
    Mono8 = 1;
    RGB8 = 2;
  }
  ImageType type = 1;
  int32 width = 2;
  int32 height = 3;
  bytes image = 4;
}

service StartImageTransmission {
  rpc Start(ControlMessage) returns (stream ImageMessage);
}

On the server side I implement the start function and try to endlessly write messages to the call:

function doStart(call) {
  var imgMsg = {type: "Mono8", width: 600, height: 600, image: new ArrayBuffer(600*600)};
  //for(var i = 0; i < 10; i++) {
  while(true) {
    call.write(imgMsg);
    console.log("Message sent");
  }
  call.end();
}

I register the function as service in the server:

var server = new grpc.Server();
server.addService(protoDescriptor.StartImageTransmission.service, {Start: doStart});

On client side I generate an appropriate call and register the data and end event:

var call = client.Start({control: 0});
call.on('data', (imgMessage) => {
  console.log('received image message');
});
call.read();
call.on('end', () => {console.log('end');});

I also tried to write the server side in python. In this case the node client instantly receives messages and not only after stream was ended on server side. So I guess this should be also possible for the server written with the Node API.

Upvotes: 3

Views: 1940

Answers (2)

Draculater
Draculater

Reputation: 2278

I was able to use uncork which comes from Node.js's Writable.

Here is an example. Pseudocode, but pulled from across a working implementation:

import * as grpc from '@grpc/grpc-js';
import * as proto from './src/proto/generated/organizations'; // via protoc w/ ts-proto

const OrganizationsGrpcServer: proto.OrganizationsServer = {
  async getMany(call: ServerWritableStream<proto.Empty, proto.OrganizationCollection>) {
    call.write(proto.OrganizationCollection.fromJSON({ value: [{}] }));
    call.uncork();
    // do some blocking stuff
    call.write(proto.OrganizationCollection.fromJSON({ value: [{}] }));
    call.uncork();
    // call.end(), or client.close() below, at some point?
  },
  ping(call, callback) {
    callback(null);
  }
};

const client = new proto.OrganizationsClient('127.0.0.1:5000', grpc.credentials.createInsecure());
const stream = client.getMany(null);
stream.on('data', data => {
//  this cb should run twice
});
export default OrganizationsGrpcServer;
//.proto
service Organizations {
  rpc GetMany (google.protobuf.Empty) returns (stream OrganizationCollection) {}
}

message OrganizationCollection {
  repeated Organization value = 1;
}

Versions:

  • @grpc/grpc-js 1.4.4
  • @grpc/proto-loader 0.6.7
  • ts-proto 1.92.1
  • npm 8.1.4
  • node 17

Upvotes: 0

Kilian Hohm
Kilian Hohm

Reputation: 330

It seems that the problem was that the endless while loop is blocking all background tasks in node. A possible solution is to use setTimeout to create the loop. The following code worked for me:

First in the gRPC call store the call object in an array:

function doStart(call) {
  calls.push(call);
}

For sending to all clients I use a setTimeout:

function sendToAllClients() {
  calls.forEach((call) => {
    call.write(imgMsg);
  });
  setTimeout(sendToAllClients, 10);
}

setTimeout(sendToAllClients, 10);

Helpful stackoverflow atricle: Why does a while loop block the event loop?

Upvotes: 2

Related Questions