Reputation: 7047
With a Flux how do you access previous element?
I have an external event stream, that gives events in order, the order of that stream is to dispatch one event and immediately afterwards to dispatch another event. However the metadata for the second event is in the first event.
Note that it isn't always an even number of events.
What I'm trying to do is combine the events into a stream of events for consumption downstream.
Flux#zip
looked promising but it would mean returning an object of the external event type.
So far what I've got is.
BinaryLogClient client = new BinaryLogClient(host, port, username, password);
Flux<Event> bridge = Flux.create(sink -> {
EventListener fluxListener = event -> {
sink.next(event);
};
client.registerEventListener(fluxListener);
});
bridge.subscribe(DemoApplication::printEvent);
bridge.map(new EventPairMemorizer());
public class EventPair {
private final Event previous;
private final Event current;
public EventPair(Event previous, Event current) {
this.previous = previous;
this.current = current;
}
/**
* @return `null` if no previous events.
*/
public Event getPrevious() {
return previous;
}
public Event getCurrent() {
return current;
}
}
/**
* Not thread safe has to go on a single thread
*/
public class EventPairMemorizer implements Function<Event, EventPair> {
Event previous = null;
EventPair toPair(Event e) {
EventPair pair = new EventPair(previous, e);
previous = e;
return pair;
}
@Override
public EventPair apply(Event current) {
return toPair(current);
}
}
This is partly a learning exercise, partially a proof of concept.
I'm attempting to use mysql-binlog-connector-java to get a stream on whats changed in the database.
So If I receive a EXT_WRITE_ROWS
event the previous event is a TABLE_MAP
event. I then want to do a column lookup on the TABLE_MAP
event (using jdbc). Then convert to some internal structure that is JSON friendly.
The same applies for a EXT_UPDATE_ROWS
event.
So the idea code looks like
Upvotes: 0
Views: 2197
Reputation: 9189
You can use .scan()
Flux<EventPair> pairs = bridge.scan(new EventPair(null,null),(prevPair,newEvent)->
new EventPair(prevPair.current,newEvent)
);
Upvotes: 4
Reputation: 28301
How about overlapping buffers?
With buffer(2, 1)
you would open a buffer for every element, and each buffer would contain 2 elements.
You can then ignore the buffers that don't end with an event you're interested in, and get the previous value for the events you ARE interested in...
Upvotes: 6