Reputation: 572
I am trying to implement a custom Akka Sink, but I could not find a way to handle a Future inside it.
class EventSink(...) extends GraphStage[SinkShape[EventEnvelope2]] {
  val in: Inlet[EventEnvelope2] = Inlet("EventSink")
  override val shape: SinkShape[EventEnvelope2] = SinkShape(in)
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      // This requests one element at the Sink startup.
      override def preStart(): Unit = pull(in)
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val future = handle(grab(in))
          Await.ready(future, Duration.Inf)
          /*
          future.onComplete {
            case Success(_) =>
              logger.info("pulling next events")
              pull(in)
            case Failure(failure) =>
              logger.error(failure.getMessage, failure)
              throw failure
          }*/
          pull(in)
        }
      })
    }
  }
  private def handle(envelope: EventEnvelope2): Future[Unit] = {
    val EventEnvelope2(query.Sequence(offset), _/*persistenceId*/, _/*sequenceNr*/, event) = envelope
    ...
    db.run(statements.transactionally)
  }
}
I have to block on the future for now, which is not ideal. The non-blocking version that I commented out only works for the first event. Could anyone please help?
Update: Thanks, @ViktorKlang. It seems to be working now.
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
  new GraphStageLogic(shape) {
    val callback = getAsyncCallback[Try[Unit]] {
      case Success(_) =>
        //completeStage()
        pull(in)
      case Failure(error) =>
        failStage(error)
    }
    // This requests one element at the Sink startup.
    override def preStart(): Unit = {
      pull(in)
    }
    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val future = handle(grab(in))
        future.onComplete { result =>
          callback.invoke(result)
        }
      }
    })
  }
}
I am trying to implement a relational DB event sink connected to ReadJournal.eventsByTag. This is a continuous stream that should never end unless there is an error, which is what I want. Is my approach correct?
Two more questions:
Will the GraphStage never end unless I manually invoke completeStage or failStage?
Is it correct (and normal) to declare the callback outside the preStart method? And is it right to invoke pull(in) in preStart in this case?
Thanks, Cheng
Upvotes: 0
Views: 681
Reputation: 17923
Avoid Custom Stages
In general, you should try to exhaust all possibilities with the given methods of the library's Source, Flow, and Sink. Custom stages are almost never necessary and make your code difficult to maintain.
Writing Your "Custom" Stage Using Standard Methods
Based on the details of your question's example code, I don't see any reason why you would use a custom Sink to begin with.
Given your handle method, you could slightly modify it to do the logging that you specified in the question:
val loggedHandle : (EventEnvelope2) => Future[Unit] =
  handle(_) transform {
    case Success(_) => {
      logger.info("pulling next events")
      Success(()) // () is the Unit value; `Unit` alone is the companion object
    }
    case Failure(failure) => {
      logger.error(failure.getMessage, failure)
      Failure(failure)
    }
  }
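To see what transform does in isolation, here is a minimal sketch that uses only the Scala standard library (2.12 or later). The names fakeHandle, loggedFakeHandle, and log are hypothetical stand-ins for the question's handle and logger, not part of the original code; the queue simply records what would have been logged.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object TransformSketch {
  // Hypothetical stand-in for `handle`: fails on negative input.
  def fakeHandle(n: Int): Future[Unit] =
    if (n < 0) Future.failed(new IllegalArgumentException(s"negative: $n"))
    else Future.successful(())

  // Stand-in for the logger, so the side effect can be inspected.
  val log = new java.util.concurrent.ConcurrentLinkedQueue[String]()

  // Same shape as loggedHandle: observe the outcome, then pass it through unchanged.
  val loggedFakeHandle: Int => Future[Unit] =
    n => fakeHandle(n).transform {
      case s @ Success(_) =>
        log.add("ok")
        s
      case f @ Failure(e) =>
        log.add(s"error: ${e.getMessage}")
        f
    }
}
```

Because both branches return the original Try, the resulting Future succeeds or fails exactly as the underlying one does; only the logging is added.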
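Then just use Sink.foreachAsync to handle the envelopes. (Sink.foreachParallel expects a function that returns Unit, so it would not wait for the Future returned by handle; foreachAsync, added in later Akka 2.5.x releases, accepts a function that returns a Future.)
val createEventEnvelope2Sink : (Int) => Sink[EventEnvelope2, Future[Done]] =
  (parallelism) =>
    Sink.foreachAsync[EventEnvelope2](parallelism)(loggedHandle)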
Now, even if you want each EventEnvelope2 to be sent to the db in order, you can just use 1 for parallelism:
val inOrderDBInsertSink : Sink[EventEnvelope2, Future[Done]] =
  createEventEnvelope2Sink(1)
Also, if the database throws an exception you can still get a hold of it when the sink's materialized Future[Done] completes:
val someEnvelopeSource : Source[EventEnvelope2, _] = ???
someEnvelopeSource
  // runWith keeps the sink's Future[Done]; to(...).run() would return the source's materialized value instead
  .runWith(createEventEnvelope2Sink(1))
  .andThen {
    case Failure(throwable) => { /*deal with db exception*/ }
    case Success(_) => { /*all inserts succeeded*/ }
  }
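As a rough, self-contained illustration of the same idea, the sketch below uses mapAsync with parallelism 1 followed by Sink.ignore, which gives one-at-a-time, non-blocking processing with only standard operators. It assumes Akka 2.6 (where an implicit ActorSystem also provides the materializer); fakeHandle is a hypothetical stand-in for the question's handle, and the integers stand in for EventEnvelope2 values.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object MapAsyncSinkSketch {
  def runSketch(): Try[Done] = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    implicit val ec: ExecutionContext = system.dispatcher

    // Hypothetical stand-in for `handle`: the "insert" for event 3 fails.
    def fakeHandle(event: Int): Future[Unit] =
      if (event == 3) Future.failed(new RuntimeException(s"insert failed for $event"))
      else Future { println(s"stored $event") }

    // parallelism = 1: the next element is requested only after the
    // previous Future has completed; a failed Future fails the stream.
    val done: Future[Done] =
      Source(1 to 5)
        .mapAsync(parallelism = 1)(fakeHandle)
        .runWith(Sink.ignore)

    // Blocking here only to collect the outcome at the edge of the demo.
    val result = Try(Await.result(done, 10.seconds))
    Await.ready(system.terminate(), 10.seconds)
    result
  }

  def main(args: Array[String]): Unit =
    println(runSketch())
}
```

With the default supervision strategy, the failed Future for event 3 fails the whole stream, so the materialized Future[Done] carries the exception, much like the failStage(error) branch in the question's custom stage.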
Upvotes: 1