Reputation: 1341
I have an asynchronous method in my client that sends a udp request to a server and returns a Promise. I need to pass this Promise somehow to one of my inbound handlers so that it will be able to mark it as "done" then send back the response through Promise.setSuccess(result).
How do you do this exactly? Once a response is received by an inbound handler, how do you associate the request instance to the response from the Handler?
Some of the approaches suggested in this site also didn't work out for me:
My Code:
Client:
private final EventLoopGroup group;
private final Bootstrap bootstrap;
private Channel channel;
private BlockingQueue<GameQuery> requestQueue;
public SourceServerQueryClient() {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
configureBootstrap(bootstrap);
try {
channel = bootstrap.bind(0).sync().channel();
} catch (InterruptedException e) {
log.error("InterruptedException", e);
}
}
public void configureBootstrap(Bootstrap bootstrap) {
//Contains our request queue
requestQueue = new ArrayBlockingQueue<>(50);
//Configure our bootstrap
bootstrap.group(group).channel(NioDatagramChannel.class)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ErrorHandler());
pipeline.addLast(new SourcePacketVerifierHandler());
pipeline.addLast(new SourceQueryEncoder());
pipeline.addLast(new MasterServerDecoder());
pipeline.addLast(new SourceServerInfoDecoder());
pipeline.addLast(new QueryResponseHandler(requestQueue));
}
});
}
public Promise<SourceServer> getServerDetails(InetSocketAddress address, QueryCallback<SourceServer> callback) {
Promise<SourceServer> p = sendQuery(new ServerInfoQuery(address));
p.addListener(future -> {
if (future.isSuccess())
callback.onReceive(p.get());
});
return p;
}
private Promise sendQuery(GameQuery query) {
Promise promise = channel.eventLoop().newPromise();
query.setPromise(promise);
ChannelFuture f = null;
try {
requestQueue.put(query);
f = channel.writeAndFlush(query).sync();
} catch (InterruptedException e) {
if (f != null)
promise.setFailure(f.cause());
}
return promise;
}
public static void main(String[] args) throws InterruptedException {
try (SourceServerQueryClient client = new SourceServerQueryClient()) {
Promise query1 = client.getServerDetails(new InetSocketAddress("169.38.68.44", 27015), msg -> log.info("REPLY FROM SERVER: {}, EXPECTED: 169.38.68.44:27015", msg.toString()));
Promise query2 = client.getServerDetails(new InetSocketAddress("112.211.234.23", 27016), msg -> log.info("REPLY FROM SERVER: {}, EXPECTED: 112.211.234.23:27016", msg.toString()));
query2.awaitUninterruptibly();
log.info("Done");
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
Inbound Handler:
public class QueryResponseHandler<T extends GameQuery> extends SimpleChannelInboundHandler<Object> {
private static final Logger log = LoggerFactory.getLogger(QueryResponseHandler.class);
private BlockingQueue<T> requestQueue;
public QueryResponseHandler(BlockingQueue<T> requestQueue) {
this.requestQueue = requestQueue;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("From QueryResponseHandler: {}", msg);
T request = requestQueue.poll();
Promise p = request.getPromise();
if (request != null) {
p.setSuccess(msg);
} else
p.setFailure(new BufferUnderflowException());
}
}
In my test, I ran two requests at the same time. The first should not work since its a dead server. The second call should send back a response.
Output:
REPLY FROM SERVER: 112.211.234.23:27016, EXPECTED: 169.38.68.44:27015
As you can see, it didn't work out as expected due to the design. The first query received the response that was intended for the second query.
I'm running out of ideas how to design this properly, so any inputs would be greatly appreciated! Thanks.
Upvotes: 0
Views: 1951
Reputation: 5387
As UDP is stateless, you need to provide a message ID to be able to associate and follow up answer to a request.
Although if you want to have a stateful communication, why not simply using TCP?
Upvotes: 0
Reputation: 2216
Maybe adding an "id" of the request such that when you "poll" it, you can indeed get the right one (so through a sort of map of queues and not only a single queue) ?
This "id" could be based on the output message you show (maybe not ?).
WDYT?
Upvotes: 0