Vinod

Guava Cache as ValueState in Flink

I am trying to de-duplicate events in my Flink pipeline using a Guava cache. My requirement is to de-duplicate over a 1-minute window, while keeping no more than 10,000 elements in the cache at any given time.
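For reference, these two limits correspond to a Guava cache built with `CacheBuilder.newBuilder().maximumSize(10_000).expireAfterWrite(1, TimeUnit.MINUTES)`. The sketch below shows the intended eviction behavior in plain JDK code, so it runs without Guava on the classpath. It makes a few assumptions:

- The class `BoundedExpiringSeenSet` and its methods are hypothetical.
- The current time is passed in explicitly, to make the sketch testable.
- It is not an exact reimplementation of Guava's eviction policy.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical JDK-only stand-in for a size-bounded, expire-after-write set of event ids.
final class BoundedExpiringSeenSet {
    private final int maxSize;
    private final long ttlMillis;
    // Insertion-ordered map of event id -> write time; the first entry is the oldest write.
    private final LinkedHashMap<String, Long> writeTimes;

    BoundedExpiringSeenSet(int maxSize, long ttlMillis) {
        this.maxSize = maxSize;
        this.ttlMillis = ttlMillis;
        this.writeTimes = new LinkedHashMap<>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                // Size bound: drop the oldest write once there are more than maxSize ids.
                return size() > maxSize;
            }
        };
    }

    /** Returns true (and records the id) if eventId was not seen within the TTL; false for a duplicate. */
    boolean firstSeen(String eventId, long nowMillis) {
        evictExpired(nowMillis);
        if (writeTimes.containsKey(eventId)) {
            return false;
        }
        writeTimes.put(eventId, nowMillis);
        return true;
    }

    // Time bound: entries are in write order, so stop at the first one still inside the TTL.
    // Assumes nowMillis never decreases between calls.
    private void evictExpired(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = writeTimes.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() < ttlMillis) {
                break;
            }
            it.remove();
        }
    }

    int trackedCount() {
        return writeTimes.size();
    }
}
```

With the numbers from the question, this would be `new BoundedExpiringSeenSet(10_000, 60_000)`. An event would be emitted only when `firstSeen(eventId, now)` returns true.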

Some background on my experiment with Flink windowing:

Please let me know if this is not the behavior I should be seeing with windowing.

Back to Guava:

I have an Event class and an EventsWrapper class that wraps a list of these events. I receive a stream of EventsWrappers and need to remove duplicate Events across different EventsWrappers. For example, given these two EventsWrappers:

[EventsWrapper{id='ew1', org='org1', events=[Event{id='e1', name='event1'}, Event{id='e2', name='event2'}]}, EventsWrapper{id='ew2', org='org2', events=[Event{id='e1', name='event1'}, Event{id='e3', name='event3'}]}]

I should emit the following output:

[EventsWrapper{id='ew1', org='org1', events=[Event{id='e1', name='event1'}, Event{id='e2', name='event2'}]}, EventsWrapper{id='ew2', org='org2', events=[Event{id='e3', name='event3'}]}]

That is, event e1 should be emitted only once, assuming both occurrences fall within the time and size limits of the cache.
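The de-duplication rule in this example can be sketched in plain Java. This is a sketch under the following assumptions:

- The `Event` and `EventsWrapper` records are reconstructed from the `toString` output above.
- `WrapperDedup.dedup` is a hypothetical helper.
- It keeps the first occurrence of each event id and ignores the time and size limits entirely, so it only pins down the expected output.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Assumed shapes, reconstructed from the toString output in the question.
record Event(String id, String name) {}
record EventsWrapper(String id, String org, List<Event> events) {}

final class WrapperDedup {
    /** Drops every Event whose id already appeared earlier in the input, keeping wrapper order. */
    static List<EventsWrapper> dedup(List<EventsWrapper> input) {
        Set<String> seenIds = new HashSet<>();
        List<EventsWrapper> out = new ArrayList<>();
        for (EventsWrapper wrapper : input) {
            List<Event> kept = new ArrayList<>();
            for (Event event : wrapper.events()) {
                // Set.add returns false when the id is already present, i.e. a duplicate.
                if (seenIds.add(event.id())) {
                    kept.add(event);
                }
            }
            out.add(new EventsWrapper(wrapper.id(), wrapper.org(), kept));
        }
        return out;
    }
}
```

A wrapper whose events are all duplicates is kept here with an empty list. The question leaves open whether such wrappers should be dropped instead.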

I created a RichFlatMapFunction (DeduplicatingFlatmap) in which I initialize a Guava cache and a ValueState, and then store the Guava cache in that ValueState.

However, each time I try to update the Guava cache inside the ValueState:

eventsState.value().put(eventId, true);

I get the following error:

java.lang.NullPointerException
    at com.google.common.cache.LocalCache.hash(LocalCache.java:1696)
    at com.google.common.cache.LocalCache.put(LocalCache.java:4180)
    at com.google.common.cache.LocalCache$LocalManualCache.put(LocalCache.java:4888)
    at events.piepline.DeduplicatingFlatmap.lambda$flatMap$0(DeduplicatingFlatmap.java:59)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:176)

On further digging, I found that the error occurs because keyEquivalence inside the Guava cache is null. When I put the value directly into the Guava cache (not through the state), it works fine.

I suspected this was because ValueState cannot serialize the Guava cache, so I wrote a custom serializer (CacheSerializer) and registered it like this:

env.registerTypeWithKryoSerializer((Class<Cache<String,Boolean>>)(Class<?>)Cache.class, CacheSerializer.class);

But this didn't help either.

I have the following questions:

  1. What might I be doing wrong with the Guava cache in the case above?
  2. Is the behavior I am seeing with my tumbling and sliding window implementation expected, or am I doing something wrong?
  3. What will happen if I don't put the Guava cache in ValueState, and instead use it as a plain object field in the DeduplicatingFlatmap class and operate on it directly rather than through the ValueState? My understanding is that the Guava cache won't be part of the checkpoint, so when the pipeline fails and restarts, the Guava cache will have lost all its values and will be empty. Is this understanding correct?

Thanks a lot in advance for the help.


Answers (1)

David Anderson

  1. See below.
  2. These windows are behaving as expected.
  3. Your understanding is correct.

Even if you do get it working, using a Guava cache as ValueState will perform very poorly, because the RocksDB state backend will deserialize the entire cache on every access and re-serialize it on every update.
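One rough way to see why this is expensive is to count bytes. The sketch below uses plain Java serialization as a stand-in for the state backend's serializers. That is an assumption: RocksDB with Flink's own serializers will have different constants.

- Storing the whole map as one value means re-serializing all of it on each update, so total work grows quadratically with the number of entries.
- Serializing only the changed entry grows linearly. This is roughly what a per-entry structure such as MapState allows.

`SerializationCost` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

// Hypothetical cost model: Java serialization stands in for the state backend's serializers.
final class SerializationCost {
    /** Number of bytes Java serialization produces for obj. */
    static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Whole-value style: after each of n inserts, the entire map is serialized again. */
    static long wholeValueBytes(int n) {
        HashMap<String, Boolean> cache = new HashMap<>();
        long total = 0;
        for (int i = 0; i < n; i++) {
            cache.put("e" + i, Boolean.TRUE);
            total += serializedSize(cache);
        }
        return total;
    }

    /** Per-entry style: each insert serializes only the new key and value. */
    static long perEntryBytes(int n) {
        long total = 0;
        for (int i = 0; i < n; i++) {
            total += serializedSize("e" + i) + serializedSize(Boolean.TRUE);
        }
        return total;
    }
}
```

Doubling `n` roughly quadruples `wholeValueBytes(n)` but only doubles `perEntryBytes(n)`. The exact byte counts depend on the serializer.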

Moreover, it looks like you are trying to share a single cache instance across all of the orgs that happen to be multiplexed across a single flatmap instance. That's not going to work, because the RocksDB state backend will make a copy of the cache for each org (a side effect of the serialization involved).
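The copying effect can be reproduced without Flink. The hypothetical `FakeKeyedValueState` below stores each key's value as serialized bytes, loosely imitating what the answer describes for the RocksDB backend.

- Writing one "shared" set under two keys produces two independent copies.
- As a result, an update made under `org1` is never visible under `org2`.

`FakeKeyedValueState` and `CopyPerKeyDemo` are illustrative names, not Flink APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

// Hypothetical stand-in for a keyed ValueState whose backend keeps serialized bytes per key.
final class FakeKeyedValueState<T extends Serializable> {
    private final Map<String, byte[]> bytesPerKey = new HashMap<>();

    /** Deserializes a fresh copy of the value stored under key, or returns null if absent. */
    T value(String key) {
        byte[] stored = bytesPerKey.get(key);
        if (stored == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(stored))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Serializes value and stores the resulting bytes under key. */
    void update(String key, T value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        bytesPerKey.put(key, bytes.toByteArray());
    }
}

final class CopyPerKeyDemo {
    /** Stores one set under two keys, updates it via org1, and reports whether org2 sees the update. */
    static boolean org2SeesOrg1Update() {
        FakeKeyedValueState<HashSet<String>> state = new FakeKeyedValueState<>();
        HashSet<String> shared = new HashSet<>();
        state.update("org1", shared);
        state.update("org2", shared);

        HashSet<String> org1Copy = state.value("org1");
        org1Copy.add("e1");
        state.update("org1", org1Copy);

        return state.value("org2").contains("e1");
    }
}
```

`CopyPerKeyDemo.org2SeesOrg1Update()` returns false. Each key ends up with its own copy of the "shared" set.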

Your requirements aren't entirely clear, but a deduplication query might help. That said, I think MapState in combination with timers in a KeyedProcessFunction is more likely to be the building block you need. Here's an example that might help you get started (though you'll want to handle the timers differently).
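As a starting point, here is a framework-free model of that building block. In a real `KeyedProcessFunction`:

- The per-key map would be a `MapState<String, Long>` (event id to expiry time).
- Timers would be registered with `ctx.timerService().registerProcessingTimeTimer(expiry)`.
- Cleanup would happen in `onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)`.

The model makes these assumptions:

- The class `DedupProcessModel` and its method names are hypothetical.
- Time is passed in explicitly.
- One timer is registered per stored id, so entries expire individually. This is just one possible way of handling the timers.
- The 10,000-entry limit is not enforced.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical, framework-free model of a dedup KeyedProcessFunction using MapState plus timers.
final class DedupProcessModel {
    // A pending timer for a key (stand-in for Flink's timer service).
    private record Timer(long timestamp, String key) {}

    private final long ttlMillis;
    // Stand-in for MapState<String, Long>: stream key -> (event id -> expiry time).
    private final Map<String, Map<String, Long>> mapState = new HashMap<>();
    // Pending timers, earliest first.
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>(Comparator.comparingLong(Timer::timestamp));

    DedupProcessModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Like processElement: returns true if the event should be emitted, false if it is a duplicate. */
    boolean processElement(String key, String eventId, long now) {
        fireTimers(now);
        Map<String, Long> seen = mapState.computeIfAbsent(key, k -> new HashMap<>());
        if (seen.containsKey(eventId)) {
            return false;
        }
        long expiry = now + ttlMillis;
        seen.put(eventId, expiry);
        timers.add(new Timer(expiry, key)); // like registerProcessingTimeTimer(expiry)
        return true;
    }

    // Fires every timer due at or before now, in timestamp order.
    private void fireTimers(long now) {
        while (!timers.isEmpty() && timers.peek().timestamp() <= now) {
            Timer t = timers.poll();
            onTimer(t.key(), t.timestamp());
        }
    }

    // Like onTimer: removes only the ids whose expiry has been reached.
    private void onTimer(String key, long timestamp) {
        Map<String, Long> seen = mapState.get(key);
        if (seen == null) {
            return;
        }
        seen.values().removeIf(expiry -> expiry <= timestamp);
        if (seen.isEmpty()) {
            mapState.remove(key); // like MapState.clear()
        }
    }

    int stateSize(String key) {
        Map<String, Long> seen = mapState.get(key);
        return seen == null ? 0 : seen.size();
    }
}
```

Deduplication here is per key, because keyed state is scoped to the current key. To drop `e1` across `org1` and `org2` as in the question's example, the stream would need to be keyed by something other than org, for example by event id after flattening the wrappers.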
