Wes
Wes

Reputation: 7047

Combining Flux entry with previous and map

Question

With a Flux how do you access previous element?

Background

I have an external event stream, that gives events in order, the order of that stream is to dispatch one event and immediately afterwards to dispatch another event. However the metadata for the second event is in the first event.

Note that it isn't always an even number of events.

What I'm trying to do is combine the events into a stream of events for consumption downstream.

Flux#zip looked promising but it would mean returning an object of the external event type.

Inital code

So far what I've got is.

    BinaryLogClient client = new BinaryLogClient(host, port, username, password);
    Flux<Event> bridge = Flux.create(sink -> {
        EventListener fluxListener = event -> {
            sink.next(event);
        };

        client.registerEventListener(fluxListener);
    });

    bridge.subscribe(DemoApplication::printEvent);
    bridge.map(new EventPairMemorizer());


public class EventPair  {
    private final Event previous;
    private final Event current;

    public EventPair(Event previous, Event current) {
        this.previous = previous;
        this.current = current;
    }

    /**
     * @return `null` if no previous events.
     */
    public Event getPrevious() {
        return previous;
    }

    public Event getCurrent() {
        return current;
    }
}

/**
 * Not thread safe has to go on a single thread
 */
public class EventPairMemorizer implements Function<Event, EventPair> {
    Event previous = null;

    EventPair toPair(Event e) {
        EventPair pair = new EventPair(previous, e);
        previous = e;
        return pair;
    }

    @Override
    public EventPair apply(Event current) {
        return toPair(current);
    }
}

This is partly a learning exercise, partially a proof of concept.

Irrelevant Details

I'm attempting to use mysql-binlog-connector-java to get a stream on whats changed in the database.

So If I receive a EXT_WRITE_ROWS event the previous event is a TABLE_MAP event. I then want to do a column lookup on the TABLE_MAP event (using jdbc). Then convert to some internal structure that is JSON friendly.

The same applies for a EXT_UPDATE_ROWS event.

So the idea code looks like

  1. onExternalEvent push to Flux
  2. check event type. If matching call jdbc on jdbc thread using Mono
  3. combine Mono and current event.
  4. map to internal type.
  5. emit to a different stream.
  6. profit

Upvotes: 0

Views: 2197

Answers (2)

raisercostin
raisercostin

Reputation: 9189

You can use .scan()

Flux<EventPair> pairs = bridge.scan(new EventPair(null,null),(prevPair,newEvent)->
  new EventPair(prevPair.current,newEvent)
);

Upvotes: 4

Simon Basl&#233;
Simon Basl&#233;

Reputation: 28301

How about overlapping buffers?

With buffer(2, 1) you would open a buffer for every element, and each buffer would contain 2 elements.

You can then ignore the buffers that don't end with an event you're interested in, and get the previous value for the events you ARE interested in...

Upvotes: 6

Related Questions