lafeier
lafeier

Reputation: 11

How to use Flink to implement aggregate operations on non-keyed data streams?

I want to implement the following case :

 1. select count(1) from user; 
 
 2. select count(distinct uid) from user;

However, aggregation operations cannot be used for non-keyed streams.

A non-keyed stream cannot use keyed state,I don't know what to do, can someone help me, thank you!

You can do this using the Table API, but I was wondering how to do this in the DataSteam API.

Upvotes: 0

Views: 72

Answers (1)

David Anderson
David Anderson

Reputation: 43707

Using the Table API is the recommended approach. You are likely to do more work and arrive at a less performant solution by using the DataStream API.

However, to answer your question:

In both cases, if you want to use the DataStream API and don't care about doing things in parallel, you could key the stream by a constant and then use keyed state.

To parallelize the count(1) case, you can key the stream by something in order to partition the stream. Then in a set of parallel counting tasks you can count mini-batches, and send batch count updates (as a changelog stream) downstream to a single instance of a summing-up/reporting task. Or you could build a solution using non-keyed state.

To parallelize the count(distinct user) case you could model your solution on the approach implemented in the Table API -- see the docs. In a nutshell, it works by transforming select count(distinct uid) from user into

SELECT SUM(cnt)
FROM (
    SELECT COUNT(DISTINCT uid) as cnt
    FROM T
    GROUP BY MOD(HASH_CODE(uid), 1024)
)

Upvotes: 1

Related Questions