Reputation: 7106
I'm trying to implement a UDP server with Netty. The idea is to bind only once (therefore creating only one Channel
). This Channel
is initialized with only one handler that dispatches processing of incoming datagrams among multiple threads via an ExecutorService
.
@Configuration
public class SpringConfig {
@Autowired
private Dispatcher dispatcher;
private String host;
private int port;
@Bean
public Bootstrap bootstrap() throws Exception {
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.channel(NioDatagramChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(dispatcher);
ChannelFuture future = bootstrap.bind(host, port).await();
if(!future.isSuccess())
throw new Exception(String.format("Fail to bind on [host = %s , port = %d].", host, port), future.cause());
return bootstrap;
}
}
@Component
@Sharable
public class Dispatcher extends ChannelInboundHandlerAdapter implements InitializingBean {
private int workerThreads;
private ExecutorService executorService;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DatagramPacket packet = (DatagramPacket) msg;
final Channel channel = ctx.channel();
executorService.execute(new Runnable() {
@Override
public void run() {
//Process the packet and produce a response packet (below)
DatagramPacket responsePacket = ...;
ChannelFuture future;
try {
future = channel.writeAndFlush(responsePacket).await();
} catch (InterruptedException e) {
return;
}
if(!future.isSuccess())
log.warn("Failed to write response packet.");
}
});
}
@Override
public void afterPropertiesSet() throws Exception {
executorService = Executors.newFixedThreadPool(workerThreads);
}
}
I have the following questions:
DatagramPacket
received by the channelRead
method of the Dispatcher
class be duplicated before being used by the worker thread? I wonder if this packet is destroyed after the channelRead
method returns, even if a reference is kept by the worker thread.Channel
among all the worker threads and let them call writeAndFlush
concurrently?Thanks!
Upvotes: 4
Views: 2546
Reputation: 40313
Nope. If you need the object to live longer you either turn it into something else or use ReferenceCountUtil.retain(datagram)
and then ReferenceCountUtil.release(datagram)
once you're done with it. You also shouldn't be doing await()
at the executor service as well, you should register a handler for whatever happens.
Yes, channel objects are thread safe and they can be called from many different threads.
Upvotes: 5