Reputation: 506
I am developing an application using Netty and need to perform an asynchronous operation within the channelRead method of a handler. My goal is to make this async call without immediately propagating the message through the pipeline. Instead, I want to continue processing the pipeline only after the async call completes and I have the result.
Can anyone provide an example or best practices on how to achieve this? Let's say the pipeline is as follows: CryptoHandler -> LoggingHandler -> ServerHandler. The CryptoHandler should decrypt the message using an exposed HTTP crypto service, which is a blocking operation. Is it possible to do this asynchronously (i.e., in another thread) without blocking the handler and just propagate the Netty event to the next handler once the result is received? For example, as shown below:
public class CryptoHandler extends ChannelInboundHandlerAdapter implements HandlerListener{
private Map<String,ChannelHandlerContext> cache=new HashMap<>();
private DecryptionService decryptionService;
private Executor executor= Executors.newFixedThreadPool(10);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("CryptoHandler.channelRead");
cache.put(msg.hashCode()+"",ctx);
executor.execute(new Runnable() {
@Override
public void run() {
try {
decryptionService.decrypt(msg);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
// ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("CryptoHandler.channelReadComplete");
// ctx.fireChannelReadComplete();
}
@Override
public void onMessage(Object msg) {
System.out.println("Receiving decrypted message");
cache.get(msg.hashCode()+"").pipeline().context(LoggingHandler.class).fireChannelRead(msg);
cache.get(msg.hashCode()+"").pipeline().context(LoggingHandler.class).fireChannelReadComplete();
}
public void setDecryptionService(DecryptionService decryptionService) {
this.decryptionService = decryptionService;
}}
public class DecryptionService {
private final HandlerListener listener;
public DecryptionService(HandlerListener listener) {
this.listener = listener;
}
public void decrypt(Object msg) throws InterruptedException {
System.out.println("start message decryption ");;
Thread.sleep(200);// simulate Http decryption service
System.out.println("decryption is done");
listener.onMessage(msg);
}
}
Upvotes: 1
Views: 50