Bishamon Ten

Reputation: 588

How to join a stream and dataset?

I have a stream and some static data in a file, and I want to enrich the data in the stream using the data in the file.

Example: the stream delivers airport codes, and the file contains the airport codes together with the airport names. I want to join the stream with the file to produce a new stream that includes the airport names. What steps are needed to achieve this?

Upvotes: 2

Views: 2150

Answers (1)

David Anderson

Reputation: 43409

There are lots of ways to approach stream enrichment with Flink, depending on the exact requirements. https://www.youtube.com/watch?v=cJS18iKLUIY is a good talk by Konstantin Knauf that covers many different approaches, and the tradeoffs between them.

In the simple case where the enrichment data is immutable and small enough to fit in memory, I would just use a RichFlatMapFunction and load the whole file in its open() method. That would look something like this:

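import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class EnrichmentWithPreloading extends RichFlatMapFunction<Event, EnrichedEvent> {

    // Reference data keyed by sensor id; each parallel instance loads its own copy.
    private transient Map<Long, SensorReferenceData> referenceData;

    @Override
    public void open(final Configuration parameters) throws Exception {
      super.open(parameters);
      // Load the whole file once, before any events are processed.
      // loadReferenceData() is a helper (not shown) that reads and parses the file.
      referenceData = loadReferenceData();
    }

    @Override
    public void flatMap(
        final Event event,
        final Collector<EnrichedEvent> collector) throws Exception {

      // Look up this event's sensor; the result is null if the id is not in the file.
      SensorReferenceData sensorReferenceData =
        referenceData.get(event.getSensorId());
      collector.collect(new EnrichedEvent(event, sensorReferenceData));
    }

}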
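Applied to the airport example from the question, the same preloading idea boils down to "parse the file into a map once, then look up each code". The sketch below shows just that logic without Flink so it can be run on its own. It assumes the file is plain `CODE,Name` lines with no header and no quoted commas; the class, record, and method names are hypothetical. In the Flink function above, loadAirports would be called from open() and enrich from flatMap().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AirportEnrichmentSketch {

    // Hypothetical enriched record: the incoming code plus the looked-up name.
    public record EnrichedAirport(String code, String name) {}

    // Parses "CODE,Name" lines into a lookup table (the work open() would do).
    // Assumes simple CSV: no header row and no quoted fields.
    public static Map<String, String> loadAirports(BufferedReader reader) throws IOException {
        Map<String, String> names = new HashMap<>();
        String line;
        while ((line = reader.readLine()) != null) {
            if (line.isBlank()) {
                continue;
            }
            // Limit 2 keeps any further commas as part of the name.
            String[] parts = line.split(",", 2);
            names.put(parts[0].trim(), parts.length > 1 ? parts[1].trim() : "");
        }
        return names;
    }

    // Per-event lookup (the work flatMap() would do); unknown codes get a placeholder.
    public static EnrichedAirport enrich(String code, Map<String, String> names) {
        return new EnrichedAirport(code, names.getOrDefault(code, "UNKNOWN"));
    }

    public static void main(String[] args) throws IOException {
        String file = "SFO,San Francisco International\nJFK,John F. Kennedy International\n";
        Map<String, String> names = loadAirports(new BufferedReader(new StringReader(file)));
        for (String code : List.of("JFK", "SFO", "XXX")) {
            System.out.println(enrich(code, names));
        }
    }
}
```

Whether an unknown code should get a placeholder, be dropped, or be sent to a side output is a design choice; the placeholder here is only for illustration.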

You'll find more code examples for other approaches in https://github.com/knaufk/enrichments-with-flink.

UPDATE:

If what you'd rather do is preload some larger, partitioned reference data to join with a stream, there are a few ways to approach this, some of which are covered in the video and repo I shared above. For those specific requirements, I suggest using a custom partitioner; there's an example of this in that same GitHub repo. The idea is that the enrichment data is sharded across the parallel instances, and each streaming event is steered toward the instance that holds the relevant reference data.
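The key point of the sharded approach is that one routing function is used both to split the reference data and to steer events, so every lookup lands on the shard that has the key. The framework-free sketch below simulates that with a list of in-memory maps standing in for parallel instances; the names are hypothetical. In Flink, shardFor would roughly correspond to the body of a custom Partitioner passed to DataStream#partitionCustom, and each instance would load only the shard matching its subtask index. That mapping is an assumption worth checking against the repo's example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShardedEnrichmentSketch {

    // Single routing rule, used to shard the reference data AND to steer events.
    public static int shardFor(String key, int numShards) {
        return Math.floorMod(key.hashCode(), numShards);
    }

    // Splits the full reference table so each shard holds only the keys routed to it.
    public static List<Map<String, String>> shard(Map<String, String> reference, int numShards) {
        List<Map<String, String>> shards = new ArrayList<>();
        for (int i = 0; i < numShards; i++) {
            shards.add(new HashMap<>());
        }
        for (Map.Entry<String, String> e : reference.entrySet()) {
            shards.get(shardFor(e.getKey(), numShards)).put(e.getKey(), e.getValue());
        }
        return shards;
    }

    // Routes an event key to its shard and looks it up there; null if the key is unknown.
    public static String lookup(String key, List<Map<String, String>> shards) {
        return shards.get(shardFor(key, shards.size())).get(key);
    }

    public static void main(String[] args) {
        Map<String, String> reference = Map.of(
            "SFO", "San Francisco International",
            "JFK", "John F. Kennedy International",
            "LHR", "London Heathrow");
        List<Map<String, String>> shards = shard(reference, 4);
        for (String code : List.of("LHR", "SFO", "JFK")) {
            System.out.println(code + " -> shard " + shardFor(code, 4) + " -> " + lookup(code, shards));
        }
    }
}
```

Because no shard holds the whole table, memory per instance shrinks roughly in proportion to the number of shards, at the cost of a network shuffle to route each event.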

In my opinion, this is simpler than trying to get the Table API to do this particular enrichment as a join.

Upvotes: 2
