Reputation: 53
I tag input elements based on one of their fields (date).
import apache_beam as beam
from apache_beam import pvalue

class TagElementsWithDate(beam.DoFn):
    def process(self, element):
        # Tag each element with its year and month, e.g. '2019-03-15' -> '201903'.
        dt = element['date'].replace('-', '')[:6]
        yield pvalue.TaggedOutput(dt, element)

input_data = p | 'Read Input' >> beam.io.Read(beam.io.BigQuerySource(query='select id, date from `project.dataset.tablename`', use_standard_sql=True))
tagged_data = input_data | 'tag data' >> beam.ParDo(TagElementsWithDate()).with_outputs()
tagged_data is a DoOutputsTuple. I want to iterate over it and write the data for each tag to a separate file.
Upvotes: 4
Views: 1924
Reputation: 31
You need to write your own DoFn, something like this:
import apache_beam as beam
from apache_beam.io.textio import _TextSink  # private class; may change between Beam versions

class WriteEachKeyToText(beam.DoFn):
    def __init__(self, file_path_prefix: str):
        super().__init__()
        self.file_path_prefix = file_path_prefix

    def process(self, kv):
        # Each input is a (key, values) pair.
        key, elements = kv
        sink = _TextSink(self.file_path_prefix, file_name_suffix=f"{key}.json")
        writer = sink.open_writer("prefix", self.file_path_prefix)
        for e in elements:  # values
            writer.write(e)
        writer.close()  # close the file handle once all values are written
Then you can use it like this:
output_path = "/some/path/"
tagged_data | beam.ParDo(WriteEachKeyToText(output_path))
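Note that the DoFn above unpacks each input as a (key, values) pair, so it presumably expects elements that have already been keyed by month and grouped. If you want to check the per-key logic outside a pipeline first, here is a plain-Python sketch with no Beam involved. The helper names (month_key, group_by_month, write_each_key) and the sample rows are made up for illustration, and it assumes date is a 'YYYY-MM-DD' string as in the question.

```python
import json
import os
import tempfile
from collections import defaultdict


def month_key(element):
    # Same key derivation as in the question: '2019-03-15' -> '201903'.
    return element['date'].replace('-', '')[:6]


def group_by_month(elements):
    # Local stand-in for grouping by key: collect elements under their YYYYMM key.
    groups = defaultdict(list)
    for element in elements:
        groups[month_key(element)].append(element)
    return dict(groups)


def write_each_key(groups, output_dir):
    # Write one JSON-lines file per key and return the paths written.
    paths = {}
    for key, elements in groups.items():
        path = os.path.join(output_dir, f"{key}.json")
        with open(path, "w") as f:
            for element in elements:
                f.write(json.dumps(element) + "\n")
        paths[key] = path
    return paths


# Hypothetical sample rows shaped like the query result (id, date).
rows = [
    {"id": 1, "date": "2019-03-15"},
    {"id": 2, "date": "2019-03-20"},
    {"id": 3, "date": "2019-04-01"},
]
out_dir = tempfile.mkdtemp()
written = write_each_key(group_by_month(rows), out_dir)
print(sorted(written))  # ['201903', '201904']
```

Each key ends up in its own file (201903.json, 201904.json), which is the same shape of output the DoFn is aiming for.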
Upvotes: 3