Reputation: 37
New query: I am trying to pass DSum() as parameter to RemoteActor from localActor, DSum() will do some calculation at Remote node. I am unable to send this to RemoteActor. IS it possible ??(code below)
Done:I am trying to connect Remote actor and local actor, and trying to send objects using case class, but it is unable to get the Message class ( Common.Message(msg) ) of the RemoteActor when being called from localActor, instead it is getting "case _ => println("Received unknown msg from local ")"
1.package.scala
package object check {
trait Context
case object Start
case class Message(msg: String)
case class CxtDA(cxtA: List[CxtA])
case class RCxt(var cxtA: List[CxtA], var cxtB: List[CxtB], var v1: Int, var v2: String) extends Context
case class CxtA(var cxtC: List[CxtC], var v1: Int) extends Context
case class CxtB(var cxtC: List[CxtC], var v1: Int) extends Context
case class CxtC(var v1: String, var v2: Int) extends Context
case class Task(var t1: DSum())
}
2. Remote Actor
package com.akka.remote
import java.io.File
import akka.actor._
import com.typesafe.config.ConfigFactory
import check._
/**
* Remote actor which listens on port 5150
*/
class RemoteActor extends Actor {
override def toString: String = {
return "You printed the Local";
}
def receive = {
case msg: String => {
println("remote received " + msg + " from " + sender)
sender ! "hi"
}
case Message(msg) =>
println("RemoteActor received message "+ msg)
sender ! Message("Hello from server")
case CxtDA(cxtA) =>
println("cxtA "+ cxtA)
case Task(taskA) =>
println ("recieved closure")
case _ => println("unknown msg")
}
}
object RemoteActor{
def main(args: Array[String]) {
//get the configuration file from classpath
val configFile = getClass.getClassLoader.getResource("remote_application.conf").getFile
// //parse the config
val config = ConfigFactory.parseFile(new File(configFile))
// //create an actor system with that config
val system = ActorSystem("RemoteSystem" , config)
// //create a remote actor from actorSystem
val remoteActor = system.actorOf(Props[RemoteActor], name="remote")
println("remote is ready")
remoteActor ! Message("Hello from active remote")
}
}
3.Local Actor
package com.akka.local
import java.io.File
import akka.actor.{Props, Actor, ActorSystem}
import com.typesafe.config.ConfigFactory
import check._
import scala.util.Random
/**
* Local actor which listens on any free port
*/
trait CxtTask {
type CxtT <: Context
def work(ctx: CxtT): CxtT
}
class DSum extends CxtTask with Serializable{
override type CxtT = CxtA
def work(ctx: CxtA): CxtA = {
val sum = ctx.cxtC.foldRight(0)((v, acc) => v.v2 + acc)
ctx.cxtC= List()
ctx.v1 = sum
println("ctx: " + ctx)
ctx
}
}
class LocalActor extends Actor{
// import Common._
@throws[Exception](classOf[Exception])
val remoteActor = context.actorSelection("akka.tcp://[email protected]:5150/user/remote")
println("That 's remote:" + remoteActor)
remoteActor ! "hi"
var counter = 0
override def toString: String = {
return "You printed the Local";
}
def receive = {
case msg:String => {
println("got message from remote" + msg)
}
case Start =>
println("inside Start.local "+ remoteActor)
remoteActor ! Message("Hello from the LocalActor")
case Message(msg) =>
println("LocalActor received message: "+ msg)
if (counter < 5) {
sender ! Message("Hello back to you")
counter += 1
}
case CxtDA(cxtA) =>
remoteActor ! CxtDA(cxtA)
case Task(t1) =>
remoteActor ! Task(t1)
}
}
object LocalActor {
def main(args: Array[String]) {
val configFile = getClass.getClassLoader.getResource("local_application.conf").getFile
val config = ConfigFactory.parseFile(new File(configFile))
val system = ActorSystem("ClientSystem",config)
val localActor = system.actorOf(Props[LocalActor], name="local")
localActor ! Start
def createRndCxtC(count: Int):List[CxtC] = (for (i <- 1 to count) yield CxtC(Random.nextString(5), 3)).toList
def createRndCxtB(count: Int): List[CxtB] = (for (i <- 1 to count) yield CxtB(createRndCxtC(count), Random.nextInt())).toList
def createRndCxtA(count: Int): List[CxtA] = (for (i <- 1 to count) yield CxtA(createRndCxtC(count), Random.nextInt())).toList
val tree = RCxt(createRndCxtA(2),createRndCxtB(2),1,"")
val workA = new DSum()
tree.cxtA.foreach(ctxa =>workA.work(ctxa))
localActor ! Task(new DSum())
}
}
[Remote actor output][1]
[1]: https://i.sstatic.net/mtmvU.jpg
Upvotes: 0
Views: 230
Reputation: 807
The key thing here is that you have defined two different protocols for each actor:
Common
object that resides in the RemoteActor.scala
fileCommon
object that resides in the LocalActor.scala
file Hence, when sending a Common.Message
within the Local Actor, you are basically creating a message with a different type than the Common.Message
from the Remote Actor. Hence, the actor is not able to process it.
As a good practice in Akka, whenever an actor has a specific message procotol, that should be defined in its companion object. However, if you have multiple actors that share the same protocol (their behavior is defined by processing those types of messages), then you should put that protocol in an object and import it from your actors.
I hope this is helpful.
Upvotes: 1