I have 3 Akka nodes (Akka 2.4.8 actor systems) that communicate via remoting (NOT cluster). When a remote actor is created and performs a long task (taking more than 30 minutes), the remote actor system (on the remote machine) reports the error "The remote system has quarantined this system":
From local system:
2016-08-11 03:29:12.748UTC WARN [PLM-akka.actor.default-dispatcher-27] RemoteWatcher | akka.tcp://PLM@flowsvr02:46407/system/remote-watcher | Detected unreachable: [akka.tcp://AS1@lxsvr01g:9500]
2016-08-11 03:29:12.787UTC WARN [PLM-akka.actor.default-dispatcher-14] Remoting | akka.remote.Remoting | Association to [akka.tcp://AS1@lxsvr01g:9500] having UID [1284261532] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
From remote system:
00:41:05.169UTC WARN [AS2-akka.actor.default-dispatcher-5 ] eliableDeliverySupervisor | eEndpointWriter-akka.tcp%3A%2F%2FPLM%40flowsvr02%3A36210-0 | Association with remote system [akka.tcp://PLM@flowsvr02:36210] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
01:06:23.138UTC ERROR[AS2-akka.actor.default-dispatcher-17] EndpointWriter | /endpointWriter-akka.tcp%3A%2F%2FPLM%40ftflowsvr02%3A36210-1 | AssociationError [akka.tcp://AS2@lxsvr02g:9500] <- [akka.tcp://PLM@flowsvr02:36210]: Error [Invalid address: akka.tcp://PLM@flowsvr02:36210] [
akka.remote.InvalidAssociation: Invalid address: akka.tcp://PLM@flowsvr02:36210
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.
]
Local side code: ...
val remoteConfig = new RemotingConfig("application.conf")
val plmRmRepo = new ResourceManagerDBHandler(config.getString("database.txs_db"))
val remotingManager: ActorRef = system.actorOf(Props(new RemotingManager(plmRmRepo, remoteConfig, system)), name="RemotingManager")
...
val rmWorker: ActorRef = createRemoteActor(request, rm)
requestActor ! ResourceResponse(request.id, request.taskType, request.originalSender, Some(rmWorker))
log.info(s"remote actor is created: $rmWorker")
...
def createRemoteActor(request: ResourceRequest, rm: ResourceManagerClass): ActorRef = {
  log.info(s"RemotingManager: @${rm.nodeName} to create remote actor... ${request.implementation}")
  // Load the actor class by name; the same jar must be on the remote classpath
  val delegateClass = Class.forName(request.implementation)
  // Address of the remote actor system that will host the new actor
  val remoteASAddress = Address(rm.protocol, rm.nodeName, rm.host, rm.port)
  // Deploy the actor remotely instead of in the local system
  system.actorOf(Props(delegateClass).
    withDeploy(Deploy(scope = RemoteScope(remoteASAddress))))
}
The remote side is simple: it just starts an actor system (it has all the class implementations, of course, from the same jar):
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RemoteMain extends App {
  //val config = ConfigFactory.load("remotesystem.conf")
  val config = ConfigFactory.load()
  val remoteSystemName = config.getString("RemoteSystem.nodeName")
  //create an actor system with that config
  val system = ActorSystem(remoteSystemName, config)
  implicit val executor = system.dispatcher
  //val defaultActor = system.actorOf(Props[RemoteActorSystem], remoteConfig.className)
  system.log.info("## Remote Manager Is Started ##")
}
It seems that heartbeats between the local master actor system and the remote actor systems stop working after about 30 minutes (a timeout?). How can I solve this issue?
Thank you,
============
Update: after enabling DEBUG logging for Akka:
From local(master): 2016-08-12 19:14:32.015UTC WARN [PLM-akka.actor.default-dispatcher-5 ] RemoteWatcher | akka.tcp://PLM@flowsvr02:9888/system/remote-watcher | Detected unreachable: [akka.tcp://AS1@lxsvr01g:9500]
From remote failed node(slave): 2016-08-12 19:17:31.707UTC WARN [AS1-akka.actor.default-dispatcher-21] eliableDeliverySupervisor | leEndpointWriter-akka.tcp%3A%2F%2FPLM%40flowsvr02%3A9888-0 | Association with remote system [akka.tcp://PLM@flowsvr02:9888] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
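To pin down exactly when either side quarantines the other, the remoting lifecycle events could also be logged explicitly on both systems. This is only a minimal sketch that is not part of my application: the class name RemotingEventLogger is hypothetical, and it assumes the event classes in akka.remote (RemotingLifecycleEvent, QuarantinedEvent, ThisActorSystemQuarantinedEvent) behave in 2.4.8 as I understand them.

```scala
import akka.actor.{Actor, ActorLogging}
import akka.remote.{QuarantinedEvent, RemotingLifecycleEvent, ThisActorSystemQuarantinedEvent}

// Hypothetical helper actor: logs remoting lifecycle events published on the event stream.
class RemotingEventLogger extends Actor with ActorLogging {

  // Subscribe to the common supertype so that all remoting events arrive here
  override def preStart(): Unit =
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

  override def postStop(): Unit =
    context.system.eventStream.unsubscribe(self)

  def receive = {
    case q: QuarantinedEvent =>
      // This system has quarantined a remote system
      log.error("Quarantined remote system {}", q.address)
    case t: ThisActorSystemQuarantinedEvent =>
      // A remote system has quarantined this system
      log.error("System {} was quarantined by {}", t.localAddress, t.remoteAddress)
    case e: RemotingLifecycleEvent =>
      log.info("Remoting event: {}", e)
  }
}
```

It would be started once per actor system, for example with system.actorOf(Props[RemotingEventLogger], "remoting-event-logger"), where the actor name is again just an illustration.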
application.conf:
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = DEBUG
#logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
#log-config-on-start = on
log-dead-letters = 10
log-dead-letters-during-shutdown = on
logger-startup-timeout = 30s
actor {
serializers {
akka-containers = "akka.remote.serialization.MessageContainerSerializer"
akka-misc = "akka.remote.serialization.MiscMessageSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
daemon-create = "akka.remote.serialization.DaemonMsgCreateSerializer"
}
serialization-bindings {
"akka.actor.ActorSelectionMessage" = akka-containers
# The classes akka.actor.Identify and akka.actor.ActorIdentity serialization/deserialization are required by
# the cluster client to work.
# For the purpose of preserving protocol backward compatibility, akka.actor.Identify and akka.actor.ActorIdentity
# are still using java serialization by default.
# Should java serialization be disabled, uncomment the following lines
# "akka.actor.Identify" = akka-misc
# "akka.actor.ActorIdentity" = akka-misc
# Should java serialization be disabled, uncomment the following lines
# "scala.Some" = akka-misc
# "scala.None$" = akka-misc
"akka.remote.DaemonMsgCreate" = daemon-create
# Since akka.protobuf.Message does not extend Serializable but
# GeneratedMessage does, need to use the more specific one here in order
# to avoid ambiguity.
"akka.protobuf.GeneratedMessage" = proto
# Since com.google.protobuf.Message does not extend Serializable but
# GeneratedMessage does, need to use the more specific one here in order
# to avoid ambiguity.
# This com.google.protobuf serialization binding is only used if the class can be loaded,
# i.e. com.google.protobuf dependency has been added in the application project.
"com.google.protobuf.GeneratedMessage" = proto
}
serialization-identifiers {
"akka.remote.serialization.ProtobufSerializer" = 2
"akka.remote.serialization.DaemonMsgCreateSerializer" = 3
"akka.remote.serialization.MessageContainerSerializer" = 6
"akka.remote.serialization.MiscMessageSerializer" = 16
}
debug {
receive = on
# enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill etc.)
autoreceive = on
# enable DEBUG logging of actor lifecycle changes
lifecycle = on
# enable DEBUG logging of unhandled messages
unhandled = on
}
warn-about-java-serializer-usage = false
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
# If this is "on", Akka will log all outbound messages at DEBUG level
log-sent-messages = on
# If this is "on", Akka will log all inbound messages at DEBUG level
log-received-messages = on
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "ftflowsvr02"
port = 9888
tcp-keepalive = on
}
transport-failure-detector {
implementation-class = "akka.remote.DeadlineFailureDetector"
heartbeat-interval = 5 s
acceptable-heartbeat-pause = 300 s
}
watch-failure-detector {
# FQCN of the failure detector implementation.
# It must implement akka.remote.FailureDetector and have
# a public constructor with a com.typesafe.config.Config and
# akka.actor.EventStream parameter.
implementation-class = "akka.remote.PhiAccrualFailureDetector"
# How often keep-alive heartbeat messages should be sent to each connection.
heartbeat-interval = 5 s
# Defines the failure detector threshold.
# A low threshold is prone to generate many wrong suspicions but ensures
# a quick detection in the event of a real crash. Conversely, a high
# threshold generates fewer mistakes but needs more time to detect
# actual crashes.
threshold = 300.0
# Number of the samples of inter-heartbeat arrival times to adaptively
# calculate the failure timeout for connections.
max-sample-size = 200
# Minimum standard deviation to use for the normal distribution in
# AccrualFailureDetector. Too low standard deviation might result in
# too much sensitivity for sudden, but normal, deviations in heartbeat
# inter arrival times.
min-std-deviation = 100 ms
# Number of potentially lost/delayed heartbeats that will be
# accepted before considering it to be an anomaly.
# This margin is important to be able to survive sudden, occasional,
# pauses in heartbeat arrivals, due to for example garbage collect or
# network drop.
acceptable-heartbeat-pause = 300 s
# How often to check for nodes marked as unreachable by the failure
# detector
unreachable-nodes-reaper-interval = 5s
# After the heartbeat request has been sent the first failure detection
# will start after this period, even though no heartbeat message has
# been received.
expected-response-after = 5 s
}
retry-gate-closed-for = 60 s
quarantine-after-silence = 5 d
resend-interval = 5 s
resend-limit = 200
default-remote-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
parallelism-max = 2
}
}
}
}
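As a rough sanity check of what the watch-failure-detector values above mean in practice, here is a small standalone sketch that estimates how long the watcher tolerates silence before marking a node unreachable. It rests on assumptions I have not verified against the Akka source: that phi is computed with the logistic approximation shown below, that the mean is the observed heartbeat interval plus acceptable-heartbeat-pause, and that heartbeats arrive regularly enough for the standard deviation to sit at min-std-deviation. PhiSketch and its members are hypothetical names.

```scala
object PhiSketch {
  // Logistic approximation of the normal distribution tail; ASSUMED to
  // match what akka.remote.PhiAccrualFailureDetector does in Akka 2.4.
  def phi(timeDiffMs: Long, meanMs: Double, stdDevMs: Double): Double = {
    val y = (timeDiffMs - meanMs) / stdDevMs
    val e = math.exp(-y * (1.5976 + 0.070566 * y * y))
    if (timeDiffMs > meanMs) -math.log10(e / (1.0 + e))
    else -math.log10(1.0 - 1.0 / (1.0 + e))
  }

  // Values taken from the watch-failure-detector block in application.conf
  val heartbeatIntervalMs = 5000.0 // heartbeat-interval = 5 s
  val acceptablePauseMs = 300000.0 // acceptable-heartbeat-pause = 300 s
  val minStdDevMs = 100.0          // min-std-deviation = 100 ms
  val threshold = 300.0            // threshold = 300.0

  // First silence duration (ms since the last heartbeat, scanned in 100 ms
  // steps) at which phi reaches the threshold.
  def silenceUntilUnreachableMs(): Long = {
    val mean = heartbeatIntervalMs + acceptablePauseMs
    Iterator.iterate(0L)(_ + 100L)
      .find(t => phi(t, mean, minStdDevMs) >= threshold)
      .get
  }

  def main(args: Array[String]): Unit = {
    val ms = silenceUntilUnreachableMs()
    println(s"unreachable after roughly ${ms / 1000.0} s without a heartbeat")
  }
}
```

If these assumptions hold, the watcher only reports "Detected unreachable" after a little more than 305 s without any heartbeat from the remote system, so the remote side would have to be silent for about five minutes, not just for a few missed heartbeats.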