biiyamn

Reputation: 506

How to Perform Asynchronous Operations in Netty's channelRead Method Without Immediate Pipeline Propagation

I am developing an application using Netty and need to perform an asynchronous operation within the channelRead method of a handler. My goal is to make this async call without immediately propagating the message through the pipeline. Instead, I want to continue processing the pipeline only after the async call completes and I have the result.

Can anyone provide an example or best practices for achieving this? Let's say the pipeline is as follows: CryptoHandler -> LoggingHandler -> ServerHandler. The CryptoHandler should decrypt the message by calling an external HTTP crypto service, which is a blocking operation. Is it possible to run this call asynchronously (i.e., on another thread) so that it does not block the handler, and to propagate the Netty event to the next handler only once the result has been received? For example, as shown below:

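import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class CryptoHandler extends ChannelInboundHandlerAdapter implements HandlerListener {
// Written by the event loop and read by worker threads, so it must be thread-safe.
private final Map<String, ChannelHandlerContext> cache = new ConcurrentHashMap<>();
private DecryptionService decryptionService;
private final Executor executor = Executors.newFixedThreadPool(10);

@Override
public void channelRead(ChannelHandlerContext ctx, final Object msg) throws Exception {
    System.out.println("CryptoHandler.channelRead");
    // Remember which context received the message so onMessage can resume from it.
    cache.put(msg.hashCode() + "", ctx);
    executor.execute(new Runnable() {
        @Override
        public void run() {
            try {
                decryptionService.decrypt(msg);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new RuntimeException(e);
            }
        }
    });
    // Deliberately no ctx.fireChannelRead(msg) here: propagation is deferred to onMessage.
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    System.out.println("CryptoHandler.channelReadComplete");
    // Not propagated here; onMessage fires it after the decrypted message.
}

@Override
public void onMessage(final Object msg) {
    System.out.println("Receiving decrypted message");
    // Remove the entry so the cache does not grow without bound.
    final ChannelHandlerContext ctx = cache.remove(msg.hashCode() + "");
    if (ctx == null) {
        return;
    }
    // Fire from this handler's own context so the next handler (LoggingHandler) sees the message;
    // pipeline().context(LoggingHandler.class).fireChannelRead(msg) would start after LoggingHandler.
    ctx.executor().execute(new Runnable() {
        @Override
        public void run() {
            ctx.fireChannelRead(msg);
            ctx.fireChannelReadComplete();
        }
    });
}

public void setDecryptionService(DecryptionService decryptionService) {
    this.decryptionService = decryptionService;
}
}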

public class DecryptionService {
private final HandlerListener listener;

public DecryptionService(HandlerListener listener) {
    this.listener = listener;
}

public void decrypt(Object msg) throws InterruptedException {
    System.out.println("start message decryption");
    Thread.sleep(200); // simulate the HTTP decryption service
    System.out.println("decryption is done");
    // Called on the worker thread that ran decrypt, not on the channel's event loop.
    listener.onMessage(msg);
}
}
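
To make the hand-off I am after concrete, here is a minimal sketch of the pattern using only JDK executors, not Netty itself: the blocking call runs on a worker pool, and the continuation is scheduled back onto a single-threaded executor that stands in for the channel's event loop. Everything in it is an assumption for illustration: the names AsyncHandoffSketch, blockingDecrypt and decryptThenForward are hypothetical, string reversal is only a placeholder for real decryption, and the "event-loop" thread is a stand-in for what ctx.executor() would provide in a real handler, where the continuation would presumably call ctx.fireChannelRead(decrypted).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: plain JDK executors stand in for Netty's threads.
class AsyncHandoffSketch {

    // Placeholder for the blocking HTTP decryption call (assumption: reversal = "decrypt").
    static String blockingDecrypt(String encrypted) {
        try {
            Thread.sleep(200); // simulate the remote call, as in the question
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("decryption interrupted", e);
        }
        return new StringBuilder(encrypted).reverse().toString();
    }

    // Runs the blocking call on `workers`, then passes the result to `next` on `eventLoop`.
    static CompletableFuture<Void> decryptThenForward(
            String encrypted,
            ExecutorService workers,
            ExecutorService eventLoop,
            Consumer<String> next) {
        return CompletableFuture
                .supplyAsync(() -> blockingDecrypt(encrypted), workers)
                .thenAcceptAsync(next, eventLoop);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(10);
        // Single named thread standing in for the channel's event loop.
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        try {
            decryptThenForward("olleh", workers, eventLoop,
                    msg -> System.out.println(
                            Thread.currentThread().getName() + " forwards: " + msg))
                    .get(5, TimeUnit.SECONDS);
        } finally {
            workers.shutdown();
            eventLoop.shutdown();
        }
    }
}
```

In this sketch the caller returns immediately after decryptThenForward, and the consumer only runs once the result exists, on the thread that plays the event loop. Whether this maps cleanly onto channelRead (message ordering, reference-counted buffers, backpressure) is exactly what I am unsure about.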

Upvotes: 1

Views: 50

Answers (0)
