Danny Kruitbosch

Reputation: 140

How to convert received data to an InputStream and an OutputStream

I have to create a TCP/IP server for RPC communications. I'm bound to using a provided Java library that handles all the "RPC" stuff. This library receives HDLC messages that contain protobuf data. The library itself uses request and response handlers to handle the HDLC and protobuf parts. It can be used for serial connections as well as networked connections.

We'd like to use Netty for the TCP server. When calling this library, its "RPC" method expects a java.io.InputStream and a java.io.OutputStream.

I have a simple blocking setup in which I create a server socket and just pass socket.getInputStream() and socket.getOutputStream() to the RPC method. Next I need to register a set of RPC handlers on this RPC object, and then clients can connect and data can be sent. That seems pretty straightforward to me.

I've also set up a Netty "echo" server, and now I want to use this RPC library with Netty. What I'm struggling with is how to convert my received data to the required InputStream, and how to convert the OutputStream of the RPC library so that its data can be sent back to the client. Do I need a decoder/encoder, or is there a simpler way to do this? And if so, how do I transform the ByteBuf into an InputStream, and the OutputStream back into a format that can be sent over the network?

Upvotes: 2

Views: 1890

Answers (1)

Ferrybig

Reputation: 18834

InputStream

The other API has a readPacket() method

If your library has a readPacket method, you can use a ByteBufInputStream in combination with a ReplayingDecoder. This is relatively easy to implement:

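import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;

public class RPCInputHandler extends ReplayingDecoder<Object> {
   RPC upstream = ....;
   @Override // Netty 4.x signature; re-run when buf runs out of data mid-packet
   protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
      upstream.readPacket(new ByteBufInputStream(buf));
      state(null); // no decoder state is kept between packets
   }
}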
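
The replay idea behind this approach can be tried without Netty. ReplayingDecoder lets blocking-style reading code run against partial data and re-runs decode when more bytes arrive. The sketch below imitates that with the JDK only, and it is not how Netty implements it. The class `ReplaySketch`, its `feed` method and the packet format (a 2-byte length followed by the payload) are assumptions made for illustration; the real library reads HDLC frames.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ReplaySketch {
    private byte[] pending = new byte[0]; // bytes received but not yet consumed

    // Hypothetical stand-in for the library's readPacket:
    // a 2-byte big-endian length, then that many payload bytes.
    static byte[] readPacket(DataInputStream in) throws IOException {
        int length = in.readUnsignedShort();
        byte[] payload = new byte[length];
        in.readFully(payload); // throws EOFException if the payload is incomplete
        return payload;
    }

    // Appends a received chunk and returns every packet that is now complete.
    List<byte[]> feed(byte[] chunk) throws IOException {
        ByteArrayOutputStream joined = new ByteArrayOutputStream();
        joined.write(pending);
        joined.write(chunk);
        byte[] data = joined.toByteArray();

        List<byte[]> packets = new ArrayList<>();
        int consumed = 0;
        while (consumed < data.length) {
            ByteArrayInputStream raw =
                    new ByteArrayInputStream(data, consumed, data.length - consumed);
            try {
                packets.add(readPacket(new DataInputStream(raw)));
                consumed = data.length - raw.available(); // commit the packet just read
            } catch (EOFException incomplete) {
                break; // not enough data yet: retry from 'consumed' on the next feed
            }
        }
        pending = Arrays.copyOfRange(data, consumed, data.length);
        return packets;
    }
}
```

Feeding the bytes of one packet in several pieces returns nothing until the last piece arrives, which is the behaviour the decoder above relies on: readPacket never has to know that the data came in fragments.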

The remote API uses threads to read the stream

If your upstream library uses a separate thread to process the incoming messages, you are going to lose one of the main advantages of Netty: a low thread count for a high number of connections. In that case you can hand the received bytes to that thread through a pair of piped streams:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class RPCInputHandler extends SimpleChannelInboundHandler<ByteBuf> {
   RPC upstream = ....;
   PipedInputStream in;
   PipedOutputStream out;

   @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
       in = new PipedInputStream();
       out = new PipedOutputStream(in);
       upstream.startInput(in);
   }

   @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
       out.close(); // This sends EOF to the other pipe
   }

   // This method is called messageReceived(ChannelHandlerContext, I) in 5.0.
   @Override
   public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
      // Copy the readable bytes out of the buffer and push them into the pipe.
      byte[] data = new byte[msg.readableBytes()];
      msg.readBytes(data);
      out.write(data); // blocks while the pipe is full and the reader is behind
   }
}
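
The piped-stream hand-off can be tried on its own, without Netty. This is a minimal sketch using only the JDK; `PipeBridgeDemo`, `run` and `readAll` are hypothetical names, and the reader thread stands in for whatever thread the RPC library starts.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PipeBridgeDemo {
    // Stand-in for the library's reader thread: drains the stream until EOF.
    static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        byte[] chunk = new byte[256];
        int n;
        while ((n = in.read(chunk)) != -1) {
            collected.write(chunk, 0, n);
        }
        return collected.toByteArray();
    }

    // Pushes the chunks through a pipe and returns what the reader thread saw.
    static byte[] run(byte[]... chunks) throws Exception {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);
        ExecutorService reader = Executors.newSingleThreadExecutor();
        try {
            Future<byte[]> result = reader.submit(() -> readAll(in));
            for (byte[] chunk : chunks) {
                out.write(chunk); // what channelRead0 does for each received buffer
            }
            out.close();          // what channelInactive does: signals EOF
            return result.get();
        } finally {
            reader.shutdown();
        }
    }
}
```

One thing to keep in mind: a PipedInputStream has a small internal buffer (1024 bytes by default, configurable through the `PipedInputStream(int pipeSize)` constructor), and `write` blocks while that buffer is full. Inside a Netty handler that means a slow reader can stall the event loop thread.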

OutputStream

Making a custom OutputStream that writes bytes to our connection is simple, since most methods map directly onto Channel calls:

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.io.IOException;
import java.io.OutputStream;

public class OutputStreamNetty extends OutputStream {
    final Channel channel = ...;
    ChannelFuture lastFuture;

    // Reports a failure of an earlier asynchronous write as an IOException.
    private void checkFuture() throws IOException {
        if(lastFuture != null && lastFuture.isDone()) {
            if(lastFuture.cause() != null) {
                throw new IOException("Downstream write problem", lastFuture.cause() );
            }
            lastFuture = null;
        }
    }

    // Tracks one pending future at a time; it is cleared again by checkFuture().
    private void addFuture(ChannelFuture f) {
        if(lastFuture == null) {
            lastFuture = f;
        }
    }

    @Override
    public void close() throws IOException {
        checkFuture();
        addFuture(channel.close());
    }
    @Override
    public void flush() throws IOException {
        checkFuture();
        channel.flush(); // returns the Channel, not a ChannelFuture
    }
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        checkFuture();
        ByteBuf f = channel.alloc().buffer(len);
        f.writeBytes(b, off, len);
        addFuture(channel.write(f));
    }
    @Override
    public void write(int b) throws IOException {
        checkFuture();
        ByteBuf f = channel.alloc().buffer(1);
        f.writeByte(b);
        addFuture(channel.write(f));
    }

}
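
The same adapter shape can be exercised without a Netty channel. In this sketch a `Consumer<byte[]>` called `sink` is a hypothetical stand-in for `channel.write(...)`, and `ChunkForwardingOutputStream` is an illustrative name that belongs to neither Netty nor the RPC library.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Consumer;

class ChunkForwardingOutputStream extends OutputStream {
    private final Consumer<byte[]> sink; // hypothetical stand-in for channel.write(...)
    private boolean closed;

    ChunkForwardingOutputStream(Consumer<byte[]> sink) {
        this.sink = Objects.requireNonNull(sink);
    }

    @Override
    public void write(int b) throws IOException {
        // OutputStream's contract: only the low eight bits of b are written.
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (closed) {
            throw new IOException("stream closed");
        }
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        // Copy the slice: the caller may reuse its array after write returns.
        sink.accept(Arrays.copyOfRange(b, off, off + len));
    }

    @Override
    public void close() {
        closed = true;
    }
}
```

It can be driven with something like `new ChunkForwardingOutputStream(chunks::add)`, where `chunks` is a `List<byte[]>`. Copying each slice before handing it on mirrors what the Netty version does by writing into a freshly allocated ByteBuf.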

Upvotes: 2
