joshp

Reputation: 836

gRPC slow serialization on large dataset

I know that Google states that protobufs don't support large messages (i.e. greater than 1 MB), but I'm trying to stream a dataset over gRPC that's tens of megabytes, and it seems like some people say that's OK, at least with some splitting...

However, when I try to send an array this way (repeated uint32), it takes about 20 seconds even though client and server are on the same local machine.
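To even get a single reply that large through, I'm raising gRPC's default 4 MB inbound message limit on the client channel, roughly like this (the address is a placeholder):

import grpc

# gRPC caps inbound messages at 4 MB by default; a tens-of-MB unary reply
# needs the limits raised on the channel (and similarly on the server).
channel = grpc.insecure_channel(
    "localhost:50051",
    options=[
        ("grpc.max_send_message_length", 64 * 1024 * 1024),
        ("grpc.max_receive_message_length", 64 * 1024 * 1024),
    ],
)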

#proto
service PAS {
  // analyze single file
  rpc getPhotonRecords (PhotonRecordsRequest) returns (PhotonRecordsReply) {}
}

message PhotonRecordsRequest {
  string fileName = 1;
}

message PhotonRecordsReply {
  repeated uint32 PhotonRecords = 1;
}

where PhotonRecordsReply needs to hold ~10 million uint32 values...

Does anyone have an idea on how to speed this up? Or what technology would be more appropriate?

Edit: I think I've implemented streaming based on the comments and answers given, but it still takes the same amount of time:

#proto
service PAS {
  // analyze single file
  rpc getPhotonRecords (PhotonRecordsRequest) returns (stream PhotonRecordsReply) {}
}
#python
import flb_tools
import pas_pb2
import pas_pb2_grpc

class PAS_GRPC(pas_pb2_grpc.PASServicer):

    def getPhotonRecords(self, request: pas_pb2.PhotonRecordsRequest, _context):
        raw_data_bytes = flb_tools.read_data_bytes(request.fileName)
        data = flb_tools.reshape_flb_data(raw_data_bytes)
        chunk_size = 1024
        # Python slices clamp at the end of the list, so one yield covers
        # both full chunks and the final partial chunk.
        for index in range(0, len(data), chunk_size):
            yield pas_pb2.PhotonRecordsReply(PhotonRecords=data[index:index + chunk_size])
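For reference, the client side consumes the stream along these lines (a sketch only; the channel address is a placeholder):

import grpc
import pas_pb2
import pas_pb2_grpc

def fetch_photon_records(file_name: str) -> list:
    records = []
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = pas_pb2_grpc.PASStub(channel)
        # Server streaming: iterate over the chunked replies as they arrive.
        request = pas_pb2.PhotonRecordsRequest(fileName=file_name)
        for reply in stub.getPhotonRecords(request):
            records.extend(reply.PhotonRecords)
    return records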

Minimal repro GitHub example

Upvotes: 3

Views: 1773

Answers (1)

aerobiotic

Reputation: 381

Changing it over to use streams should help. For me the transfer took less than 2 seconds (without SSL, on localhost). This is code I threw together; I did run it and it worked, but I'm not sure what happens if, for example, the file length is not a multiple of 4 bytes. Also, the bytes are read in Java's default byte order (big-endian).

I made my 10 MB test file like this:

dd if=/dev/random  of=my_10mb_file bs=1024 count=10240
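(Roughly the same thing in Python, if dd isn't handy:)

import os

# Write 10 MiB of random bytes, equivalent to the dd command above.
with open("my_10mb_file", "wb") as f:
    for _ in range(10240):
        f.write(os.urandom(1024))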

Here's the service definition. The only thing I added here is the stream keyword on the response.

service PAS {
  // analyze single file
  rpc getPhotonRecords (PhotonRecordsRequest) returns (stream PhotonRecordsReply) {}
}

Here's the server implementation.

public class PhotonsServerImpl extends PASImplBase {

  @Override
  public void getPhotonRecords(PhotonRecordsRequest request, StreamObserver<PhotonRecordsReply> responseObserver) {
    log.info("inside getPhotonRecords");

    // open the file, I suggest using java.nio API for the fastest read times.
    Path file = Paths.get(request.getFileName());
    try (FileChannel fileChannel = FileChannel.open(file, StandardOpenOption.READ)) {

      int blockSize = 1024 * 4;
      ByteBuffer byteBuffer = ByteBuffer.allocate(blockSize);
      boolean done = false;
      while (!done) {
        PhotonRecordsReply.Builder response = PhotonRecordsReply.newBuilder();
        // read up to 1024 ints (one 4 KiB block) from the file.
        byteBuffer.clear();
        int read = fileChannel.read(byteBuffer);
        if (read < blockSize) {
          done = true;
        }
        if (read <= 0) {
          break; // end of file, nothing left to send
        }
        // write to the response.
        byteBuffer.flip();
        for (int index = 0; index < read / 4; index++) {
          response.addPhotonRecords(byteBuffer.getInt());
        }
        // send the response
        responseObserver.onNext(response.build());
      }
    } catch (Exception e) {
      log.error("", e);
      responseObserver.onError(
          Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException());
      return; // don't call onCompleted() after onError()
    }
    responseObserver.onCompleted();
    log.info("exit getPhotonRecords");
  }
}
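For comparison, a rough sketch of the same read loop on the Python side, assuming big-endian uint32s to match Java's default byte order (pas_pb2 is the question's generated module):

import struct
import pas_pb2

def get_photon_records(request, _context):
    # Stream the file in 4 KiB blocks, decoding big-endian uint32s
    # (">...I") to match what the Java ByteBuffer reads by default.
    with open(request.fileName, "rb") as f:
        while True:
            block = f.read(4096)
            if not block:
                break
            count = len(block) // 4
            yield pas_pb2.PhotonRecordsReply(
                PhotonRecords=struct.unpack(f">{count}I", block[:count * 4])
            )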

The client just logs the number of records in each message received.

public long getPhotonRecords(ManagedChannel channel) {
  if (log.isInfoEnabled())
    log.info("Enter - getPhotonRecords ");

  PASGrpc.PASBlockingStub photonClient = PASGrpc.newBlockingStub(channel);

  PhotonRecordsRequest request = PhotonRecordsRequest.newBuilder().setFileName("/udata/jdrummond/logs/my_10mb_file").build();

  photonClient.getPhotonRecords(request).forEachRemaining(photonRecordsReply -> {
    log.info("got this many photons: {}", photonRecordsReply.getPhotonRecordsCount());
  });

  return 0;
}

Upvotes: 2
