smeeb
smeeb

Reputation: 29477

How to make blocking/synchronous calls into an Akka actor system?

Akka 2.4.1 Java API here. I don't have the bandwidth right now to learn Scala, so I would ask that code examples here also use the Java API.

I have an existing ActorSystem that is chock full of asynchronous actors and which works beautifully. I now have a need to reuse this actor system in a synchronous context like so:

// Groovy pseudo-code; NOT inside the actor system here!
ComputationRequest request = new ComputationRequest(1, 7, true)
MasterActor master = actorSystem.get(...)
ComputationResult result = actorSystem.tell(master, request)

Nowhere in Akka's documentation do I see a clear example of sending equests into an actor system (from the outside) and then retrieving the results. Could I use Futures here perhaps? What's the standard way of handling this pattern in Akka (code samples)?

Upvotes: 3

Views: 8257

Answers (2)

ya_pulser
ya_pulser

Reputation: 2670

You can inject a callback, wrapped into an actor, into akka system and use it as a "sender" with tell mechanics. Full example:

import akka.actor.*;
import akka.dispatch.Await;
import akka.dispatch.Future;
import akka.pattern.Patterns;
import akka.util.Timeout;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class Main {

    public static void main(String[] args) throws Exception {

        // init your akka system
        final ActorSystem system = ActorSystem.create("Cambria");
        final ActorRef worker = system.actorOf(new Props(Worker.class), "worker");
        worker.tell(new Work(10));

        // make callback
        final Consumer<Object> callback = a -> System.out.println("from the other side: " + a);

        // wrap call back into sub-actor
        class TheSpy extends UntypedActor {
            @Override
            public void onReceive(final Object message) throws Exception {
                callback.accept(message);
            }
        }

        // inject callback into the system
        final ActorRef theSpy = system.actorOf(new Props(TheSpy::new), "spy");

        // find actor you want to hack
        final ActorSelection found = system.actorSelection("/user/worker");
        // send it a message and observe using callback)
        found.tell(new Work(20), theSpy);

        final Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
        final Future<Object> future = Patterns.ask(worker, new Work(30), timeout);
        final Work result = (Work) Await.result(future, timeout.duration());
        System.out.println(result);

        system.shutdown();
        system.awaitTermination();
    }
}

public class Worker extends UntypedActor {
    public void onReceive(Object message) {
        if (message instanceof Work) {
            Work work = (Work) message;
            System.out.println("work = " + work);
            getSender().tell(new Work(work.getStart() + 1));
        } else {
            unhandled(message);
        }
    }
}

public class Work {
    private final int start;

    public Work(int start) {
        this.start = start;
    }

    public int getStart() {
        return start;
    }

    @Override
    public String toString() {
        return "Work{" +
                "start=" + start +
                '}';
    }
}

Upvotes: 0

knutwalker
knutwalker

Reputation: 5974

There is the ask pattern does what you want. It expects the target actor to "return" a response by telling it to getSender(). You'll get a Future for the response and can work with that (blocking, if you must).

import akka.dispatch.*;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Await;
import scala.concurrent.Promise;
import akka.util.Timeout;


ComputationRequest request = new ComputationRequest(1, 7, true);
MasterActor master = actorSystem.get(...);

Timeout timeout = new Timeout(Duration.create(5, "seconds"));
Future<Object> future = Patterns.ask(master, request, timeout);
ComputationResult result = (ComputationResult) Await.result(future, timeout.duration());

Upvotes: 12

Related Questions