Reputation: 505
I have a pull based stream data source (just like Kafka). And I'd like to apply the reactor on this event processing application.
Currently, I created an infinite sequence of events using EmitterProcessor. And it is subscribed at initial time once and never canceled.
The following code indicates what I've done.
public void initialize(){
EmitterProcessor<Event> emitter = ...
emitter.flatmap(this::step1)
.flatmap(this::step2)
.flatmap(this::finalStep)
//.subscriberContext(...)
.subscribe()
}
For each event in the initial Flux<Event>
, I need to maintain/update a context so that I can get all inputs and results for each step and do some reporting in the final step.
Passing the immutable Context
class from step to step is an option but this will cause all step()
have an additional parameter. And not all step()
will use the Context
. In that case it seems ugly you just pass Context
and return Pair<Context,OtherResult>
. The Pair
is ugly as well.
So I prefer something like ThreadLocal<Context>
. Obviously in reactor the replacement is subscriberContext()
. However according my code, the initialize()
will be invoked once. The Flux<Event>
will be subscribe()
once. The subscriberContext
is not at my Event
level but subscription level. So there will be only a single context in my code. It not works.
The question is should I regard the event stream a Flux<Event>
or multiple Mono<Event>
and make subscription on each event? If Mono<Event>
is the best practice, then I can directly use subscriberContext()
. But is there any assemble time overhead (assemble on every event coming)?
In reactor-kafka, it makes each batch of Record
a Flux<Record>
, how can it implements something like record level context?
Thanks.
Upvotes: 1
Views: 2013
Reputation: 28351
Depending on how late you last need the information in this context, you might have the option of using a single flatMap
to create a scope for each event and assign them their own context:
public void initialize(){
EmitterProcessor<Event> emitter = ...
emitter.flatMap(eventForScope ->
Mono.just(eventForScope)
.flatmap(this::step1)
.flatmap(this::step2)
.flatmap(this::finalStep)
.subscriberContext(...) //context for ONE event
)
.subscribe()
}
This can be tuned, some late steps might not need the per-event context anymore so you can move these outside the outer flatMap
, etc...
This works because the inside of a flatMap
can see the "main" Context
, but changes to the inside context aren't visible to the outside / main sequence.
Upvotes: 1