Reputation: 140
I have to create a TCP/IP server for RPC communications. I'm bound to using a provided Java lib that handles all the "rpc" stuff. This lib receives HDLC messages that contain protobuf data, and it uses request and response handlers to deal with the HDLC and protobuf parts. The lib can be used for serial connections as well as networked connections.

We'd like to use Netty for the TCP server. When calling this lib, its "RPC" method expects a java.io.InputStream and a java.io.OutputStream.

I have a simple blocking setup, in which I create a server socket and just pass socket.getInputStream() and socket.getOutputStream() to the RPC method. Next I register a set of rpc handlers on this rpc object, and then clients can connect and data can be sent. Seems pretty straightforward to me.
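In code, the blocking version looks roughly like this (the port number and the RPC/handler class names are just placeholders for what the provided lib actually exposes):

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class BlockingRpcServer {
    public static void main(String[] args) throws IOException {
        try (ServerSocket serverSocket = new ServerSocket(4059)) {
            while (true) {
                Socket socket = serverSocket.accept();
                RPC rpc = new RPC();                            // placeholder for the provided lib
                rpc.addRequestHandler(new MyRequestHandler());  // register the rpc handlers
                // The lib reads and writes the raw socket streams and blocks while serving this client.
                rpc.rpc(socket.getInputStream(), socket.getOutputStream());
            }
        }
    }
}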
I've also set up a Netty "echo" server and now I want to use this RPC library with Netty. What I'm struggling with is how to convert the received data to the required InputStream, and how to convert the OutputStream of the RPC lib so that the result can be sent back to the client. Do I need a decoder/encoder, or is there a simpler way? And if so, how do I transform the ByteBuf into an InputStream, and the OutputStream back into a format that can be sent over the network?
Upvotes: 2
Views: 1890
Reputation: 18834
If your library has a readPacket method, you can use a ByteBufInputStream in combination with a ReplayingDecoder; this is relatively easy to implement:
public class RPCInputHandler extends ReplayingDecoder<Void> {
    RPC upstream = ....;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
        // ByteBufInputStream lets the library read straight from the inbound buffer;
        // ReplayingDecoder re-runs this method once more bytes arrive if the read runs short.
        upstream.readPacket(new ByteBufInputStream(buf));
        checkpoint(); // commit the bytes consumed for this packet
    }
}
If your upstream library uses a separate thread to process the incoming messages, you are going to lose one of the main advantages of Netty: a low thread count for a high number of connections. In that case you can bridge the incoming bytes to the library through a pair of piped streams:
public class RPCInputHandler extends SimpleChannelInboundHandler<ByteBuf> {
    RPC upstream = ....;
    PipedInputStream in;
    PipedOutputStream out;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        in = new PipedInputStream();
        out = new PipedOutputStream(in);
        upstream.startInput(in); // the library reads from the pipe on its own thread
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        out.close(); // this sends EOF to the other end of the pipe
    }

    // This method is called messageReceived(ChannelHandlerContext, I) in 5.0.
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] data = new byte[msg.readableBytes()];
        msg.readBytes(data);
        // Note: this blocks the event loop when the pipe's buffer is full,
        // i.e. when the library thread falls behind.
        out.write(data);
    }
}
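For completeness, either handler is installed like any other Netty 4 channel handler. A minimal server bootstrap could look like this (the port number is arbitrary, and RPCInputHandler refers to whichever of the two variants you use):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class RpcServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     // A new handler instance per connection, because it holds per-connection state.
                     ch.pipeline().addLast(new RPCInputHandler());
                 }
             });
            b.bind(8080).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}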
Making a custom OutputStream that writes bytes to our connection is simple; most methods map directly onto Channel operations:
public class OutputStreamNetty extends OutputStream {
    final Channel channel = ...;
    ChannelFuture lastFuture;

    // Re-throw the failure of an earlier asynchronous write as an IOException
    private void checkFuture() throws IOException {
        if (lastFuture != null && lastFuture.isDone()) {
            if (lastFuture.cause() != null) {
                throw new IOException("Downstream write problem", lastFuture.cause());
            }
            lastFuture = null;
        }
    }

    // Remember the oldest outstanding write that has not been checked yet
    private void addFuture(ChannelFuture f) {
        if (lastFuture == null) {
            lastFuture = f;
        }
    }

    @Override
    public void close() throws IOException {
        checkFuture();
        addFuture(channel.close());
    }

    @Override
    public void flush() throws IOException {
        checkFuture();
        channel.flush(); // Channel.flush() returns the Channel itself, not a future
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        checkFuture();
        ByteBuf buf = channel.alloc().buffer(len);
        buf.writeBytes(b, off, len);
        addFuture(channel.write(buf));
    }

    @Override
    public void write(int b) throws IOException {
        checkFuture();
        ByteBuf buf = channel.alloc().buffer(1);
        buf.writeByte(b);
        addFuture(channel.write(buf));
    }
}
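If the lib really exposes a single blocking rpc(InputStream, OutputStream) entry point as described in the question, the two halves can be tied together in channelActive. This is only a sketch: it assumes OutputStreamNetty is given a constructor that takes the Channel, and the RPC method names are the placeholders from the question.

// Replacement for channelActive in the SimpleChannelInboundHandler above.
private final ExecutorService rpcExecutor = Executors.newCachedThreadPool();

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    in = new PipedInputStream();
    out = new PipedOutputStream(in);
    OutputStreamNetty netOut = new OutputStreamNetty(ctx.channel()); // assumed constructor
    // The rpc(...) call is assumed to block until the connection ends, so keep it off the event loop.
    rpcExecutor.submit(() -> {
        RPC rpc = new RPC();                           // placeholder for the provided lib
        rpc.addRequestHandler(new MyRequestHandler()); // placeholder handler registration
        rpc.rpc(in, netOut);
        return null;                                   // Callable, so checked exceptions are allowed
    });
}

This keeps the event loop free, at the cost of one extra thread per active connection, which is exactly the trade-off mentioned above.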
Upvotes: 2