Reputation: 1
// Define the incoming main data stream, sourced from a Kafka topic
DataStream<MainMessage> mainStream = env.addSource(...);
// Define the reference data stream, sourced from a Kafka topic
DataStream<ReferenceData> referenceDataStream = env.addSource(...);
// Define the broadcast state descriptor
MapStateDescriptor<String, ReferenceData> broadcastStateDescriptor = new MapStateDescriptor<>(
"BroadcastReferenceState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(ReferenceData.class)
);
// Broadcast the reference data stream
BroadcastStream<ReferenceData> broadcastStream = referenceDataStream.broadcast(broadcastStateDescriptor);
// Connect the raw data stream with the broadcast stream
mainStream
.keyBy(raw -> raw.getKey()) // Key by the same key used for reference
.connect(broadcastStream)
.process(new KeyedBroadcastProcessFunction<String, MainMessage, ReferenceData, EnrichedMainMessage>() {
// Keyed state to store the reference data
private transient MapState<String, ReferenceData> keyedReferenceState;
// Keyed state to buffer main messages that arrive before their reference data
private transient ListState<MainMessage> bufferedMessages;
@Override
public void open(Configuration parameters) {
// Initialize the keyed state
MapStateDescriptor<String, ReferenceData> keyedStateDescriptor = new MapStateDescriptor<>(
"KeyedReferenceState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(ReferenceData.class)
);
keyedReferenceState = getRuntimeContext().getMapState(keyedStateDescriptor);
// Initialize the buffer; its element type must match the ListState declaration
ListStateDescriptor<MainMessage> bufferedMessagesDescriptor = new ListStateDescriptor<>("bufferedMessages", MainMessage.class);
bufferedMessages = getRuntimeContext().getListState(bufferedMessagesDescriptor);
}
@Override
public void processElement(MainMessage mainMessage, ReadOnlyContext ctx, Collector<EnrichedMainMessage> out) throws Exception {
// Lookup the reference data in the keyed state
ReferenceData referenceData = keyedReferenceState.get(mainMessage.getKey());
if (referenceData != null) {
// Enrich the main message with the reference data
EnrichedMainMessage enrichedMainMessage = new EnrichedMainMessage(mainMessage, referenceData);
// Emit the enriched message
out.collect(enrichedMainMessage);
} else {
// Reference data is not available yet: buffer the main message
// in the list state so that it can be re-processed once the
// reference data for this key has arrived
bufferedMessages.add(mainMessage);
}
}
@Override
public void processBroadcastElement(ReferenceData referenceData, Context ctx, Collector<EnrichedMainMessage> out) throws Exception {
// Update the keyed state with the latest reference data
keyedReferenceState.put(referenceData.getKey(), referenceData);
}
});
The above is pseudocode for how we currently process our main stream data together with reference data that we receive once an hour. We source the reference data from five different Kafka topics and broadcast it using the BroadcastStream API. We are doing this to replicate the GlobalKTable functionality of Kafka Streams. We process the data in a KeyedBroadcastProcessFunction: in processBroadcastElement we put each key and its corresponding reference data into a MapState, and in processElement we look up every main stream element's key in that MapState and use the retrieved reference data to enrich (join with) the main stream. We are currently facing two issues with this approach:
java.lang.OutOfMemoryError: Java heap space (this happens even after setting a 2-hour TTL to clear the buffer)
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id 172.XX.XX.XX:6XXX-8XXXX (ip) is no longer reachable.
Can you advise whether broadcasting the reference data using the BroadcastStream API is the right approach for this use case? Also, is there an equivalent of the Kafka Streams GlobalKTable in Flink?
We tried buffering the main stream in a ListState, expecting this to give the reference data in the broadcast stream's MapState time to catch up. However, the join still fails because the reference data has not caught up, and a couple of minutes after startup the job throws the additional exceptions mentioned above. As a result, the application crashes and is unstable.
Upvotes: 0
Views: 46
Reputation: 43687
Since the broadcast reference data is looked up with the same key that is used to partition the main stream (via keyBy), I don't see any compelling reason to use broadcast.
For an alternative implementation based on a KeyedCoProcessFunction, see https://github.com/confluentinc/flink-cookbook/tree/master/enrichment-join-with-buffering.
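To make the buffering idea concrete, here is a minimal plain-Java sketch of the per-key control flow. It is not taken from the linked cookbook and it is not Flink API code: the class name EnrichmentJoinSketch and the methods onMain and onReference are hypothetical, and the two HashMaps only stand in for the per-key state (for example a ValueState for the reference record and a ListState for the buffer) that a KeyedCoProcessFunction would keep in processElement1 and processElement2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of a keyed enrichment join with buffering.
// Assumption: the maps below imitate Flink keyed state; in a real job
// both streams would be keyed by the same key and Flink would scope
// the state to the current key automatically.
class EnrichmentJoinSketch<K, M, R, O> {
    // Latest reference record per key (models a per-key ValueState<R>).
    private final Map<K, R> referenceByKey = new HashMap<>();
    // Main messages waiting for reference data (models a per-key ListState<M>).
    private final Map<K, List<M>> bufferByKey = new HashMap<>();
    // Hypothetical enrichment function combining a message with its reference data.
    private final BiFunction<M, R, O> enrich;

    EnrichmentJoinSketch(BiFunction<M, R, O> enrich) {
        this.enrich = enrich;
    }

    // Main-stream side: emit immediately if reference data is known, else buffer.
    List<O> onMain(K key, M message) {
        List<O> out = new ArrayList<>();
        R reference = referenceByKey.get(key);
        if (reference != null) {
            out.add(enrich.apply(message, reference));
        } else {
            bufferByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(message);
        }
        return out;
    }

    // Reference side: store the record, then drain and clear the buffer for this key.
    List<O> onReference(K key, R reference) {
        referenceByKey.put(key, reference);
        List<O> out = new ArrayList<>();
        List<M> pending = bufferByKey.remove(key);
        if (pending != null) {
            for (M message : pending) {
                out.add(enrich.apply(message, reference));
            }
        }
        return out;
    }

    // Number of messages currently buffered for a key.
    int bufferedCount(K key) {
        List<M> pending = bufferByKey.get(key);
        return pending == null ? 0 : pending.size();
    }

    public static void main(String[] args) {
        EnrichmentJoinSketch<String, String, String, String> join =
                new EnrichmentJoinSketch<>((m, r) -> m + "+" + r);
        System.out.println(join.onMain("k1", "m1"));      // prints []
        System.out.println(join.onMain("k1", "m2"));      // prints []
        System.out.println(join.onReference("k1", "r1")); // prints [m1+r1, m2+r1]
        System.out.println(join.onMain("k1", "m3"));      // prints [m3+r1]
    }
}
```

The point of the sketch is that the buffer for a key is emptied as soon as that key's reference data arrives, so it only grows for keys whose reference data is still missing. Keys that never receive reference data would still need a TTL or timer-based cleanup, which this sketch does not model.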
Upvotes: 0