Reputation: 2870
My web server (custom built on top of Netty) uses a web client (also custom built with Netty) to make proxied requests to S3.
Client -> Webserver|Webclient -> S3
The purpose of the system is to pipe file uploads directly to S3 with a little bit of logic:
Webserver:

- accepts the client request (POST);
- sets the Client channel readability to false and verifies a bunch of stuff;
- uses the Webclient to connect to S3;
- when the Webclient connects to S3:
  - sets the Client channel readability to true;
  - all chunks received by the Webserver are handed over to the Webclient to forward.

In the (highly unlikely) event that the connection between Client and Webserver is faster than the connection between Webclient and S3, I need to throttle the connection between Client and Webserver.
The approach I took was simply to keep a counter of bytes received by the Webserver (incremented every time the Client sends data, decremented every time a write by the Webclient completes). Whenever the amount of data in this buffer goes over a given threshold, the Client's channel readability is set to false.
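That counting approach can be sketched as a Netty 3 upstream handler. This is a minimal illustration, not the original code: the class name, threshold value, and the `writeCompleted` callback are assumptions.

```java
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

// Sketch of the byte-counting throttle described above (Netty 3 API).
// THRESHOLD is an arbitrary illustrative value.
public class ThrottlingHandler extends SimpleChannelUpstreamHandler {
    private static final long THRESHOLD = 64 * 1024;
    private long pendingBytes; // bytes received but not yet written to S3

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception {
        ChannelBuffer buf = (ChannelBuffer) e.getMessage();
        pendingBytes += buf.readableBytes();
        if (pendingBytes > THRESHOLD) {
            // Stop reading from the client until the Webclient drains the backlog.
            ctx.getChannel().setReadable(false);
        }
        ctx.sendUpstream(e); // hand the chunk over to the forwarding logic
    }

    // Hypothetical hook, invoked from the Webclient's write-completion future.
    public void writeCompleted(Channel clientChannel, long bytesWritten) {
        pendingBytes -= bytesWritten;
        if (pendingBytes <= THRESHOLD) {
            clientChannel.setReadable(true);
        }
    }
}
```

Note that `messageReceived` runs on the server's I/O (or executor) thread while the write-completion callback runs on the Webclient's thread, so a real implementation would need to make `pendingBytes` thread-safe (e.g. an `AtomicLong`).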
This works great until I add an OrderedMemoryAwareThreadPoolExecutor
to the server's pipeline.
A simple solution is to use an OioClientSocketChannelFactory on the Webclient. This causes calls to Channel.write to be blocking, so when messageReceived() is called on the Webserver's handler (and, consequently, Channel.write is called on the Webclient), throttling happens "naturally".
However, if I use a NioClientSocketChannelFactory on the Webclient, then calls to Channel.write become asynchronous and throttling stops working.
Basically, what I'm noticing here is that Channel.setReadable(false) seems to have no effect when an OrderedMemoryAwareThreadPoolExecutor is inserted into the pipeline.
How can I perform throttling using OMATPE in the pipeline?
Upvotes: 1
Views: 639
Reputation: 12351
As Jestan said, please refer to org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler in the master. You will also have to configure your receiveBufferSizePredictorFactory so that it does not return too large a value; otherwise, Netty will just allocate a large buffer and fill it up really quickly. Use AdaptiveReceiveBufferSizePredictorFactory with a smaller maximum in combination with ChannelTrafficShapingHandler.
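A pipeline configuration along those lines might look like the sketch below. The predictor bounds, the traffic limits, and the check interval are illustrative assumptions, not values from this answer:

```java
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler;
import org.jboss.netty.util.HashedWheelTimer;

ServerBootstrap bootstrap = new ServerBootstrap(
        new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool()));

// Keep each read small so Netty cannot pull in a huge chunk at once:
// min 64, initial 1024, max 65536 bytes (illustrative values).
bootstrap.setOption("child.receiveBufferSizePredictorFactory",
        new AdaptiveReceiveBufferSizePredictorFactory(64, 1024, 65536));

final HashedWheelTimer timer = new HashedWheelTimer();
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() {
        ChannelPipeline p = Channels.pipeline();
        // Limit writes and reads to ~1 MiB/s, re-checking every 1000 ms
        // (illustrative limits).
        p.addLast("traffic", new ChannelTrafficShapingHandler(
                timer, 1024 * 1024, 1024 * 1024, 1000));
        // ... application handlers ...
        return p;
    }
});
```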
Upvotes: 2
Reputation: 2486
1) OrderedMemoryAwareThreadPoolExecutor also monitors the channel memory (in your case, the received data size) and suspends/resumes reading when it goes above/below the configured maximum size (set through the OrderedMemoryAwareThreadPoolExecutor constructor).
2) When it is used with an ExecutionHandler, the handler may discard channel state events if an attachment is found in the context (that context attachment is usually set by OrderedMemoryAwareThreadPoolExecutor to prevent upstream handlers from changing the channel state and causing an OutOfMemoryError):
boolean readSuspended = ctx.getAttachment() != null;
if (readSuspended) {
    // Drop the request silently if MemoryAwareThreadPool has
    // set the flag.
    e.getFuture().setSuccess();
    return;
}
I think you have to configure the min/max channel memory sizes of OMATPE, or you may have a context attachment leading to this situation.
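For reference, wiring an OMATPE with explicit per-channel and total memory limits looks roughly like this; the pool size, memory limits, and keep-alive time are illustrative values, not a recommendation:

```java
import java.util.concurrent.TimeUnit;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;

// Constructor arguments: corePoolSize, maxChannelMemorySize,
// maxTotalMemorySize, keepAliveTime, unit.
// When a channel's queued data exceeds maxChannelMemorySize (1 MiB here),
// the executor itself suspends reading on that channel; passing 0 disables
// the per-channel limit.
OrderedMemoryAwareThreadPoolExecutor executor =
        new OrderedMemoryAwareThreadPoolExecutor(
                16,                // worker threads (illustrative)
                1024 * 1024,       // max memory queued per channel
                64 * 1024 * 1024,  // max memory queued in total
                30, TimeUnit.SECONDS);
ExecutionHandler executionHandler = new ExecutionHandler(executor);
// pipeline.addLast("executor", executionHandler);
```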
Upvotes: 2