Reputation: 840
How do I include a Window.into or Window.triggering transform prior to GroupByKey in Beam SQL?
I have the following two tables:
1st table
CREATE TABLE table1(
field1 varchar
,field2 varchar
)
2nd table
CREATE TABLE table2(
field1 varchar
,field3 varchar
)
And I am writing the result into a 3rd table:
CREATE TABLE table3(
field1 varchar
,field3 varchar
)
The first two tables read data from a Kafka stream, and I am doing a join on these tables and inserting the result into the third table using the following query. The first two tables are unbounded.
INSERT INTO table3
(field1,
field3)
SELECT a.field1,
b.field3
FROM table1 a
JOIN table2 b
ON a.field1 = b.field1
I am getting the following error:
Caused by: java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
    at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:173)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:204)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:120)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
    at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:126)
    at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:107)
    at org.apache.beam.sdk.extensions.joinlibrary.Join.innerJoin(Join.java:59)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.standardJoin(BeamJoinRel.java:217)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.buildBeamPipeline(BeamJoinRel.java:161)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel.buildBeamPipeline(BeamProjectRel.java:68)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamAggregationRel.buildBeamPipeline(BeamAggregationRel.java:80)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel.buildBeamPipeline(BeamIOSinkRel.java:64)
    at org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner.compileBeamPipeline(BeamQueryPlanner.java:127)
    at com.dss.tss.v2.client.BeamSqlCli.compilePipeline(BeamSqlCli.java:95)
    at com.dss.test.v2.client.SQLCli.main(SQLCli.java:100)
Upvotes: 0
Views: 2589
Reputation: 2539
This is a current implementation limitation of Beam SQL. You need to define windows and then join the inputs per window.
A couple of examples of how to do joins and windowing in Beam SQL: HOP window and joins. A sketch of the windowed-join approach is shown below.
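To make that concrete, here is a minimal sketch in the Java SDK of windowing both inputs before handing them to the SQL join. It assumes a Beam release that provides SqlTransform; the variables table1Rows and table2Rows are hypothetical unbounded PCollection<Row> inputs read from Kafka, and the one-minute window size is an arbitrary choice:
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

// Window both unbounded inputs so the join's GroupByKey no longer runs
// in the GlobalWindow without a trigger.
PCollection<Row> windowedTable1 = table1Rows.apply("WindowTable1",
    Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))));
PCollection<Row> windowedTable2 = table2Rows.apply("WindowTable2",
    Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))));

// Register the windowed inputs under the table names used in the query;
// the join is then evaluated independently within each window.
PCollection<Row> result = PCollectionTuple
    .of(new TupleTag<>("table1"), windowedTable1)
    .and(new TupleTag<>("table2"), windowedTable2)
    .apply(SqlTransform.query(
        "SELECT a.field1, b.field3 "
            + "FROM table1 a JOIN table2 b ON a.field1 = b.field1"));
Note that both inputs have to end up with compatible windows, otherwise Beam will reject the join.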
Background
The problem is caused by the fact that it's hard to define a join operation for unbounded data streams in general; it is not limited to Beam SQL.
Imagine, for example, a data processing system that receives inputs from two sources and then has to match records between them. From a high-level perspective, such a system has to keep all the data it has seen so far, and then for each new record it has to go over all records in the other input source to see if there's a match. It works fine when you have finite and small data sources. In a simple case you could just load everything into memory, match the data from the sources, and produce the output.
With streaming data you cannot keep caching it forever. What if the data never stops coming? And it is unclear when you should emit the results. What if you have an outer join operation? When do you decide that you don't have a matching record in the other input?
For example, see the explanation for unbounded PCollections in the GroupByKey section of the Beam guide. Joins in Beam are usually implemented on top of it using CoGroupByKey (Beam SQL joins as well), as sketched below.
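For illustration, a minimal sketch of a join skeleton built directly on CoGroupByKey, mirroring the KeyedPCollectionTuple and CoGroupByKey frames in the stack trace above; the keyed inputs table1ByKey and table2ByKey (field1 mapped to field2 and field3 respectively, both PCollection<KV<String, String>>) are assumptions:
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

// Tags identify each input's values inside the grouped result.
final TupleTag<String> field2Tag = new TupleTag<>();
final TupleTag<String> field3Tag = new TupleTag<>();

// Group both inputs by key (field1). This is the step where the
// "GroupByKey cannot be applied ..." check fires when the inputs are
// unbounded and still in the GlobalWindow without a trigger.
PCollection<KV<String, CoGbkResult>> grouped = KeyedPCollectionTuple
    .of(field2Tag, table1ByKey)
    .and(field3Tag, table2ByKey)
    .apply(CoGroupByKey.create());
// An inner join then emits one output per (field2, field3) pair
// found under the same key within a window.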
All of these questions can probably be answered for a specific pipeline, but it's hard to solve them in the general case. The current approach in the Beam SDK and Beam SQL is to delegate it to the user to solve for the concrete business case. Beam allows users to decide what data to aggregate together into a window, how long to wait for late data, and when to emit the results. There are also things like state cells and timers for more granular control. This allows a programmer writing a pipeline to explicitly define the behavior and work around these problems somewhat, with (a lot of) extra complexity.
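As an illustration of those knobs (a sketch, not a prescription), here is how the Java SDK windowing API exposes them, assuming a hypothetical unbounded PCollection<Row> named events; all durations are arbitrary:
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

PCollection<Row> windowed = events.apply(
    // What data to aggregate together: five-minute fixed windows.
    Window.<Row>into(FixedWindows.of(Duration.standardMinutes(5)))
        // When to emit: at the watermark, plus speculative early results
        // and updates for late arrivals.
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1)))
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
        // How long to wait for late data after the watermark passes.
        .withAllowedLateness(Duration.standardMinutes(10))
        // How successive firings relate: accumulate panes instead of discarding.
        .accumulatingFiredPanes());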
Beam SQL is implemented on top of regular Beam SDK concepts and is bound by the same limitations, but it also has more limitations of its own. For example, you don't have a SQL syntax to define triggers, state, or custom windows. Nor can you write a custom ParDo that could keep a state in an external service.
Upvotes: 5