Akshata

Reputation: 1025

Apache Beam: SQL aggregation outputs no results for Unbounded/Bounded join

I am working on an Apache Beam pipeline that runs a SQL aggregation. Reference: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java#L159 (the example there works fine). However, when I replace the source with an actual unbounded source and run an aggregation, I see no results. The steps in my pipeline are:

  1. Read bounded data from a source and convert to collection of rows.
  2. Read unbounded json data from a websocket source.
  3. Assign a timestamp to every source stream via a DoFn.
  4. Convert the unbounded json to an unbounded row collection.
  5. Apply a window on the row collection
  6. Apply a SQL statement.
  7. Output the result of the sql.

A plain SQL statement without aggregation, such as the one below, executes and outputs results. However, when I use a GROUP BY in the SQL, there is no output.

SELECT 
  o1.detectedCount,
  o1.sensor se,
  o2.sensor sa
FROM SENSOR o1 
  LEFT JOIN AREA o2 
  on o1.sensor = o2.sensor

The results are continuous, as shown below.

2019-07-19 20:43:11 INFO ConsoleSink:27 - {
                                           "detectedCount":0,
                                           "se":"3a002f000647363432323230",
                                           "sa":"3a002f000647363432323230"
                                          }

2019-07-19 20:43:11 INFO ConsoleSink:27 - {
                                           "detectedCount":1,
                                           "se":"3a002f000647363432323230",
                                           "sa":"3a002f000647363432323230"
                                          }

2019-07-19 20:43:11 INFO ConsoleSink:27 - {
                                           "detectedCount":0,            
                                           "se":"3a002f000647363432323230",
                                           "sa":"3a002f000647363432323230"
                                          }

No results show up at all when I change the SQL to:

SELECT
  COUNT(o1.detectedCount),
  o2.sensor sa
FROM SENSOR o1
  LEFT JOIN AREA o2
  on o1.sensor = o2.sensor
GROUP BY o2.sensor

Is there anything I am doing wrong in this implementation? Any pointers would be really helpful.

Upvotes: 2

Views: 556

Answers (2)

Mike Sliwa

Reputation: 1

SELECT
  COUNT(o1.detectedCount) AS number,
  o2.sensor AS sa
FROM SENSOR o1
  LEFT OUTER JOIN AREA o2
  ON o1.sensor = o2.sensor
GROUP BY o1.sensor, o2.sensor

Upvotes: 0

Brachi

Reputation: 737

Some suggestions after reading your code:

  1. Extend the window configuration to allow lateness and to emit early results:

     .apply("windowing", Window.<Row>into(FixedWindows.of(Duration.standardSeconds(2)))
             .triggering(AfterWatermark.pastEndOfWindow()
                     .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                             .plusDelayOf(Duration.standardSeconds(1)))
                     .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                             .plusDelayOf(Duration.standardSeconds(2))))
             .withAllowedLateness(Duration.standardMinutes(10))
             .discardingFiredPanes());

  2. Try removing the join and check whether the window produces output without it.

  3. Try a longer window duration. A window can be too short for the data to be shuffled between the workers, and the joined streams aren't emitted at the same time.

  4. outputWithTimestamp emits the rows with a different timestamp, and they can then be dropped if you don't allow lateness. Read the docs for outputWithTimestamp; this API is a bit risky:

If the input PCollection elements have timestamps, the output timestamp for each element must not be before the input element's timestamp minus the value of getAllowedTimestampSkew(). If an output timestamp is before this time, the transform will throw an IllegalArgumentException when executed. Use withAllowedTimestampSkew(Duration) to update the allowed skew.

CAUTION: Use of withAllowedTimestampSkew(Duration) permits elements to be emitted behind the watermark. These elements are considered late, and if behind the allowed lateness (Window#withAllowedLateness(Duration)) of a downstream PCollection may be silently dropped.
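
To make the two quoted rules concrete, here is a minimal plain-Java sketch that does not use Beam at all. The class and method names (TimestampRulesSketch, checkOutputTimestamp, windowStart, isDroppedAsLate) are made up for illustration. The drop rule (an element is dropped once the watermark has passed the window's last timestamp plus the allowed lateness) is a simplified assumption about how a runner behaves, not Beam's actual code, and the window start assumes a zero window offset.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model of the timestamp rules quoted above; NOT Beam's implementation.
class TimestampRulesSketch {

    // Skew rule: the output timestamp may not be earlier than
    // (input timestamp - allowed skew); otherwise the call throws.
    static void checkOutputTimestamp(Instant input, Instant output, Duration allowedSkew) {
        Instant earliestAllowed = input.minus(allowedSkew);
        if (output.isBefore(earliestAllowed)) {
            throw new IllegalArgumentException(
                "Output timestamp " + output + " is before " + earliestAllowed);
        }
    }

    // Start of the fixed window that contains ts (assumes a zero window offset).
    static Instant windowStart(Instant ts, Duration size) {
        long millis = ts.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, size.toMillis()));
    }

    // Assumed drop rule: the element is too late once the watermark has passed
    // the last timestamp of its window plus the allowed lateness.
    static boolean isDroppedAsLate(Instant ts, Instant watermark,
                                   Duration size, Duration allowedLateness) {
        Instant maxTimestamp = windowStart(ts, size).plus(size).minusMillis(1);
        return maxTimestamp.plus(allowedLateness).isBefore(watermark);
    }

    public static void main(String[] args) {
        Duration window = Duration.ofSeconds(2);
        Instant element = Instant.ofEpochMilli(4_500);    // falls in window [4000, 6000)
        Instant watermark = Instant.ofEpochMilli(10_000); // already past that window

        // With no allowed lateness the element is dropped; with 10 minutes it is kept.
        System.out.println(isDroppedAsLate(element, watermark, window, Duration.ZERO));
        System.out.println(isDroppedAsLate(element, watermark, window, Duration.ofMinutes(10)));

        // Moving a timestamp back by 1 s passes only if the allowed skew covers it.
        checkOutputTimestamp(Instant.ofEpochMilli(10_000), Instant.ofEpochMilli(9_000),
                Duration.ofSeconds(2));
    }
}
```

In this model, a row whose timestamp is pushed behind the watermark disappears unless the downstream window has enough allowed lateness, which is one way a GROUP BY could end up with nothing to emit.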

Upvotes: 0
