okello
okello

Reputation: 609

Akka Scheduler: Run next only when current run is complete

Scheduling a job using Akka Scheduler goes something like this (at least from the documentation):

system.scheduler().schedule(
    Duration.Zero(),
    Duration.create(5, TimeUnit.SECONDS),
    workerActor,
    new MessageToTheActor(),
    system.dispatcher(), ActorRef.noSender());

I don't however understand how I can ensure the next run only happens when current run is complete. I've been looking around with no success :(

Upvotes: 0

Views: 625

Answers (3)

Jeffrey Chung
Jeffrey Chung

Reputation: 19527

A scheduler is the wrong tool for your use case.

An alternative is Akka Stream's Sink.actorRefWithAck (the code below is adapted from the example in the linked documentation and borrows the utility classes defined there). You would need to adjust the worker actor to handle a few messages related to the status of the stream and to reply with an acknowledgement message. The acknowledgement message functions as a backpressure signal and indicates that the actor is ready to process the next MessageToTheActor message. The worker actor would look something like the following:

enum Ack {
  INSTANCE;
}

static class StreamInitialized {}
static class StreamCompleted {}
static class StreamFailure {
  private final Throwable cause;
  public StreamFailure(Throwable cause) { this.cause = cause; }

  public Throwable getCause() { return cause; }
}

public class MyWorker extends AbstractLoggingActor {
  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .match(StreamInitialized.class, init -> {
        log().info("Stream initialized");
        sender().tell(Ack.INSTANCE, self());
      })
      .match(MessageToTheActor.class, msg -> {
        log().info("Received message: {}", msg);
        // do something with the message...
        sender().tell(Ack.INSTANCE, self());
      })
      .match(StreamCompleted.class, completed -> {
        log().info("Stream completed");
      })
      .match(StreamFailure.class, failed -> {
        log().error(failed.getCause(),"Stream failed!");
      })
      .build();
  }
}

To use Sink.actorRefWithAck with the above actor:

final ActorSystem system = ActorSystem.create("MySystem");
final Materializer materializer = ActorMaterializer.create(system);

ActorRef workerActor = system.actorOf(Props.create(MyWorker.class, "worker"));

Source<MessageToTheActor, NotUsed> messages = Source.repeat(new MessageToTheActor());

Sink<String, NotUsed> sink = Sink.<String>actorRefWithAck(
  workerActor,
  new StreamInitialized(),
  Ack.INSTANCE,
  new StreamCompleted(),
  ex -> new StreamFailure(ex)
);

messages.runWith(sink, materializer);

Note the use of Source.repeat, which in this case continually emits a MessageToTheActor message. Using Sink.actorRefWithAck ensures that the actor doesn't receive another message until it's done processing the current message.

The following imports are required (obviously, so is the Akka Streams dependency):

import akka.NotUsed;
import akka.actor.*;
import akka.stream.*;
import akka.stream.javadsl.*;

Upvotes: 2

atline
atline

Reputation: 31644

system.scheduler().schedule(
    Duration.Zero(),
    Duration.create(5, TimeUnit.SECONDS),
    workerActor,
    new MessageToTheActor(),
    system.dispatcher(), ActorRef.noSender());

Above code means every 5 seconds, scheduler will send message to the actor workerActor.

And as you know, actor default underlying with just one thread (unless you configure with nr-of-instance>1), that means all your message sent to workActor will be buffered in mailbox, because just one thread can call receive function of actor.

In another word, you can always ensure the next run only happens when current run is complete as just one thread worked for the actor by default at the same time.

Upvotes: 0

zenwraight
zenwraight

Reputation: 2000

Scheduler means that you want something in periodic manner, now if your second run is dependent on your first run, then why do you even want to create a scheduler.

Just create two actors, one manager actor and other a child actor.

When the task is success, the child actor sends a success message to parent actor, so then parent actor asks the child actor to then run the task second time. This guarantees that tasks are run in periodic order and also when the previous one was successful.

So basically, you have to implement corresponding matching case class in your receive methods of your actors.

Hope this helps!

Upvotes: 1

Related Questions