Venu

Reputation: 53

Apache Beam: write each tagged output to a separate file

I tag input elements based on one of the input fields (date).

import apache_beam as beam
from apache_beam import pvalue

class TagElementsWithDate(beam.DoFn):
    def process(self, element):
        # Tag each element with its YYYYMM month, e.g. '2021-03-15' -> '202103'.
        dt = element['date'].replace('-', '')[:6]
        yield pvalue.TaggedOutput(dt, element)
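For illustration, the key derivation above reduces a date to its YYYYMM month; the sample element values here are hypothetical:

```python
# Hypothetical sample element; the real ones come from BigQuery.
element = {'id': 1, 'date': '2021-03-15'}

# Same derivation as in TagElementsWithDate.process:
# strip hyphens, keep the first six characters (YYYYMM).
tag = element['date'].replace('-', '')[:6]
print(tag)  # → 202103
```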

input_data = p | 'Read Input' >> beam.io.Read(beam.io.BigQuerySource(query='select id, date from `project.dataset.tablename`', use_standard_sql=True))

tagged_data = input_data | 'tag data' >> beam.ParDo(TagElementsWithDate()).with_outputs()

tagged_data is a DoOutputsTuple. I'm looking to iterate over it and write each tagged output to a separate file.

Upvotes: 4

Views: 1924

Answers (1)

Étienne Tremblay

Reputation: 31

You need to write your own DoFn. Something like this:

import apache_beam as beam
# Note: _TextSink is an internal class of apache_beam.io.textio,
# so it may change between Beam versions.
from apache_beam.io.textio import _TextSink


class WriteEachKeyToText(beam.DoFn):
    def __init__(self, file_path_prefix: str):
        super().__init__()
        self.file_path_prefix = file_path_prefix

    def process(self, kv):
        # kv is a (key, iterable-of-elements) pair, e.g. from a GroupByKey.
        key = kv[0]
        elements = kv[1]
        sink = _TextSink(self.file_path_prefix, file_name_suffix=f"{key}.json")

        writer = sink.open_writer("prefix", self.file_path_prefix)
        for e in elements:  # values
            writer.write(e)
        writer.close()  # finalize the file for this key

Then you can use it like this:

output_path = "/some/path/"
tagged_data | beam.ParDo(WriteEachKeyToText(output_path))
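Note that the DoFn consumes (key, elements) pairs, so the elements would typically be keyed and grouped (e.g. with beam.GroupByKey) before this step. Outside Beam, the per-key write it performs boils down to something like this stand-in sketch (the helper name, file layout, and sample data are all hypothetical):

```python
import json
import os
import tempfile

def write_each_key(grouped, file_path_prefix):
    """Write each key's elements to <prefix><key>.json, one JSON line per element."""
    paths = []
    for key, elements in grouped.items():
        path = f"{file_path_prefix}{key}.json"
        with open(path, "w") as f:
            for e in elements:
                f.write(json.dumps(e) + "\n")
        paths.append(path)
    return paths

# Hypothetical grouped input, as a GroupByKey on the YYYYMM tag would produce.
grouped = {
    "202103": [{"id": 1, "date": "2021-03-15"}],
    "202104": [{"id": 2, "date": "2021-04-02"}],
}
out_dir = tempfile.mkdtemp()
paths = write_each_key(grouped, os.path.join(out_dir, ""))
print(sorted(os.path.basename(p) for p in paths))  # → ['202103.json', '202104.json']
```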

Upvotes: 3
