David Anderson

Reputation: 43707

How can I use Flink to implement a streaming join between different data sources?

I have data coming from two different Kafka topics, served by different brokers, with each topic having different numbers of partitions. One stream has events about ads being served, the other has clicks:

ad_serves: ad_id, ip, sTime
ad_clicks: ad_id, ip, cTime

The documentation for process functions includes a section on implementing low-level joins with a CoProcessFunction or KeyedCoProcessFunction, but I'm not sure how to set that up.

I'm also wondering if one of Flink's SQL Joins could be used here. I'm interested both in simple joins like

SELECT s.ad_id, s.sTime, c.cTime
FROM ad_serves s, ad_clicks c
WHERE s.ad_id = c.ad_id

as well as analytical queries based on ads clicked on within 5 seconds of being served:

SELECT s.ad_id
FROM ad_serves s, ad_clicks c
WHERE
    s.ad_id = c.ad_id AND       
    s.ip = c.ip AND
    c.cTime BETWEEN s.sTime AND 
                    s.sTime + INTERVAL '5' SECOND;

Upvotes: 4

Views: 2154

Answers (1)

David Anderson

Reputation: 43707

In general, I recommend using Flink SQL for implementing joins: it is easy to work with and well optimized. But whether you use the SQL/Table API or implement the join yourself with the DataStream API, the big picture will be roughly the same.
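For the SQL route, a minimal sketch of the setup might look like the following. The connector properties (broker addresses, topic names, formats) and the watermark intervals are placeholders for your environment, not prescribed values:

```java
// Sketch: running the interval-join query with the Table API / Flink SQL.
// All WITH options below are illustrative -- adjust to your Kafka setup.
TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.inStreamingMode());

tEnv.executeSql(
    "CREATE TABLE ad_serves (" +
    "  ad_id BIGINT," +
    "  ip STRING," +
    "  sTime TIMESTAMP(3)," +
    "  WATERMARK FOR sTime AS sTime - INTERVAL '1' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'ad_serves'," +
    "  'properties.bootstrap.servers' = 'broker-a:9092'," +
    "  'format' = 'json'" +
    ")");

tEnv.executeSql(
    "CREATE TABLE ad_clicks (" +
    "  ad_id BIGINT," +
    "  ip STRING," +
    "  cTime TIMESTAMP(3)," +
    "  WATERMARK FOR cTime AS cTime - INTERVAL '1' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'ad_clicks'," +
    "  'properties.bootstrap.servers' = 'broker-b:9092'," +
    "  'format' = 'json'" +
    ")");

// The time-constrained query from the question, run directly as SQL:
tEnv.executeSql(
    "SELECT s.ad_id " +
    "FROM ad_serves s, ad_clicks c " +
    "WHERE s.ad_id = c.ad_id AND s.ip = c.ip " +
    "  AND c.cTime BETWEEN s.sTime AND s.sTime + INTERVAL '5' SECOND")
    .print();
```

Note that each topic gets its own table definition, so the two topics can live on different brokers with different partition counts without any special handling.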

  • You will start with separate FlinkKafkaConsumer sources, one for each topic. If the partition counts (and data volumes) of these topics are very different, you might scale the parallelism of each Flink source accordingly. In the diagram below I've suggested this by showing 2 ad_serve instances and 1 ad_click instance.

  • When implementing a join, whether with a KeyedCoProcessFunction or with the SQL/Table API, you must have an equality constraint on keys from both streams. In this case we can key both streams by the ad_id. This will have the effect of bringing together all events from both streams for a given key -- e.g., the diagram below shows ad_serve and ad_click events for ad 17, and how those events will both find their way to instance 1 of the KeyedCoProcessFunction.

[diagram: ad_serve and ad_click events for ad 17 are both routed to instance 1 of the KeyedCoProcessFunction]

  • The two queries given as examples have very different requirements in terms of how much state they will have to keep. For an unconstrained regular join such as

    SELECT s.ad_id, s.sTime, c.cTime
    FROM ad_serves s, ad_clicks c
    WHERE s.ad_id = c.ad_id
    

    the job executing this query will have to store (in Flink's managed, keyed state) all events from both streams, forever.

    On the other hand, the temporal constraint in the second query makes it possible to expire older serve and click events from state once they can no longer participate in producing new join results. (Here I'm assuming that the streams involved are append-only streams, where the events are roughly in temporal order.)
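For the time-constrained case you don't necessarily have to hand-roll the state management: the DataStream API has a built-in interval join that expires state for you once events fall outside the time bounds. A sketch, assuming Serve and Click are POJOs with the fields shown above, and that event-time timestamps and watermarks have already been assigned to both streams:

```java
// Sketch: the 5-second constraint expressed as a DataStream interval join.
// Serve, Click, and their field names are assumed to match the schemas above.
serves
    .keyBy(s -> s.ad_id)
    .intervalJoin(clicks.keyBy(c -> c.ad_id))
    // a click matches a serve if cTime is in [sTime, sTime + 5 seconds]
    .between(Time.seconds(0), Time.seconds(5))
    .process(new ProcessJoinFunction<Serve, Click, String>() {
        @Override
        public void processElement(Serve serve, Click click,
                                   Context ctx, Collector<String> out) {
            // interval joins key on a single field here, so the
            // s.ip = c.ip predicate is applied as a filter
            if (serve.ip.equals(click.ip)) {
                out.collect("ad " + serve.ad_id + " clicked within 5s");
            }
        }
    });
```

Once the watermark passes sTime + 5 seconds, the corresponding serve event can no longer match any click and is dropped from state automatically.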

These two queries also have different keying requirements. The first query joins on s.ad_id = c.ad_id; the second on s.ad_id = c.ad_id AND s.ip = c.ip. If you wanted to set up the second join with a KeyedCoProcessFunction, the code would look something like this:

DataStream<Serve> serves = ...
DataStream<Click> clicks = ...

serves
  .connect(clicks)
  // one key selector per stream; both must produce the same key type
  .keyBy(s -> new Tuple2<>(s.ad_id, s.ip),
         c -> new Tuple2<>(c.ad_id, c.ip))
  .process(new MyJoinFunction());

Note that keyBy on a connected stream needs two key selector functions, one for each stream, and these must map both streams onto the same keyspace. In the case of the second join, we're using tuples of (ad_id, ip) as the keys.
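For completeness, here is one way MyJoinFunction itself might be sketched. This is an illustrative buffering join (the class layout and output format are mine, not a fixed API), and it omits the timer-based state expiry a production job would add for the 5-second variant:

```java
// Sketch: buffer the latest event from each side per key, and emit a
// result once both sides have arrived. A production version would also
// register event-time timers to expire stale state.
public static class MyJoinFunction
        extends KeyedCoProcessFunction<Tuple2<Long, String>, Serve, Click, String> {

    private ValueState<Serve> serveState;
    private ValueState<Click> clickState;

    @Override
    public void open(Configuration parameters) {
        serveState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("serve", Serve.class));
        clickState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("click", Click.class));
    }

    @Override
    public void processElement1(Serve serve, Context ctx,
                                Collector<String> out) throws Exception {
        Click click = clickState.value();
        if (click != null) {
            out.collect("ad " + serve.ad_id + ": served " + serve.sTime
                    + ", clicked " + click.cTime);
        } else {
            serveState.update(serve);
        }
    }

    @Override
    public void processElement2(Click click, Context ctx,
                                Collector<String> out) throws Exception {
        Serve serve = serveState.value();
        if (serve != null) {
            out.collect("ad " + serve.ad_id + ": served " + serve.sTime
                    + ", clicked " + click.cTime);
        } else {
            clickState.update(click);
        }
    }
}
```

Because the function is keyed on (ad_id, ip), the ValueState handles here are automatically scoped to the current key, so each ad/ip pair gets its own pair of buffered events.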

Upvotes: 7
