Reputation: 586
First, I'll explain the situation and the logic I'm trying to implement.
I have multiple threads; each puts the result of its work, an object called Result, into a queue, QueueToSend.
My NettyClient runs in its own thread, takes a Result from QueueToSend every millisecond, then connects to the server and sends a message created from that Result.
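The queue hand-off described above could be sketched roughly as follows. This uses only java.util.concurrent; the class name QueueToSendSketch, the Result fields, and the method produceAndDrain are hypothetical stand-ins, not the asker's code. It assumes a timed poll is acceptable in place of waking up every millisecond.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueToSendSketch {
    /** Hypothetical stand-in for the question's Result type. */
    static final class Result {
        final int id;
        Result(int id) { this.id = id; }
    }

    /** Starts worker threads that enqueue results, then drains all of them on the calling thread. */
    static List<Result> produceAndDrain(int workers, int perWorker) {
        BlockingQueue<Result> queueToSend = new LinkedBlockingQueue<>();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            final int base = w * perWorker;
            Thread t = new Thread(() -> {
                for (int i = 0; i < perWorker; i++) {
                    queueToSend.add(new Result(base + i)); // each worker publishes its results
                }
            });
            threads.add(t);
            t.start();
        }

        List<Result> taken = new ArrayList<>();
        int expected = workers * perWorker;
        try {
            while (taken.size() < expected) {
                // Wait up to 1 ms for the next result instead of sleeping and re-checking.
                Result r = queueToSend.poll(1, TimeUnit.MILLISECONDS);
                if (r != null) {
                    taken.add(r); // the real client would connect and send here
                }
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(produceAndDrain(4, 25).size());
    }
}
```

With a blocking queue the consumer does not need its own timer: poll returns as soon as a Result is available, or null once the timeout elapses.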
I also need these connections to be asynchronous, so NettyHandler must know which Result belongs to which connection in order to send the right message, process the right result, and then send a response.
So I initialize the NettyClient bootstrap:
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
and set the pipeline once, when the app starts.
Then, every millisecond, I take a Result object from QueueToSend and connect to the server:
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
ResultConcurrentHashMap.put(future.getChannel().getId(), result);
I decided to use a static ConcurrentHashMap to store every Result object taken from QueueToSend, keyed by the ID of its channel.
The first problem occurs in NettyHandler, in the channelConnected method, when I try to retrieve the Result object associated with the channel from ResultConcurrentHashMap:
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
Channel channel = ctx.getPipeline().getChannel();
Result result = ResultConcurrentHashMap.get(channel.getId());
}
But sometimes result is null (about 1 time in 50), even though it should be in ResultConcurrentHashMap. I think this happens because the channelConnected event fires before NettyClient runs this code:
ResultConcurrentHashMap.put(future.getChannel().getId(), result);
It might not happen if NettyServer and NettyClient ran on different machines rather than both on localhost, since it would take more time to establish the connection. But I need a real solution for this issue.
Another issue is that I am sending messages asynchronously every millisecond, and I suspect the messages may get mixed up so that the server cannot read them properly. If I send them one at a time, waiting on each connection like this, everything is fine:
future.getChannel().getCloseFuture().awaitUninterruptibly();
But I need asynchronous sending, and I need to process the right result associated with each channel and send responses. What should I implement?
Upvotes: 0
Views: 5608
Reputation: 2486
A ChannelFuture is completed asynchronously, before the corresponding event is fired. For example, the connect future is completed before the channelConnected event fires.
So you have to register a ChannelFutureListener after calling bootstrap.connect() and populate the HashMap inside the listener; the entry will then be visible to the handler.
ChannelFuture channelFuture = bootstrap.connect(remoteAddress, localAddress);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
resultConcurrentHashMap.put(future.getChannel().getId(), result);
}
});
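The ordering this relies on can be illustrated without Netty. The sketch below is a toy model only: ToyConnectFuture and connectAndLookUp are hypothetical names, not Netty classes. It assumes, as described above, that listeners run on the completing thread before the connected event is delivered. In the model the listener is attached before completion can start, which real code calling bootstrap.connect() cannot fully guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntConsumer;

class ConnectOrderingSketch {
    /** Toy stand-in for a connect future; NOT Netty's ChannelFuture. */
    static final class ToyConnectFuture {
        private final int channelId;
        private final List<IntConsumer> listeners = new ArrayList<>();
        private boolean done;

        ToyConnectFuture(int channelId) { this.channelId = channelId; }

        synchronized void addListener(IntConsumer listener) {
            if (done) {
                listener.accept(channelId); // already complete: notify right away
            } else {
                listeners.add(listener);
            }
        }

        synchronized void setSuccess() {
            done = true;
            for (IntConsumer listener : listeners) {
                listener.accept(channelId); // listeners run on the completing thread
            }
            listeners.clear();
        }
    }

    /** Returns what the simulated handler finds in the map when the connected event fires. */
    static String connectAndLookUp(int channelId, String result) {
        Map<Integer, String> resultsByChannel = new ConcurrentHashMap<>();
        ToyConnectFuture future = new ToyConnectFuture(channelId);

        // Register the listener that stores the result under the channel id.
        future.addListener(id -> resultsByChannel.put(id, result));

        String[] seenByHandler = new String[1];
        Thread ioThread = new Thread(() -> {
            future.setSuccess();                                // 1. complete the future, run listeners
            seenByHandler[0] = resultsByChannel.get(channelId); // 2. simulated channelConnected lookup
        });
        ioThread.start();
        try {
            ioThread.join(); // join makes the I/O thread's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for connect", e);
        }
        return seenByHandler[0];
    }

    public static void main(String[] args) {
        System.out.println(connectAndLookUp(7, "result-7"));
    }
}
```

Because the put happens inside step 1, the lookup in step 2 always sees the entry in this model; that is the property the listener-based fix depends on.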
Upvotes: 2