Reputation: 1180
Netty TCP Server is running at port 8000
receiving NMEA
format data. It uses Marine API library to convert the gibberish to a meaningful information which needs input stream from the socket.
SentenceReader sentenceReader = new SentenceReader(socket.getInputStream());
sentenceReader.addSentenceListener(new MultiSentenceListener());
sentenceReader.start();
How can i get inputstream for netty server port being used?
Upvotes: 1
Views: 1399
Reputation: 18834
SentenceReader
does not have any method to accept "streamed in" data, however with subclassing, it can be made to accept the data.
The core of SentenceReader
uses a DataReader
for its data, normally this datareader is polled from a seperate thread SentenceReader
itself, and we can modify this structure to get what we need.
First, we subclass SentenceReader
with our own class, give it the proper constructor and methods we want, and remove the effect of the start and stop methods. We provide null
as the file for now (and hope future versions provide a method to pass a datareader in directly)
public class NettySentenceReader extends SentenceReader {
public NettySentenceReader () {
super((InputStream)null);
}
@Override
public void start() {
}
@Override
public void stop() {
}
}
We now need to implement all functionality of the internal class DataReader
inside our own Netty handler, to replicate the same behaviour
public class SentenceReaderHandler extends
SimpleChannelInboundHandler<String> {
private SentenceFactory factory;
private SentenceReader parent;
public SentenceReaderHandler (SentenceReader parent) {
this.parent = parent;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
if(!ctx.channel().isActive())
return;
//ActivityMonitor monitor = new ActivityMonitor(parent);
this.factory = SentenceFactory.getInstance();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
//ActivityMonitor monitor = new ActivityMonitor(parent);
this.factory = SentenceFactory.getInstance();
}
@Override
// This method will be renamed to `messageReceived` in Netty 5.0.0
protected void channelRead0(ChannelHandlerContext ctx, String data)
throws Exception {
if (SentenceValidator.isValid(data)) {
monitor.refresh();
Sentence s = factory.createParser(data);
parent.fireSentenceEvent(s);
} else if (!SentenceValidator.isSentence(data)) {
parent.fireDataEvent(data);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
//monitor.reset();
parent.fireReadingStopped();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
if(!ctx.channel().isActive())
return;
//monitor.reset();
parent.fireReadingStopped();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
parent.handleException("Data read failed", e);
}
}
Finally, we need to integrate this into a Netty pipeline:
SentenceReader reader = new NettySentenceReader();
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
private static final StringDecoder DECODER = new StringDecoder();
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(DECODER);
pipeline.addLast(new SentenceReaderHandler(reader));
}
});
Upvotes: 1
Reputation: 23557
You can't easily as InputStream is blocking and netty is an async - non blocking API.
Upvotes: 1