Reputation: 576
We have a distributed application running on an Akka cluster. Actor "A" sends a large message to a remote actor, and we get the following error:
2016-08-10 23:08:29,737 [EndpointWriter] ERROR - Transient association error (association remains live) akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://[email protected]:51665/temp/$b]: max allowed size 128000 bytes, actual size of encoded class common.data.model.configuration.UserList was 571444 bytes.
We are aware that we can increase the limit in the configuration. What we want instead is to detect when a message exceeds the limit and send a different message in that case. I searched but had no luck: most results only explain how to configure the limit, and none explain how to handle the failure and send the remote machine a message. Any suggestion or help would be appreciated.
Upvotes: 2
Views: 965
Reputation: 289
I just got paged with this alert on our Flink cluster. The solution there is to raise the Flink property akka.framesize to the size you need.
In my case, the error message was:
{"name":"org.apache.pekko.remote.EndpointWriter","level":"ERROR","thread":"flink-pekko.actor.default-dispatcher-16","msg":"Transient association error (association remains live)","time":"2025-02-12T05:41:26.894Z","stack":"o.a.p.r.OversizedPayloadException: Discarding oversized payload sent to Actor[pekko.tcp://[email protected]:6122/user/rpc/taskmanager_0#986937753]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 29182768 bytes.\n"}"
So I had to raise the property above 29182768 bytes. I set it to 60000000 bytes to be on the safe side.
Upvotes: 0
Reputation: 4249
Actually, we are able to handle it: we just need to subscribe to akka.event.Logging.Error and check whether the cause is an OversizedPayloadException:
import akka.actor.{Actor, Props}

class Listener extends Actor {
  override def receive: Receive = {
    case akka.event.Logging.Error(cause, _, _, _)
        // this internal Akka class is package-private, hence we have to check its class name :(
        if cause.getClass.getName == "akka.remote.OversizedPayloadException" =>
      // handle it here!
  }
}

// `system` is the application's existing ActorSystem
val listener = system.actorOf(Props(new Listener))
system.eventStream.subscribe(listener, classOf[akka.event.Logging.Error])
Upvotes: 1
Reputation: 576
I posted the same question to the Akka mailing group. They said there is no option for handling this "OversizedPayloadException". They suggest that we check the data size ourselves and handle it before we return the data to the actor.
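A minimal sketch of that "check the size first" idea, in case it helps. It assumes classic remoting over akka.tcp (so the limit is read from akka.remote.netty.tcp.maximum-frame-size, which is only present when akka-remote is on the classpath). PayloadTooLarge, SizeAwareSend, fitsInFrame, sendOrFallback and the 1024-byte envelope margin are hypothetical names and values I made up for illustration, not Akka API. The serialized payload size is only an approximation of what remoting puts on the wire, hence the margin.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.serialization.SerializationExtension

// Hypothetical fallback message sent instead of the oversized payload.
final case class PayloadTooLarge(actualBytes: Int, limitBytes: Long)

object SizeAwareSend {
  // Assumed allowance for the remoting envelope around the payload (a guess, tune it).
  val EnvelopeMarginBytes: Long = 1024L

  // Pure check: does a payload of this size fit under the frame limit?
  def fitsInFrame(payloadBytes: Int, limitBytes: Long,
                  marginBytes: Long = EnvelopeMarginBytes): Boolean =
    payloadBytes.toLong + marginBytes <= limitBytes

  // Serializes msg once to measure it, then sends either msg or the fallback.
  def sendOrFallback(system: ActorSystem, target: ActorRef, msg: AnyRef)(
      implicit sender: ActorRef = ActorRef.noSender): Unit = {
    // Throws ConfigException.Missing if akka-remote's reference.conf is absent.
    val limit = system.settings.config
      .getBytes("akka.remote.netty.tcp.maximum-frame-size").longValue
    // serialize returns Try[Array[Byte]]; .get rethrows a serialization failure.
    val size = SerializationExtension(system).serialize(msg).get.length
    if (fitsInFrame(size, limit)) target ! msg
    else target ! PayloadTooLarge(size, limit)
  }
}
```

With the numbers from the question, SizeAwareSend.fitsInFrame(571444, 128000L) is false, so the UserList would be replaced by PayloadTooLarge. Note that the message gets serialized twice this way (once to measure, once by remoting), which may matter for large payloads.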
Upvotes: 2
Reputation: 12986
Try subscribing to the event stream. Hopefully that exception will end up there:
import akka.actor.{Actor, Props}
import akka.remote.OversizedPayloadException

class Listener extends Actor {
  def receive = {
    case d: OversizedPayloadException => {
      // DO SOMETHING
    }
  }
}

val listener = system.actorOf(Props(classOf[Listener], this))
system.eventStream.subscribe(listener, classOf[OversizedPayloadException])
More info here
Upvotes: 0