Reputation: 1025
I am working on an apache beam pipeline to run a SQL aggregation function.Reference: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java#L159. The example here works fine.However, when I replace the source with an actual unbounded source and do an aggregation, I see no results. Steps in my pipeline:
A normal SQL statement executes and outputs the results. However, when I use a group by in the SQL, there is no output.
SELECT
o1.detectedCount,
o1.sensor se,
o2.sensor sa
FROM SENSOR o1
LEFT JOIN AREA o2
on o1.sensor = o2.sensor
The results are continous and like shown below.
2019-07-19 20:43:11 INFO ConsoleSink:27 - {
"detectedCount":0,
"se":"3a002f000647363432323230",
"sa":"3a002f000647363432323230"
}
2019-07-19 20:43:11 INFO ConsoleSink:27 - {
"detectedCount":1,
"se":"3a002f000647363432323230",
"sa":"3a002f000647363432323230"
}
2019-07-19 20:43:11 INFO ConsoleSink:27 - {
"detectedCount":0,
"se":"3a002f000647363432323230",
"sa":"3a002f000647363432323230"
}
The results don't show up at all when I change the sql to
SELECT
COUNT(o1.detectedCount) o2.sensor sa
FROM SENSOR o1
LEFT JOIN AREA o2
on o1.sensor = o2.sensor
GROUP BY o2.sensor
Is there anything I am doing wrong in this implementation.Any pointers would be really helpful.
Upvotes: 2
Views: 556
Reputation: 1
SELECT
COUNT(o1.detectedCount) as number
,o2.sensor
,sa
FROM SENSOR o1
LEFT OUTER JOIN AREA o2
on o1.sensor = o2.sensor
GROUP BY sa,o1.sensor,o2.sensor
Upvotes: 0
Reputation: 737
Some suggestions come up when reading your code:
.apply("windowing", Window.<Row>into(FixedWindows.of(Duration.standardSeconds(2)))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.withLateFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(2))))
.withAllowedLateness(Duration.standardMinutes(10))
.discardingFiredPanes());
Try to remove the join
and check if without it you have output to the window,
Try to add more time to the window. because sometimes it is too short to shuffle the data between the workers. and the joined streams aren't emitted at the same time.
outputWithTimestamp
will output the rows in a different timestamp, and then they can be dropped when you don't allow lateness.
Read the docs for outputWithTimestamp
, this API is a bit risky.
If the input {@link PCollection} elements have timestamps, the output timestamp for each element must not be before the input element's timestamp minus the value of {@link getAllowedTimestampSkew()}. If an output timestamp is before this time, the transform will throw an {@link IllegalArgumentException} when executed. Use {@link withAllowedTimestampSkew(Duration)} to update the allowed skew.
CAUTION: Use of {@link #withAllowedTimestampSkew(Duration)} permits elements to be emitted behind the watermark. These elements are considered late, and if behind the {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream {@link PCollection} may be silently dropped.
Upvotes: 0