Reputation: 4159
I am trying to de-duplicate events in my Flink pipeline using a Guava cache. My requirement is to de-duplicate over a 1-minute window, while keeping no more than 10,000 elements in the cache at any given point.
A small background on my experiment with Flink windowing:
Please let me know if I should not be seeing the above behavior with windowing.
Back to Guava:
I have an `Event` which looks like this, and an `EventsWrapper` for these events which looks like this. I will be getting a stream of `EventsWrapper`s, and I need to remove duplicate `Event`s across different `EventsWrapper`s.
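Those class definitions aren't shown here, but a minimal sketch of their shape can be inferred from the toString output in the example below (constructors and accessor names are assumptions):

```java
import java.util.List;

// Hypothetical shapes, inferred from the toString output in the example below.
class Event {
    private final String id;
    private final String name;

    Event(String id, String name) {
        this.id = id;
        this.name = name;
    }

    String getId() { return id; }
    String getName() { return name; }
}

class EventsWrapper {
    private final String id;
    private final String org;
    private final List<Event> events;

    EventsWrapper(String id, String org, List<Event> events) {
        this.id = id;
        this.org = org;
        this.events = events;
    }

    String getId() { return id; }
    String getOrg() { return org; }
    List<Event> getEvents() { return events; }
}
```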
For example, if I have two `EventsWrapper`s like below:
[EventsWrapper{id='ew1', org='org1', events=[Event{id='e1', name='event1'}, Event{id='e2', name='event2'}]}, EventsWrapper{id='ew2', org='org2', events=[Event{id='e1', name='event1'}, Event{id='e3', name='event3'}]}]
I should emit as output the following:
[EventsWrapper{id='ew1', org='org1', events=[Event{id='e1', name='event1'}, Event{id='e2', name='event2'}]}, EventsWrapper{id='ew2', org='org2', events=[Event{id='e3', name='event3'}]}]
i.e. making sure that the `e1` event is emitted only once, assuming these two events are within the time and size requirements of the cache.
I created a `RichFlatMapFunction` where I initialize a Guava cache and a value state like this, and set the Guava cache in the value state like this. My overall pipeline looks like this.
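That code isn't included here, so the following is only a rough reconstruction of the kind of setup being described (the class name `DeduplicatingFlatmap` comes from the stack trace below; the state name, cache limits, and de-duplication loop are guesses based on the stated requirements):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class DeduplicatingFlatmap extends RichFlatMapFunction<EventsWrapper, EventsWrapper> {

    private transient ValueState<Cache<String, Boolean>> eventsState;

    @Override
    public void open(Configuration parameters) {
        eventsState = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "seen-events",
                TypeInformation.of(new TypeHint<Cache<String, Boolean>>() {})));
    }

    @Override
    public void flatMap(EventsWrapper wrapper, Collector<EventsWrapper> out) throws Exception {
        Cache<String, Boolean> cache = eventsState.value();
        if (cache == null) {
            // 1-minute window, capped at 10,000 entries, per the requirements above
            cache = CacheBuilder.newBuilder()
                    .expireAfterWrite(1, TimeUnit.MINUTES)
                    .maximumSize(10_000)
                    .build();
        }
        List<Event> deduped = new ArrayList<>();
        for (Event event : wrapper.getEvents()) {
            if (cache.getIfPresent(event.getId()) == null) {
                cache.put(event.getId(), true); // the put that triggers the NPE below
                deduped.add(event);
            }
        }
        eventsState.update(cache);
        if (!deduped.isEmpty()) {
            out.collect(new EventsWrapper(wrapper.getId(), wrapper.getOrg(), deduped));
        }
    }
}
```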
But each time I try to update the Guava cache inside the value state:
eventsState.value().put(eventId, true);
I get the following error:
java.lang.NullPointerException
at com.google.common.cache.LocalCache.hash(LocalCache.java:1696)
at com.google.common.cache.LocalCache.put(LocalCache.java:4180)
at com.google.common.cache.LocalCache$LocalManualCache.put(LocalCache.java:4888)
at events.piepline.DeduplicatingFlatmap.lambda$flatMap$0(DeduplicatingFlatmap.java:59)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:176)
On further digging, I found that the error occurs because the `keyEquivalence` field inside the Guava cache is null.
I checked by putting directly into the Guava cache (not through state, but directly on the cache), and that works fine.
I felt this could be because ValueState is not able to serialize the Guava cache. So I added a serializer like this and registered it like this:
env.registerTypeWithKryoSerializer((Class<Cache<String,Boolean>>)(Class<?>)Cache.class, CacheSerializer.class);
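The serializer itself isn't shown above; one plausible sketch of such a Kryo serializer would serialize only the entry map and rebuild the cache (with its configured limits) on read — both of those choices are assumptions:

```java
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class CacheSerializer extends Serializer<Cache<String, Boolean>> {

    @Override
    public void write(Kryo kryo, Output output, Cache<String, Boolean> cache) {
        // Persist only the entries; the cache's internal structures are not serializable.
        kryo.writeObject(output, new HashMap<>(cache.asMap()));
    }

    @Override
    @SuppressWarnings("unchecked")
    public Cache<String, Boolean> read(Kryo kryo, Input input, Class<Cache<String, Boolean>> type) {
        // Rebuild the cache with its original limits, then restore the entries.
        // Note: per-entry expiry timestamps are lost on this round trip.
        Cache<String, Boolean> cache = CacheBuilder.newBuilder()
                .expireAfterWrite(1, TimeUnit.MINUTES)
                .maximumSize(10_000)
                .build();
        cache.putAll(kryo.readObject(input, HashMap.class));
        return cache;
    }
}
```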
But this didn't help either.
I have the following questions:
Thanks a lot in advance for the help.
Upvotes: 2
Views: 558
Reputation: 43499
Even if you do get it working, using a Guava cache as ValueState will perform very poorly, because RocksDB is going to deserialize the entire cache on every access, and re-serialize it on every update.
Moreover, it looks like you are trying to share a single cache instance across all of the orgs that happen to be multiplexed across a single flatmap instance. That's not going to work, because the RocksDB state backend will make a copy of the cache for each org (a side effect of the serialization involved).
Your requirements aren't entirely clear, but a deduplication query might help. I'm thinking, though, that MapState in combination with timers in a KeyedProcessFunction is more likely to be the building block you need. Here's an example that might help you get started (though you'll want to handle the timers differently).
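Purely as an illustration of that building block (a sketch, not the example referred to above; names are made up, it assumes the Event/EventsWrapper shapes from the question, and the one-timer-per-element strategy is exactly the part you'd handle differently):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DeduplicatingFunction
        extends KeyedProcessFunction<String, EventsWrapper, EventsWrapper> {

    private static final long TTL_MS = 60_000; // the 1-minute de-duplication window

    // event id -> processing time when it was first seen
    private transient MapState<String, Long> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("seen", String.class, Long.class));
    }

    @Override
    public void processElement(EventsWrapper wrapper, Context ctx,
                               Collector<EventsWrapper> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        List<Event> deduped = new ArrayList<>();
        for (Event event : wrapper.getEvents()) {
            if (!seen.contains(event.getId())) {
                seen.put(event.getId(), now);
                // one timer per element is simple but wasteful; coarser timers are cheaper
                ctx.timerService().registerProcessingTimeTimer(now + TTL_MS);
                deduped.add(event);
            }
        }
        if (!deduped.isEmpty()) {
            out.collect(new EventsWrapper(wrapper.getId(), wrapper.getOrg(), deduped));
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<EventsWrapper> out) throws Exception {
        // Evict entries that have aged out of the window.
        Iterator<Map.Entry<String, Long>> it = seen.iterator();
        while (it.hasNext()) {
            if (it.next().getValue() + TTL_MS <= timestamp) {
                it.remove();
            }
        }
    }
}
```

Keep in mind that keyed state is scoped per key, so the key you choose has to match the scope over which duplicates should be suppressed. This sketch also only handles the time bound; enforcing the 10,000-element cap would need separate accounting.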
Upvotes: 1