Yunus Einsteinium
Yunus Einsteinium

Reputation: 1180

Netty TCP Socket InputStream

Netty TCP Server is running at port 8000 receiving NMEA format data. It uses Marine API library to convert the gibberish to a meaningful information which needs input stream from the socket.

SentenceReader sentenceReader = new SentenceReader(socket.getInputStream());
sentenceReader.addSentenceListener(new MultiSentenceListener());
sentenceReader.start();

How can i get inputstream for netty server port being used?

Upvotes: 1

Views: 1399

Answers (2)

Ferrybig
Ferrybig

Reputation: 18834

SentenceReader does not have any method to accept "streamed in" data, however with subclassing, it can be made to accept the data.

The core of SentenceReader uses a DataReader for its data, normally this datareader is polled from a seperate thread SentenceReader itself, and we can modify this structure to get what we need.

First, we subclass SentenceReader with our own class, give it the proper constructor and methods we want, and remove the effect of the start and stop methods. We provide null as the file for now (and hope future versions provide a method to pass a datareader in directly)

public class NettySentenceReader extends SentenceReader {
    public NettySentenceReader () {
        super((InputStream)null);
    }

    @Override
    public void start() {
    }

    @Override
    public void stop() {
    }
}

We now need to implement all functionality of the internal class DataReader inside our own Netty handler, to replicate the same behaviour

public class SentenceReaderHandler extends
         SimpleChannelInboundHandler<String> {
    private SentenceFactory factory;
    private SentenceReader parent;

    public SentenceReaderHandler (SentenceReader parent) {
        this.parent = parent;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        if(!ctx.channel().isActive())
            return;
        //ActivityMonitor monitor = new ActivityMonitor(parent);
        this.factory = SentenceFactory.getInstance();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        //ActivityMonitor monitor = new ActivityMonitor(parent);
        this.factory = SentenceFactory.getInstance();
    }

    @Override
    // This method will be renamed to `messageReceived` in Netty 5.0.0
    protected void channelRead0(ChannelHandlerContext ctx, String data)
             throws Exception {
        if (SentenceValidator.isValid(data)) {
            monitor.refresh();
            Sentence s = factory.createParser(data);
            parent.fireSentenceEvent(s);
        } else if (!SentenceValidator.isSentence(data)) {
            parent.fireDataEvent(data);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        //monitor.reset();
        parent.fireReadingStopped();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        if(!ctx.channel().isActive())
            return;
        //monitor.reset();
        parent.fireReadingStopped();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
        parent.handleException("Data read failed", e);
    }
}

Finally, we need to integrate this into a Netty pipeline:

SentenceReader reader = new NettySentenceReader();
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    private static final StringDecoder DECODER = new StringDecoder();
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast(DECODER);
        pipeline.addLast(new SentenceReaderHandler(reader)); 
    }
});

Upvotes: 1

Norman Maurer
Norman Maurer

Reputation: 23557

You can't easily as InputStream is blocking and netty is an async - non blocking API.

Upvotes: 1

Related Questions