Reputation: 1843
We have an Akka application that consumes from a Kafka topic and sends the received messages to an Akka actor. I am not sure that the way I programmed it uses all the benefits of the back pressure mechanism built into Akka Streams.
Following is my configuration:
val control: Consumer.DrainingControl[Done] =
  Consumer
    .sourceWithOffsetContext(consumerSettings, Subscriptions.topics("myTopic"))
    .map { consumerRecord =>
      val myAvro = consumerRecord.value().asInstanceOf[MyAvro]
      val myActor = AkkaSystem.sharding.getMyActor(myAvro.getId)
      myActor ! Update(myAvro)
    }
    .via(Committer.flowWithOffsetContext(CommitterSettings(AkkaSystem.system.toClassic)))
    .toMat(Sink.ignore)(Consumer.DrainingControl.apply)
    .run()
This does what my business case expects: myActor receives the Update(MyAvro) commands.
I am more irritated with the technical concepts of back pressure. As much as I can understand, the back pressure mechanism is controlled partially from sinks, but in this stream configuration my sink is only 'Sink.ignore', so my sink is doing nothing for back pressure.
What I am also curious about is when the Akka Kafka stream commits the Kafka topic offset. Is it the moment the command is delivered to the mailbox of MyActor? If so, how can I handle scenarios like ask patterns, where the Kafka offset should not be committed until the ask completes?
I see some factory methods dealing with manual offset control, 'plainPartitionedManualOffsetSource' and 'committablePartitionedManualOffsetSource', but I can't find any examples for those. Can I decide with my business logic to manually commit the offsets?
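For illustration, a minimal sketch of manual offset control with 'plainPartitionedManualOffsetSource' (assuming the Alpakka Kafka Consumer API; offsetsFromMyStore, saveOffsetToMyStore and processRecord are hypothetical helpers backed by an external offset store such as a database):

```scala
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.TopicPartition
import scala.concurrent.Future
// for the flatMap below; use your system's dispatcher in real code
import scala.concurrent.ExecutionContext.Implicits.global

// Called when partitions are assigned: load the offsets to start from.
def offsetsFromMyStore(tps: Set[TopicPartition]): Future[Map[TopicPartition, Long]] = ???
// Persist the offset only after the business logic has succeeded.
def saveOffsetToMyStore(tp: TopicPartition, offset: Long): Future[Unit] = ???

Consumer
  .plainPartitionedManualOffsetSource(consumerSettings, Subscriptions.topics("myTopic"), offsetsFromMyStore)
  .flatMapMerge(breadth = 8, { case (tp, partitionSource) =>
    partitionSource.mapAsync(1) { record =>
      // process the record, then "commit" by writing the offset to your own store
      processRecord(record).flatMap(_ => saveOffsetToMyStore(tp, record.offset() + 1))
    }
  })
  .runWith(Sink.ignore)
```

With this source, Kafka's own commit machinery is bypassed entirely: you decide in your business logic when an offset counts as processed.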
As an alternative configuration, I can use something like this:
val myActor: ActorRef[MyActor.Command] = AkkaSystem.sharding.getMyActor
val (control, result) =
  Consumer
    .plainSource(consumerSettings, Subscriptions.topics("myTopic"))
    .toMat(Sink.actorRef(AkkaSystem.sharding.getMyActor(myAvro.getId), null))(Keep.both)
    .run()
Now I have access to Sink.actorRef, so I think the back pressure mechanism has a chance to control back pressure. Naturally this code will not work, because I have no idea how I can access 'myAvro' in this constellation.
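As an aside on this alternative: Sink.actorRef itself is fire-and-forget and does not backpressure, but classic Akka Streams also offers Sink.actorRefWithBackpressure, where the target actor must acknowledge each element before the next one is pushed. A sketch, assuming the classic API (the message protocol objects and MyAvroHandler are hypothetical):

```scala
import akka.actor.Actor
import akka.stream.scaladsl.Sink

case object StreamInit
case object StreamAck
case object StreamComplete
final case class StreamFailure(ex: Throwable)

class MyAvroHandler extends Actor {
  def receive: Receive = {
    case StreamInit        => sender() ! StreamAck
    case myAvro: MyAvro    =>
      // ... do the work, then ack so the stream may emit the next element
      sender() ! StreamAck
    case StreamComplete    => context.stop(self)
    case StreamFailure(ex) => // log and stop
  }
}

val sink: Sink[MyAvro, _] =
  Sink.actorRefWithBackpressure(handlerRef, StreamInit, StreamAck, StreamComplete, StreamFailure.apply)
```

Note this only fits a single, fixed target actor; when the target is chosen per message via sharding (getMyActor(myAvro.getId)), the ask + mapAsync approach from the answers is the better fit.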
Thx for answers..
Upvotes: 1
Views: 559
Reputation: 1760
This statement is not correct:
... as much as I can understand, Back Pressure mechanism are controlled partially from Sinks but in this Stream configuration, my Sink is only 'Sink.ignore'. So my Sink is doing anything for Back Pressure.
There's nothing special about Sinks when it comes to backpressure. Backpressure as a flow control mechanism is automatically used wherever there is an asynchronous boundary in the stream. That MAY be in the Sink, but it may just as well be anywhere else in the stream.

In your case you're hooking up your stream to talk to an actor. That's your asynchronous boundary, but the way you do it is using map, and inside that map you use ! to talk to the actor. So there is no backpressure, because:

- map is not an asynchronous operator, and nothing called inside it can participate in the backpressure mechanism. So from the Akka Streams perspective there is NO async boundary introduced.
- ! is fire-and-forget; there's no feedback provided as to how busy the actor is, so there is nothing to enforce any backpressure.

Like Levi mentioned, what you can do is change from a tell to an ask interaction and make the receiving actor respond when its work is done. Then you can use mapAsync like Levi describes. The difference between map and mapAsync is that the semantics of mapAsync are such that it emits downstream only when the returned Future completes. Even if parallelism is 1, the backpressure still works: in case your Kafka records come in way faster than your actor can handle them, mapAsync will backpressure upstream while it waits for Future completion.

In this particular case, whether increasing parallelism for mapAsync results in increased throughput depends on how your actor processes the messages. If they all end up in the same actor's inbox, increasing parallelism makes no sense, as you won't really speed anything up by doing this. If the interaction was, say, a REST call, then it could improve the overall throughput. The parallelism value effectively caps the max number of uncompleted Futures allowed before backpressure kicks in.
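A sketch of the receiving side in Akka Typed; extending Update with a replyTo field is an assumption on top of the question's code:

```scala
import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors

sealed trait Command
final case class Update(myAvro: MyAvro, replyTo: ActorRef[Done]) extends Command

val behavior = Behaviors.receiveMessage[Command] {
  case Update(myAvro, replyTo) =>
    // ... apply the update to the entity's state ...
    replyTo ! Done // this reply completes the ask's Future on the stream side
    Behaviors.same
}
```

It is this reply that gives mapAsync the feedback signal ! alone cannot provide.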
Upvotes: 0
Reputation: 20551
In the first stream, there will be basically no backpressure. The offset commit will happen very soon after the message gets sent to myActor.

For backpressure, you'll want to wait for a response from the target actor, and as you say, the ask pattern is the canonical way to accomplish that. Since an ask of an actor from outside an actor (and for all intents and purposes a stream is outside of an actor: that the stages are executed by actors is an implementation detail) results in a Future, this suggests that mapAsync is called for.
def askUpdate(m: MyAvro): Future[Response] = ??? // get actorref from cluster sharding, send ask, etc.
You would then replace the map in your original stream with
.mapAsync(parallelism) { consumerRecord =>
  askUpdate(consumerRecord.value().asInstanceOf[MyAvro])
}
mapAsync limits the number of "in-flight" futures to parallelism: if there are parallelism uncompleted futures (spawned by it, of course), it will backpressure. If a spawned future completes with a failure (for the ask itself, this will generally be a timeout), the stream will fail; the results of the successful futures (respecting incoming order) will be passed on. Very often these will be akka.Done, especially when the only things left to do in the stream are the offset commit and Sink.ignore.
Upvotes: 1