Reputation: 1094
Is it possible to use nested queries in Beam SQL? I tried to run one, but I am getting errors.
The query I am running is:
PCollection<BeamRecord> Query_Output = Query.apply(
BeamSql.queryMulti("Select Orders.OrderID From Orders Where Orders.CustomerID IN (Select Customers.CustomerID From Customers WHERE Customers.CustomerID = 2)"));
And the error that I'm getting is:
org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner validateAndConvert
INFO: SQL:
SELECT `Orders`.`OrderID`
FROM `Orders` AS `Orders`
WHERE `Orders`.`CustomerID` IN (SELECT `Customers`.`CustomerID`
FROM `Customers` AS `Customers`
WHERE `Customers`.`CustomerID` = 2)
Jan 19, 2018 11:56:36 AM org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner convertToBeamRel
INFO: SQLPlan>
LogicalProject(OrderID=[$0])
LogicalJoin(condition=[=($1, $3)], joinType=[inner])
LogicalTableScan(table=[[Orders]])
LogicalAggregate(group=[{0}])
LogicalProject(CustomerID=[$0])
LogicalFilter(condition=[=($0, 2)])
LogicalTableScan(table=[[Customers]])
Exception in thread "main" java.lang.IllegalStateException: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: DefaultTrigger, Repeatedly.forever(AfterWatermark.pastEndOfWindow())
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:165)
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:116)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:533)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:465)
at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:160)
at com.bitwise.cloud.ExampleOfJoins.main(ExampleOfJoins.java:91)
Caused by: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: DefaultTrigger, Repeatedly.forever(AfterWatermark.pastEndOfWindow())
at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:123)
at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:101)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:533)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:465)
at org.apache.beam.sdk.values.PCollectionList.apply(PCollectionList.java:182)
at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:124)
at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:533)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:465)
at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:107)
at org.apache.beam.sdk.extensions.joinlibrary.Join.innerJoin(Join.java:59)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.standardJoin(BeamJoinRel.java:217)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.buildBeamPipeline(BeamJoinRel.java:161)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel.buildBeamPipeline(BeamProjectRel.java:68)
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:163)
... 5 more
Can somebody please help me with this?
Thank You.
Upvotes: 0
Views: 732
Reputation: 2539
Can you try setting a Repeatedly.forever(AfterWatermark.pastEndOfWindow()) trigger on your input PCollection? (See the Beam Programming Guide.)
What happens:
From the query plan you can see that it contains a join (LogicalJoin) and an aggregate (LogicalAggregate).
The current Beam SQL implementation requires that both inputs to LogicalJoin have the same windowing strategy (including triggers).
The aggregate likely corresponds to the IN operator in your query. For aggregates, Beam SQL assumes a default windowing strategy and overrides whatever the input is configured with.
So the second part of the query ends up with a different windowing strategy from the one your input already has, and the join then fails validation. That is consistent with the error message, which lists two different triggers: DefaultTrigger and Repeatedly.forever(AfterWatermark.pastEndOfWindow()).
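To make the shape of the plan easier to follow, here is a minimal plain-Java sketch of how an IN subquery can be evaluated as a de-duplicating aggregate followed by an inner join, which is the structure the logged plan shows. It does not use Beam or Calcite, and the class name, method name, and sample rows are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustration only: hypothetical names, no Beam or Calcite classes involved.
class InAsJoinSketch {

    // Each order row is {OrderID, CustomerID}, mirroring the columns in the query.
    static List<Integer> ordersForCustomer(List<int[]> orders, List<Integer> customerIds, int wanted) {
        // Subquery side: filter on CustomerID = wanted, then de-duplicate.
        // This plays the role of LogicalFilter + LogicalProject + LogicalAggregate(group=[{0}]).
        Set<Integer> distinctIds = new LinkedHashSet<>();
        for (int id : customerIds) {
            if (id == wanted) {
                distinctIds.add(id);
            }
        }
        // Join side: keep orders whose CustomerID matches a de-duplicated ID,
        // then project OrderID. This plays the role of LogicalJoin + LogicalProject.
        List<Integer> result = new ArrayList<>();
        for (int[] order : orders) {
            if (distinctIds.contains(order[1])) {
                result.add(order[0]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<int[]> orders = Arrays.asList(
                new int[]{100, 1}, new int[]{101, 2}, new int[]{102, 2});
        // Customer 2 appears twice on purpose; de-duplication keeps each order once.
        List<Integer> customers = Arrays.asList(1, 2, 2, 3);
        System.out.println(ordersForCustomer(orders, customers, 2)); // prints [101, 102]
    }
}
```

The de-duplication step is what shows up as LogicalAggregate in the plan. In Beam SQL that step is where the windowing strategy gets replaced, so the two join inputs no longer match.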
There is work under way to change Beam SQL so that it no longer overrides the windowing configuration, which should avoid these issues.
I filed your issue as a bug: https://issues.apache.org/jira/browse/BEAM-3547
Upvotes: 1