Reputation: 11
I've seen some similar questions around this issue which suggest low throughput from PubSub can cause issues; however I have more than enough data coming through to push things along...
This is a Python streaming pipeline, reading data from PubSub with the ultimate goal of writing records to Redis (Memorystore) to use as a cache.
with beam.Pipeline(options=pipeline_options) as p:
    windowed_history_events = (p
        | "Read input from PubSub" >> beam.io.ReadFromPubSub(subscription=known_args.subscription)
        | "Parse string message to dict" >> beam.ParDo(ParseMessageToDict())
        | "Filter non-page views" >> beam.Filter(is_page_view)
        | "Create timestamp KV" >> beam.ParDo(CreateTimestampKV())
        | "Window for writes" >> beam.WindowInto(
            window.GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterCount(10)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING
        )
        | "Get user and content ID" >> beam.ParDo(ParseMessageToKV())
        | "Group by user ID" >> beam.GroupByKey()
        | "Create timestamp KV2" >> beam.ParDo(TmpDOFN())
        | "Push content history to Memorystore" >> beam.ParDo(
            ConnectToRedis(known_args.host, known_args.port))
    )
The TmpDOFN step after the GroupByKey is only there for debugging right now. It prints each message to confirm that something is going through it:
class TmpDOFN(beam.DoFn):
    def process(self, message):
        print(message)
        yield message
However, this never gets called and nothing is printed (and my PyCharm breakpoint is never hit). As I understand it, the window and trigger I have set up should emit output every 10 messages, which are then grouped and passed to the next step.
If I remove the GroupByKey step, messages are printed as expected and the pipeline continues.
I tried this with FixedWindows previously and ran into the same problem.
Any ideas?
Thanks
Upvotes: 1
Views: 1503
Reputation: 11
I hit a similar issue: GroupByKey produced no output, and Latest.PerKey gave a runtime error, when I windowed the data and then tried to aggregate it.
To debug, I ran the GroupByKey in the global window, before applying any windowing, and it worked fine. That pointed to a windowing problem, since GroupByKey worked in the global window but not in my fixed or sliding windows.
Issue: rookie error on my part. The event times were older than the allowed lateness, so the elements counted as late data.
Workaround: temporarily use processing time for the timestamps (allowing more lateness would also have worked).
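To illustrate the issue above, here is a rough sketch in plain Python of how late data can vanish from fixed windows. It is a simplified model, not Beam's actual implementation: the names `FixedWindow`, `assign_fixed_window` and `is_dropped` are made up for this example, and the drop rule (watermark at or past window end plus allowed lateness) is an approximation of what a runner does. In a real pipeline the corresponding knob is, as far as I know, the `allowed_lateness` argument of `beam.WindowInto`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FixedWindow:
    start: float  # inclusive, in seconds
    end: float    # exclusive, in seconds


def assign_fixed_window(event_time, size):
    """Return the fixed window of length `size` that contains `event_time`."""
    start = event_time - (event_time % size)
    return FixedWindow(start, start + size)


def is_dropped(event_time, watermark, size, allowed_lateness=0.0):
    """Simplified rule: an element is dropped once the watermark has
    passed the end of its window plus the allowed lateness."""
    win = assign_fixed_window(event_time, size)
    return watermark >= win.end + allowed_lateness


if __name__ == "__main__":
    watermark = 400  # hypothetical current watermark, in seconds
    old_event = 130  # hypothetical event time, well behind the watermark

    # Event-time stamp, no lateness allowed: the window [120, 180) is already closed.
    print(is_dropped(old_event, watermark, size=60))
    # Same element, but with 300 s of allowed lateness.
    print(is_dropped(old_event, watermark, size=60, allowed_lateness=300))
    # Workaround: stamp the element with processing time (here, the watermark itself).
    print(is_dropped(watermark, watermark, size=60))
```

With event-time stamps and no allowed lateness, the old element never reaches the GroupByKey output. Either allowing enough lateness or restamping with processing time keeps it in an open window.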
Upvotes: 1