Reputation: 609
Scheduling a job using Akka Scheduler goes something like this (at least from the documentation):
system.scheduler().schedule(
Duration.Zero(),
Duration.create(5, TimeUnit.SECONDS),
workerActor,
new MessageToTheActor(),
system.dispatcher(), ActorRef.noSender());
I don't however understand how I can ensure the next run only happens when current run is complete. I've been looking around with no success :(
Upvotes: 0
Views: 625
Reputation: 19527
A scheduler is the wrong tool for your use case.
An alternative is Akka Stream's Sink.actorRefWithAck
(the code below is adapted from the example in the linked documentation and borrows the utility classes defined there). You would need to adjust the worker actor to handle a few messages related to the status of the stream and to reply with an acknowledgement message. The acknowledgement message functions as a backpressure signal and indicates that the actor is ready to process the next MessageToTheActor
message. The worker actor would look something like the following:
enum Ack {
INSTANCE;
}
static class StreamInitialized {}
static class StreamCompleted {}
static class StreamFailure {
private final Throwable cause;
public StreamFailure(Throwable cause) { this.cause = cause; }
public Throwable getCause() { return cause; }
}
public class MyWorker extends AbstractLoggingActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(StreamInitialized.class, init -> {
log().info("Stream initialized");
sender().tell(Ack.INSTANCE, self());
})
.match(MessageToTheActor.class, msg -> {
log().info("Received message: {}", msg);
// do something with the message...
sender().tell(Ack.INSTANCE, self());
})
.match(StreamCompleted.class, completed -> {
log().info("Stream completed");
})
.match(StreamFailure.class, failed -> {
log().error(failed.getCause(),"Stream failed!");
})
.build();
}
}
To use Sink.actorRefWithAck
with the above actor:
final ActorSystem system = ActorSystem.create("MySystem");
final Materializer materializer = ActorMaterializer.create(system);
ActorRef workerActor = system.actorOf(Props.create(MyWorker.class, "worker"));
Source<MessageToTheActor, NotUsed> messages = Source.repeat(new MessageToTheActor());
Sink<String, NotUsed> sink = Sink.<String>actorRefWithAck(
workerActor,
new StreamInitialized(),
Ack.INSTANCE,
new StreamCompleted(),
ex -> new StreamFailure(ex)
);
messages.runWith(sink, materializer);
Note the use of Source.repeat
, which in this case continually emits a MessageToTheActor
message. Using Sink.actorRefWithAck
ensures that the actor doesn't receive another message until it's done processing the current message.
The following imports are required (obviously, so is the Akka Streams dependency):
import akka.NotUsed;
import akka.actor.*;
import akka.stream.*;
import akka.stream.javadsl.*;
Upvotes: 2
Reputation: 31644
system.scheduler().schedule(
Duration.Zero(),
Duration.create(5, TimeUnit.SECONDS),
workerActor,
new MessageToTheActor(),
system.dispatcher(), ActorRef.noSender());
Above code means every 5 seconds, scheduler will send message
to the actor workerActor
.
And as you know, actor default underlying with just one thread (unless you configure with nr-of-instance>1), that means all your message sent to workActor
will be buffered in mailbox
, because just one thread can call receive
function of actor
.
In another word, you can always ensure the next run only happens when current run is complete as just one thread worked for the actor by default at the same time.
Upvotes: 0
Reputation: 2000
Scheduler means that you want something in periodic manner, now if your second
run is dependent on your first
run, then why do you even want to create a scheduler.
Just create two actors, one manager actor
and other a child actor
.
When the task is success
, the child actor
sends a success message to parent actor
, so then parent actor asks the child actor
to then run the task second time. This guarantees that tasks are run in periodic order and also when the previous one was successful.
So basically, you have to implement corresponding matching case class in your receive methods of your actors
.
Hope this helps!
Upvotes: 1