I want to implement the following cases:
1. select count(1) from user;
2. select count(distinct uid) from user;
However, aggregation operations cannot be used on non-keyed streams, and a non-keyed stream cannot use keyed state. I don't know what to do; can someone help me? Thank you!
I know this can be done with the Table API, but I was wondering how to do it with the DataStream API.
Using the Table API is the recommended approach. You are likely to do more work and arrive at a less performant solution by using the DataStream API.
However, to answer your question:
In both cases, if you want to use the DataStream API and don't care about doing things in parallel, you could key the stream by a constant and then use keyed state.
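Here is a rough model of the constant-key approach. It is plain Java rather than Flink code. In a real job you would write something like `stream.keyBy(r -> 0)` followed by a `KeyedProcessFunction` that keeps the row count in a `ValueState<Long>` and the seen uids in a `MapState<String, Boolean>`. In this sketch, ordinary maps stand in for that keyed state. The class name and the constant key `"all"` are made up for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model (not Flink code) of keyBy(constant) + keyed state.
// Every record gets the same key, so all records hit one state entry
// and a single task does all the counting (no parallelism).
final class ConstantKeyCounter {
    private static final String CONSTANT_KEY = "all"; // hypothetical constant key

    // Per-key state, standing in for Flink's ValueState<Long> and MapState<String, Boolean>.
    private final Map<String, Long> rowCount = new HashMap<>();
    private final Map<String, Set<String>> seenUids = new HashMap<>();

    /** Processes one record and returns {count(1), count(distinct uid)} after it. */
    long[] process(String uid) {
        String key = CONSTANT_KEY; // what keyBy(r -> constant) would assign
        long rows = rowCount.merge(key, 1L, Long::sum);
        Set<String> seen = seenUids.computeIfAbsent(key, k -> new HashSet<>());
        seen.add(uid);
        return new long[] {rows, seen.size()};
    }

    /** Feeds all uids through one counter and returns the last emitted result. */
    static long[] runAll(List<String> uids) {
        ConstantKeyCounter counter = new ConstantKeyCounter();
        long[] latest = {0, 0};
        for (String uid : uids) {
            latest = counter.process(uid);
        }
        return latest;
    }
}
```

For the input `a, b, a, c`, the last result is 4 rows and 3 distinct uids. Because every record lands on the same key, the counting step effectively runs with a parallelism of one.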
To parallelize the count(1) case, you can key the stream by some field that spreads the records across subtasks, purely in order to partition the stream. Then a set of parallel counting tasks can count mini-batches and send batch count updates (as a changelog stream) downstream to a single instance of a summing-up/reporting task. Alternatively, you could build a solution using non-keyed (operator) state.
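A minimal sketch of the mini-batch idea, again as plain Java rather than Flink operators. `PartialCounter` plays the role of one parallel counting subtask. Records are routed to subtasks by key hash, the way `keyBy` would route them, and a single accumulator plays the summing-up/reporting task. The batch size, the class names, and the flush-at-end step (which stands in for a timer in a streaming job) are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink code) of a two-stage parallel count(1).
final class ParallelRowCount {

    /** One parallel counting task: counts records and emits a delta per full mini-batch. */
    static final class PartialCounter {
        private final int batchSize;
        private long pending = 0;

        PartialCounter(int batchSize) {
            this.batchSize = batchSize;
        }

        /** Returns the batch count to send downstream, or 0 if the batch is not full yet. */
        long onRecord() {
            pending++;
            if (pending < batchSize) {
                return 0;
            }
            long delta = pending;
            pending = 0;
            return delta;
        }

        /** Emits whatever is left over (a timer would trigger this in a streaming job). */
        long flush() {
            long delta = pending;
            pending = 0;
            return delta;
        }
    }

    /** Routes records to `parallelism` counters by key hash, then sums their updates in one place. */
    static long count(List<String> keys, int parallelism, int batchSize) {
        List<PartialCounter> counters = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            counters.add(new PartialCounter(batchSize));
        }

        long total = 0; // state of the single summing/reporting task
        for (String key : keys) {
            int subtask = Math.floorMod(key.hashCode(), parallelism); // keyBy-style partitioning
            total += counters.get(subtask).onRecord();
        }
        for (PartialCounter c : counters) {
            total += c.flush();
        }
        return total;
    }
}
```

Sending one increment per batch instead of one update per record reduces the load on the single downstream task. The trade-off is that the reported total can lag by up to one batch per subtask until the next flush.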
To parallelize the count(distinct uid) case, you could model your solution on the approach implemented in the Table API (see the docs). In a nutshell, it works by transforming
select count(distinct uid) from user
into
SELECT SUM(cnt)
FROM (
    SELECT COUNT(DISTINCT uid) AS cnt
    FROM user
    GROUP BY MOD(HASH_CODE(uid), 1024)
)
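To see why the rewrite gives the same answer, here is the same two-stage plan modeled in plain Java. `String.hashCode` stands in for Flink's `HASH_CODE`, and the class and method names are hypothetical. This is a batch model. In a streaming job, both stages would emit updates as new uids arrive.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model (not Flink code) of the split distinct aggregation.
final class SplitDistinctCount {
    static final int BUCKETS = 1024; // matches MOD(HASH_CODE(uid), 1024)

    static long countDistinct(List<String> uids) {
        // Stage 1: GROUP BY MOD(hash, 1024) -> distinct uids per bucket.
        // Each bucket could be handled by a different parallel task.
        Map<Integer, Set<String>> perBucket = new HashMap<>();
        for (String uid : uids) {
            int bucket = Math.floorMod(uid.hashCode(), BUCKETS);
            perBucket.computeIfAbsent(bucket, b -> new HashSet<>()).add(uid);
        }
        // Stage 2: SUM(cnt). A given uid always hashes to the same bucket,
        // so no uid is counted in two buckets and the sum is exact.
        long sum = 0;
        for (Set<String> bucketUids : perBucket.values()) {
            sum += bucketUids.size();
        }
        return sum;
    }
}
```

Because each uid always falls into the same bucket, the per-bucket distinct counts never overlap, so summing them is exact. The buckets only serve to spread the distinct-tracking state across up to 1024 groups that can be processed in parallel.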