O Bishop

Reputation: 11

Apache Beam GroupByKey Produces No Output

I've seen some similar questions about this issue, which suggest that low throughput from PubSub can cause it; however, I have more than enough data coming through to push things along.

This is a Python streaming pipeline, reading data from PubSub with the ultimate goal of writing records to Redis (Memorystore) to use as a cache.

import apache_beam as beam
from apache_beam.transforms import trigger, window

# pipeline_options, known_args and the DoFns/filters below are defined elsewhere in the script.
with beam.Pipeline(options=pipeline_options) as p:
    windowed_history_events = (
        p
        | "Read input from PubSub" >> beam.io.ReadFromPubSub(subscription=known_args.subscription)
        | "Parse string message to dict" >> beam.ParDo(ParseMessageToDict())
        | "Filter non-page views" >> beam.Filter(is_page_view)
        | "Create timestamp KV" >> beam.ParDo(CreateTimestampKV())
        | "Window for writes" >> beam.WindowInto(
            window.GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterCount(10)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING,
        )
        | "Get user and content ID" >> beam.ParDo(ParseMessageToKV())
        | "Group by user ID" >> beam.GroupByKey()
        | "Create timestamp KV2" >> beam.ParDo(TmpDOFN())
        | "Push content history to Memorystore" >> beam.ParDo(
            ConnectToRedis(known_args.host, known_args.port))
    )

The TmpDOFN() DoFn after the GroupByKey step is just there as a debug step right now; it only prints out messages to make sure something is going through it:

class TmpDOFN(beam.DoFn):
    def process(self, message):
        print(message)
        yield message

However, this never gets called: nothing is printed, and PyCharm's breakpoint is never hit. As I understand it, the window and trigger I have set up should simply output every 10 messages, which are then grouped and passed to the next step.
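One detail worth checking against that expectation: as I understand Beam's model, the count in `AfterCount(10)` is tracked per key and window at the `GroupByKey`, not across the whole stream, so "10 messages" means at least 10 messages for the same user ID. A plain-Python model of `Repeatedly(AfterCount(n))` with DISCARDING accumulation in a single global window makes the difference visible (this is not Beam code; `simulate_repeated_after_count` is a hypothetical helper):

```python
from collections import defaultdict


def simulate_repeated_after_count(events, n):
    """Model of Repeatedly(AfterCount(n)) with DISCARDING accumulation in one
    (global) window. Assumption: the count is kept separately per key, as Beam
    keeps trigger state per key and window. Returns the emitted (key, values) panes."""
    buffers = defaultdict(list)  # key -> elements buffered in the current pane
    panes = []
    for key, value in events:
        buffers[key].append(value)
        if len(buffers[key]) >= n:
            panes.append((key, buffers[key]))  # trigger fires for this key only
            buffers[key] = []  # DISCARDING: the next pane starts empty
    return panes


# 30 events spread over 5 users: no single user reaches 10 elements.
events = [(f"user{i % 5}", i) for i in range(30)]
print(simulate_repeated_after_count(events, 10))  # → []

# 30 events for one user: the trigger fires three times.
one_user = [("user0", i) for i in range(30)]
print(len(simulate_repeated_after_count(one_user, 10)))  # → 3
```

If most users produce fewer than 10 page views, a per-key count like this would leave their panes buffered indefinitely in the global window.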

If I remove the GroupByKey step, messages are printed out as expected and the pipeline continues.

I tried this using FixedWindows previously and ran into the same problem.

Any ideas?

Thanks

Upvotes: 1

Views: 1503

Answers (1)

Luke Andrews

Reputation: 11

I experienced a similar issue, with GroupByKey producing no output and Latest.PerKey throwing a runtime error, when I windowed the data and then tried to aggregate the output.

My debugging approach was to do the GroupByKey in the global window before applying any windowing, which worked just fine. That pointed to a windowing problem: GroupByKey worked in the global window, but not in my fixed or sliding windows.
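The global window never closes, whereas every fixed or sliding window has a finite end, and that end is what the watermark and allowed lateness are measured against. A plain-Python sketch of how a timestamp maps to windows, modelled on my understanding of the assignment rule behind Beam's `FixedWindows` and `SlidingWindows` (timestamps in seconds; `fixed_window` and `sliding_windows` are hypothetical helpers, not Beam APIs):

```python
def fixed_window(ts, size, offset=0):
    """Return the [start, end) fixed window that contains timestamp ts."""
    start = ts - (ts - offset) % size
    return (start, start + size)


def sliding_windows(ts, size, period, offset=0):
    """Return every [start, end) sliding window of length `size`, starting every
    `period` seconds, that contains timestamp ts (latest window first)."""
    last_start = ts - (ts - offset) % period
    return [(s, s + size) for s in range(last_start, ts - size, -period)]


print(fixed_window(125, 60))         # → (120, 180)
print(sliding_windows(125, 60, 30))  # → [(120, 180), (90, 150)]
```

Each returned window has a concrete end time, so once the watermark has moved well past it, an element assigned to that window can no longer be aggregated on time.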

  • Issue: a rookie error on my part, with event times beyond the allowed late arrival

    • I was building my pipeline against historical data that was several weeks old, using its event time. The windows discarded those elements because their event times were so far behind the watermark, beyond the allowed lateness, hence no results / the error.
  • Workaround: temporarily use processing time (I could also have allowed for later arrival)

    • For the purpose of building my pipeline on historical data, I switched to processing time for the event timestamps, and the aggregation then worked just fine. In the real world I don't expect this in a sunny-day scenario, but it was a good lesson in catering for very late-arriving records.
    • I could also have just allowed for this longer late arrival, although for a streaming pipeline with a delay of a few weeks, I'd rather deal with the error in Python with a try/except.
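The lateness rule and both options above can be modelled in plain Python. This is a simplified sketch, assuming a fixed window expires once the watermark passes the window's end plus the allowed lateness (Beam's exact rule uses the window's maximum timestamp). `is_droppably_late`, `restamp_with_processing_time` and the `(key, value, timestamp)` record shape are hypothetical. In an actual Beam Python pipeline, the corresponding tools would be `beam.window.TimestampedValue` for re-stamping elements and the `allowed_lateness` argument of `beam.WindowInto`.

```python
import time


def is_droppably_late(event_ts, window_size, watermark, allowed_lateness=0):
    """Simplified model: True if the element's fixed window had already expired,
    i.e. the watermark is past window end + allowed lateness (all in seconds)."""
    window_end = event_ts - event_ts % window_size + window_size
    return watermark > window_end + allowed_lateness


def restamp_with_processing_time(record, now=None):
    """Workaround from the answer: replace the old event time with the current
    processing time. `record` is a hypothetical (key, value, timestamp) tuple."""
    key, value, _old_ts = record
    return (key, value, time.time() if now is None else now)


now = 1_700_000_000  # fixed "current time" so the example is deterministic
three_weeks = 21 * 24 * 3600
old = ("user1", "page-a", now - three_weeks)

print(is_droppably_late(old[2], 60, watermark=now))  # → True (dropped)
print(is_droppably_late(old[2], 60, watermark=now, allowed_lateness=three_weeks))  # → False
fresh = restamp_with_processing_time(old, now=now)
print(is_droppably_late(fresh[2], 60, watermark=now))  # → False
```

Re-stamping hides the real event time, which is fine for building the pipeline on historical data but changes which window each record lands in; raising the allowed lateness keeps event-time semantics at the cost of holding window state for longer.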

Upvotes: 1
