Muhammad Bekette
Muhammad Bekette

Reputation: 1434

How to manipulate Message coming from Netty server/client

I am prototyping a Netty client/server transfer for strings, now I want to pass these strings to file when it arrives to server side.

Client:

    private ClientBootstrap bootstrap;
    private Channel connector;
    private MyHandler handler=new MyHandler();

    public boolean start() {
        // Standard netty bootstrapping stuff.
        Executor bossPool = Executors.newCachedThreadPool();
        Executor workerPool = Executors.newCachedThreadPool();
        ChannelFactory factory =
                new NioClientSocketChannelFactory(bossPool, workerPool);
        this.bootstrap = new ClientBootstrap(factory);

        // Declared outside to fit under 80 char limit
        final DelimiterBasedFrameDecoder frameDecoder =
                new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,
                        Delimiters.lineDelimiter());
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(
                        handler,
                        frameDecoder,
                        new StringDecoder(),
                        new StringEncoder());
            }
        });

        ChannelFuture future = this.bootstrap
                .connect(new InetSocketAddress("localhost", 12345));
        if (!future.awaitUninterruptibly().isSuccess()) {
            System.out.println("--- CLIENT - Failed to connect to server at " +
                    "localhost:12345.");
            this.bootstrap.releaseExternalResources();
            return false;
        }

        this.connector = future.getChannel();
        return this.connector.isConnected();
    }

    public void stop() {
        if (this.connector != null) {
            this.connector.close().awaitUninterruptibly();
        }
        this.bootstrap.releaseExternalResources();
        System.out.println("--- CLIENT - Stopped.");
    }

    public boolean sendMessage(String message) {
        if (this.connector.isConnected()) {
            // Append \n if it's not present, because of the frame delimiter
            if (!message.endsWith("\n")) {
                this.connector.write(message + '\n');
            } else {
                this.connector.write(message);
            }
            System.out.print(message);
            return true;
        }

        return false;
    }

Server:

    private final String id;
    private ServerBootstrap bootstrap;
    private ChannelGroup channelGroup;
    private MyHandler handler= new MyHandler();


    public Server(String id) {
        this.id = id;
    }

    // public methods ---------------------------------------------------------

    public boolean start() {
        // Pretty standard Netty startup stuff...
        // boss/worker executors, channel factory, channel group, pipeline, ...
        Executor bossPool = Executors.newCachedThreadPool();
        Executor workerPool = Executors.newCachedThreadPool();
        ChannelFactory factory =
                new NioServerSocketChannelFactory(bossPool, workerPool);
        this.bootstrap = new ServerBootstrap(factory);

        this.channelGroup = new DefaultChannelGroup(this.id + "-all-channels");


        // declared here to fit under the 80 char limit
        final ChannelHandler delimiter =
                new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,
                        Delimiters.lineDelimiter());
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

            @Override
            public ChannelPipeline getPipeline() throws Exception {
                SimpleChannelHandler handshakeHandler =
                        new SimpleChannelHandler();
                return Channels.pipeline(
                        handler,
                        delimiter,
                        new StringDecoder(),
                        new StringEncoder(),
                        handshakeHandler);
            }
        });

        Channel acceptor = this.bootstrap.bind(new InetSocketAddress(12345));
        if (acceptor.isBound()) {
            System.out.println("+++ SERVER - bound to *:12345");
            this.channelGroup.add(acceptor);
            return true;
        } else {
            System.err.println("+++ SERVER - Failed to bind to *:12345");
            this.bootstrap.releaseExternalResources();
            return false;
        }
    }

    public void stop() {
        this.channelGroup.close().awaitUninterruptibly();
        this.bootstrap.releaseExternalResources();
        System.err.println("+++ SERVER - Stopped.");
    }

Handlers used: Client handler:

public class MyHandler extends SimpleChannelUpstreamHandler{
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception {
        if(e.getMessage() instanceof String){
            System.out.println((String)e.getMessage());
        }
        System.out.println(e.getMessage().toString());
    }
}

Server handler:

@Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception {
        Channel channel= ctx.getChannel();
        channel.write(e.getMessage());
        if(e.getMessage() instanceof String){
            System.out.println((String)e.getMessage());
        }
        System.out.println(e.getMessage().toString());
    }

client runner:

public static void main(String[] args) throws InterruptedException {

        final int nMessages = 5;

        try {
            Client c = new Client();

            if (!c.start()) {
                return;
            }

            for (int i = 0; i < nMessages; i++) {

                Thread.sleep(1L);
                c.sendMessage((i + 1) + "\n");
            }
            c.stop();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

Server Runner:

public static void main(String[] args) {
        final Server s = new Server("server1");

        if (!s.start()) {
            return;
        }

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                s.stop();
            }
        });
    }

now what I really need is to print the message that I wrote on the channel on both client and server side and I am really puzzled on this.

Upvotes: 1

Views: 1821

Answers (1)

Abe
Abe

Reputation: 9031

Your pipeline creation seems to be wrong at first look. At server side when decoding, the Delimiter needs to come first, then the StringDecoder and then the business handler. You could resolve this probably by just putting breakpoints in these decoders and encoders. Also take a look at this link for very good documentation on how this works.

Upvotes: 1

Related Questions