Prog_G

Reputation: 1615

Akka Cluster sharding not able to register to coordinator

I am trying to create an Akka Sharding Cluster. I want to use proxy-only mode on one of the nodes, just to route messages to the shard regions. I am getting the following warning:

[WARN] [02/11/2019 17:04:17.819] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://[email protected]:2555/system/sharding/ShardnameProxy] Trying to register to coordinator at [Some(ActorSelection[Anchor(akka.tcp://[email protected]:2551/), Path(/system/sharding/ShardnameCoordinator/singleton/coordinator)])], but no acknowledgement. Total [1] buffered messages.

**Main.scala:** Starts the cluster using the configuration from application.conf (shown later).

import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import com.typesafe.config.ConfigFactory

object Main {
  val shardName = "Shardname"
  val role = "Master"
  var shardingProbeLocalRegion: Option[ActorRef] = None

  def main(args: Array[String]): Unit = {
    // Load the "main" block of application.conf and start the cluster node
    val conf = ConfigFactory.load()
    val system = ActorSystem("ClusterSystem", conf.getConfig("main"))
    // Start a full shard region (entity host) on this node
    ClusterSharding(system).start(shardName, Test.props, ClusterShardingSettings(system), ShardDetails.extractEntityId, ShardDetails.extractShardId)
  }
}
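ShardDetails is referenced above but not shown in the question. For completeness, a minimal sketch of what such an object could look like (the extractors and the modulo-based shard id are assumptions, not the asker's code):

import akka.cluster.sharding.ShardRegion

object ShardDetails {
  val numberOfShards = 10 // assumed shard count

  // Treat each incoming String message as its own entity id
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case msg: String => (msg, msg)
  }

  // Assumption: derive the shard id from the entity id's hash
  val extractShardId: ShardRegion.ExtractShardId = {
    case msg: String => (math.abs(msg.hashCode) % numberOfShards).toString
  }
}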

**Test.scala:** Entity actor for the sharding cluster.

import akka.actor.{Actor, Props}
import java.util.logging.Logger

object Test {
  def props: Props = Props(classOf[Test])

  class Test extends Actor {
    val log = Logger.getLogger(getClass.getName)

    // Log every String message and acknowledge the sender
    override def receive = {
      case msg: String =>
        log.info("Message from " + sender().path.toString + " Message is " + msg)
        sender() ! "Done"
    }
  }
}

**MessageProducer.scala** (proxy-only mode): the message producer sends a message to the shard region every second.

import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionKey, Props}
import akka.cluster.sharding.ClusterSharding
import com.typesafe.config.ConfigFactory
import java.util.logging.Logger
import scala.concurrent.duration._

object MessageProducer {

  var shardingProbeLocalRegion: Option[ActorRef] = None
  case object DoSharding
  def prop: Props = Props(classOf[MessageProducer])
  var numeric: Long = 0

  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.load
    val system = ActorSystem("ClusterSystem", conf.getConfig("messageProducer"))
    // Proxy-only mode: this node hosts no entities, it only routes to the shards
    ClusterSharding(system).startProxy(Main.shardName, None, ShardDetails.extractEntityId, ShardDetails.extractShardId)
    shardingProbeLocalRegion = Some(ClusterSharding(system).shardRegion(Main.shardName))
    val actor = system.actorOf(Props[MessageProducer], "message")
  }
}

// Extension that exposes this node's own remote (default) address
class RemoteAddressExtensionImpl(system: ExtendedActorSystem) extends Extension {
  def address = system.provider.getDefaultAddress
}

object RemoteAddressExtension extends ExtensionKey[RemoteAddressExtensionImpl]
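For reference, the extension above (unused in the snippets shown) would be queried like this:

val selfAddress = RemoteAddressExtension(system).address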

class MessageProducer extends Actor {
  import MessageProducer.DoSharding
  import context.dispatcher // ExecutionContext required by the scheduler

  val log = Logger.getLogger(getClass.getName)

  override def preStart(): Unit = {
    println("Starting " + self.path.address)
    // Tick every second after an initial 10 second delay
    context.system.scheduler.schedule(10.seconds, 1.second, self, DoSharding)
  }

  override def receive = {
    case DoSharding =>
      log.info("sending message " + MessageProducer.numeric)
      // Route the counter value through the proxy-only shard region
      MessageProducer.shardingProbeLocalRegion.foreach(_ ! ("" + MessageProducer.numeric))
      MessageProducer.numeric += 1
  }
}

**application.conf:** Configuration file.

main {
  akka {
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }

    remote {
      log-remote-lifecycle-events = on

      netty.tcp {
        hostname = "127.0.0.1"
        port = 2551
      }
    }

    cluster {
      seed-nodes = [
        "akka.tcp://[email protected]:2551"
      ]

      sharding.state-store-mode = ddata
      auto-down-unreachable-after = 1s
    }

    // Note: this was "akka.extensions" nested inside the akka block, which
    // resolves to akka.akka.extensions; it must be plain "extensions" here
    extensions = ["akka.cluster.metrics.ClusterMetricsExtension", "akka.cluster.ddata.DistributedData"]
  }
}

messageProducer {
  akka {
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }

    remote {
      log-remote-lifecycle-events = on

      netty.tcp {
        hostname = "192.168.2.96"
        port = 2554
      }
    }

    cluster {
      seed-nodes = [
        "akka.tcp://[email protected]:2551"
        //, "akka.tcp://[email protected]:2552"
      ]

      sharding.state-store-mode = ddata
      auto-down-unreachable-after = 1s
    }

    extensions = ["akka.cluster.metrics.ClusterMetricsExtension", "akka.cluster.ddata.DistributedData"]
  }
}

Am I doing anything wrong? Is there another way to implement this approach? My main aim is to avoid a single point of failure for my cluster: if any node goes down, it should not affect the state of the others. Can anyone help me with this?

Upvotes: 3

Views: 2049

Answers (1)

i na

Reputation: 21

Is it solved? If not, please check your akka.cluster configuration. You have to set the config like this; it works for me.

for proxy

akka.cluster {
  roles = ["Proxy"]
  sharding {
    role = "Master"
  } 
}

for master

akka.cluster {
  roles = ["Master"]
  sharding {
    role = "Master"
  } 
}
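For reference, a minimal sketch of how the start/startProxy calls would line up with these roles (reusing the question's names; the Some("Master") argument and the withRole call are the relevant changes, and ClusterShardingSettings(system) also picks up akka.cluster.sharding.role from the config automatically):

// On the master node: host the shards, restricted to nodes with the "Master" role
ClusterSharding(system).start(
  Main.shardName,
  Test.props,
  ClusterShardingSettings(system).withRole("Master"),
  ShardDetails.extractEntityId,
  ShardDetails.extractShardId)

// On the proxy node: no entities hosted here, just point at the "Master" role
ClusterSharding(system).startProxy(
  Main.shardName,
  Some("Master"),
  ShardDetails.extractEntityId,
  ShardDetails.extractShardId)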

Upvotes: 2
