Reputation: 31
I'm working on a Python Apache Beam job with session windowing on a bounded dataset. It works for small datasets, but the job dies when I increase the size of input data.
The job ID is 2019-06-10_07_28_32-2942508228086251217.
elements = (p | 'IngestData' >> beam.io.Read(big_query_source))
elements | 'AddEventTimestamp' >> beam.ParDo(AddTimestampDoFn()) \
| 'SessionWindow' >> beam.WindowInto(window.Sessions(10 * 60)) \
| 'CreateTuple' >> beam.Map(lambda row: (row['id'], {'attribute1': row['attribute1'], 'date': row['date']})) \
| 'GroupById1' >> beam.GroupByKey() \
| 'AggregateSessions' >> beam.ParDo(AggregateTransactions()) \
| 'MergeWindows' >> beam.WindowInto(window.GlobalWindows()) \
| 'GroupById2' >> beam.GroupByKey() \
| 'MapSessionsToLists' >> beam.Map(lambda x: (x[0], [y for y in x[1]])) \
| 'BiggestSession' >> beam.ParDo(MaximumSession()) \
| "PrepForWrite" >> beam.Map(lambda x: x[1].update({"id": x[0]}) or x[1]) \
| 'WriteResult' >> WriteToText(known_args.output)
With the DoFn classes being
class AddTimestampDoFn(beam.DoFn):
    def process(self, element):
        date = datetime.datetime.strptime(element['date'][:-4], '%Y-%m-%d %H:%M:%S.%f')
        unix_timestamp = float(date.strftime('%s'))
        yield beam.window.TimestampedValue(element, unix_timestamp)
class AggregateTransactions(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        session_count = len(element[1])
        attributes = list(map(lambda row: row['attribute1'], element[1]))
        std = np.std(attributes)
        return [(element[0], {'session_count': session_count,
                              'session_std': std,
                              'window_start': window.start
                                                    .to_utc_datetime()
                                                    .strftime('%d-%b-%Y %H:%M:%S')})]
class MaximumSession(beam.DoFn):
    def process(self, element):
        sorted_counts = sorted(element[1], key=lambda x: x['session_count'], reverse=True)
        return [(element[0], {'session_count': sorted_counts[0]['session_count'],
                              'session_std': sorted_counts[0]['session_std'],
                              'window_start_time': sorted_counts[0]['window_start']})]
The job fails and gives me this error: The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers:
The logs for that particular worker in Stackdriver aren't suggestive of anything. I'm just getting a combination of these entries:
processing lull for over 431.44 seconds in state process-msecs in step s5
Refusing to split <dataflow_worker.shuffle.GroupedShuffleRangeTracker object at 0x7f82e970cbd0> at '\n\xaaG\t\x00\x01': proposed split position is out of range
Retry with exponential backoff: waiting for 4.69305060273 seconds before retrying lease_work because we caught exception: SSLError: ('The read operation timed out',)
The rest of the entries are informational.
The latest memory usage was 43413 MB for that particular worker. Since I'm using n1-highmem-32 machines, I don't think memory is the issue here.
On the client side (Cloud Shell, where I'm triggering the job), I just got a lot of
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)
before the job crashed.
Any ideas?
Thanks
Upvotes: 3
Views: 529
Reputation: 58
By default, Dataflow retries a failed work item 4 times when running in batch mode and indefinitely when running in streaming mode, which is why your job reports a work item failing 4 times before dying.
Please create Stackdriver dashboards for the Compute Engine machines used by the pipeline so you can analyze how much memory and CPU they consume and how many IO operations are happening. Raise the machine configuration of the pipeline only after a careful analysis of those factors.
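In addition to the machine-level dashboards, you could instrument the DoFns themselves with Beam metrics so custom counters and distributions show up per step in the Dataflow monitoring UI. A minimal sketch; the metric names and the simplified body below are only illustrative of your AggregateTransactions, not taken from your pipeline:
import apache_beam as beam
from apache_beam.metrics import Metrics

class AggregateTransactions(beam.DoFn):
    def __init__(self):
        # Custom metrics are reported per step to the Dataflow monitoring UI.
        self.sessions_seen = Metrics.counter(self.__class__, 'sessions_seen')
        self.session_size = Metrics.distribution(self.__class__, 'session_size')

    def process(self, element, window=beam.DoFn.WindowParam):
        rows = list(element[1])
        self.sessions_seen.inc()
        self.session_size.update(len(rows))
        # ... same aggregation as in the question ...
        yield (element[0], {'session_count': len(rows)})
Large values of the session_size distribution would also tell you whether a few huge keys are behind the processing lulls.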
Please make sure all the transformations work correctly on the data you are providing, and apply exception handling as well.
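As one way to apply that exception handling, a dead-letter pattern around the aggregation step keeps a single bad element from failing the whole bundle (and therefore the job after 4 retries). A minimal sketch, assuming the SafeAggregateTransactions name, the 'failed' tag and the wiring below are hypothetical rather than part of your pipeline or the Dataflow API:
import apache_beam as beam
import numpy as np
from apache_beam import pvalue

class SafeAggregateTransactions(beam.DoFn):
    FAILED_TAG = 'failed'

    def process(self, element, window=beam.DoFn.WindowParam):
        try:
            key, rows = element
            rows = list(rows)
            attributes = [row['attribute1'] for row in rows]
            yield (key, {'session_count': len(rows),
                         'session_std': float(np.std(attributes)),
                         'window_start': window.start.to_utc_datetime()
                                               .strftime('%d-%b-%Y %H:%M:%S')})
        except Exception as err:
            # Route the bad element to a side output instead of failing the
            # bundle, which would otherwise be retried 4 times and fail the job.
            yield pvalue.TaggedOutput(self.FAILED_TAG, (element, repr(err)))

# Hypothetical wiring:
# outputs = grouped | beam.ParDo(SafeAggregateTransactions()).with_outputs(
#     SafeAggregateTransactions.FAILED_TAG, main='sessions')
# outputs.sessions carries the good results; outputs.failed can be written
# out (e.g. with WriteToText) for later inspection.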
Upvotes: 1