Reputation: 41
I am using a slightly adjusted version of the following example snippet to write PubSub messages to GCS:
import apache_beam as beam


class WriteToGCS(beam.DoFn):
    def __init__(self, output_path, prefix):
        self.output_path = output_path
        self.prefix = prefix

    def process(self, key_value, window=beam.DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""
        start_date = window.start.to_utc_datetime().date().isoformat()
        start = window.start.to_utc_datetime().isoformat()
        end = window.end.to_utc_datetime().isoformat()
        shard_id, batch = key_value
        filename = f'{self.output_path}/{start_date}/{start}-{end}/{self.prefix}-{start}-{end}-{shard_id:03d}'
        # One GCS object is opened per incoming (shard_id, batch) element.
        with beam.io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body in batch:
                f.write(f"{message_body},".encode("utf-8"))
However, it's terribly slow. This is what it looks like in the graph. Is there a way to speed up this step? The subscription receives 500 elements per second, so 3-10 elements per second is not keeping up.
The pipeline looks like this:
import random

import apache_beam as beam
from apache_beam.transforms import window


class JsonWriter(beam.PTransform):
    def __init__(self, window_size, path, prefix, num_shards=20):
        self.window_size = int(window_size)
        self.path = path
        self.prefix = prefix
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            | "Group into fixed windows" >> beam.WindowInto(window.FixedWindows(self.window_size, 0))
            | "Decode windowed elements" >> beam.ParDo(Decode())
            | "Group into batches" >> beam.BatchElements()
            | "Add key" >> beam.WithKeys(lambda _: random.randint(0, int(self.num_shards) - 1))
            | "Write to GCS" >> beam.ParDo(WriteToGCS(self.path, self.prefix))
        )
Upvotes: 1
Views: 334
Reputation: 41
Solved this with these steps:
| "Add Path" >> beam.ParDo(AddFilename(self.path, self.prefix, self.num_shards))
| "Group Paths" >> beam.GroupByKey()
| "Join and encode" >> beam.ParDo(ToOneString())
| "Write to GCS" >> beam.ParDo(WriteToGCS())
First I add the path to each element, then I group by path (to prevent simultaneous writes to the same file/shard). Then I concatenate the JSON strings of each group (path), newline-delimited, and write the whole string to the file. This reduces the number of calls to GCS tremendously, since I don't write every element to a file one by one, but instead join them first and then write once. Hope it helps someone.
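The bodies of AddFilename and ToOneString are not shown above, so here is a rough sketch of what the per-element logic might look like, written as plain Python functions so it can be run without Beam. The function names, the group_by_key stand-in for beam.GroupByKey, and the gs://my-bucket/out path are all assumptions made for illustration, not the code actually used in the pipeline.

```python
import random
from collections import defaultdict
from datetime import datetime


def add_filename(message, window_start, window_end, output_path, prefix, num_shards):
    """Return (path, message); the path encodes the window bounds and a random shard."""
    start_date = window_start.date().isoformat()
    start = window_start.isoformat()
    end = window_end.isoformat()
    shard_id = random.randint(0, num_shards - 1)
    path = (
        f"{output_path}/{start_date}/{start}-{end}/"
        f"{prefix}-{start}-{end}-{shard_id:03d}"
    )
    return path, message


def to_one_string(path, messages):
    """Join all messages of one path into a single newline-delimited UTF-8 payload."""
    return path, "\n".join(messages).encode("utf-8")


def group_by_key(pairs):
    """Local stand-in for beam.GroupByKey, used only to demonstrate the logic."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return list(grouped.items())


# Hypothetical window bounds and bucket path, for illustration only.
start = datetime(2021, 1, 1, 0, 0)
end = datetime(2021, 1, 1, 0, 1)
messages = ['{"id": 1}', '{"id": 2}', '{"id": 3}']

pairs = [
    add_filename(m, start, end, "gs://my-bucket/out", "events", num_shards=1)
    for m in messages
]
payloads = [to_one_string(path, msgs) for path, msgs in group_by_key(pairs)]

# Each payload would then be written with a single open/write per path.
for path, payload in payloads:
    print(path, len(payload))
```

In the actual pipeline, add_filename would presumably live in a DoFn that reads the window via beam.DoFn.WindowParam, to_one_string would run after beam.GroupByKey, and the final DoFn would open each path once with GcsIO().open(...) and write the joined payload, instead of writing message by message.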
Upvotes: 1