Reputation: 419
Following recommendations elsewhere I am attempting to parallelize my final inbound handler in a Netty pipeline as such
public final class EchoServer {
private EventLoopGroup group = new NioEventLoopGroup();
private UnorderedThreadPoolEventExecutor workers = new UnorderedThreadPoolEventExecutor(10);
public void start(int port) throws InterruptedException {
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel channel) throws Exception {
channel.pipeline().addLast(workers, new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
System.err.println(packet);
// Simulated database delay that I have to wait to occur before repsonding
Thread.sleep(1000);
ctx.write(new DatagramPacket(Unpooled.copiedBuffer("goodbye", StandardCharsets.ISO_8859_1), packet.sender()));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
}
});
}
});
b.bind(port).sync().channel().closeFuture().await();
} finally {
group.shutdownGracefully();
}
}
public void stop() {
group.shutdownGracefully();
}
}
I have ten clients that connect concurrently, as a test, and I am measuring execution time for handling all the requests. As expected with the 1 second delay and sequential execution it takes just over 10 seconds. I am trying to get execution down to somewhere sub 2 seconds to prove parallel handling.
From what I understand adding the handler to the pipeline with an explicitly assigned executor is supposed to parallelize that handlers work across the thread in the executor.
Instead of seeing a increase in performance, what I am finding is that my client is not receiving the responses when I add the parallel processing. The thread sleep is there to simulate the potential time it will take to write the incoming data to a database. Am I doing something obviously wrong here?
Upvotes: 2
Views: 632
Reputation: 419
I worked around the apparently lack of Netty support for doing final end UDP processing in parallel using standard java concurrency mechanisms.
public final class EchoServer {
private EventLoopGroup group = new NioEventLoopGroup();
private ExecutorService executors = Executors.newFixedThreadPool(10);
public void start(int port) throws InterruptedException {
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioDatagramChannel.class).handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel channel) throws Exception {
channel.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
CompletableFuture.runAsync(() -> {
System.err.println(packet);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("goodbye", StandardCharsets.ISO_8859_1),
packet.sender()));
}, executors);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
}
});
}
});
b.bind(port).sync().channel().closeFuture().await();
} finally {
group.shutdownGracefully();
}
}
public void stop() {
group.shutdownGracefully();
}
}
Upvotes: 1