Quack

Reputation: 740

Write files to GCS using Dataflow

I am pre-processing batch data with Dataflow.

The pipeline reads the data from Google Cloud Storage (GCS), processes it in Dataflow, and uploads the result back to GCS.

But after processing the data, I checked GCS and found:

result-001.csv

result-002.csv

result-003.csv

This is how the data is divided and stored. Can't I combine these files into one?

#-*- coding: utf-8 -*-
import apache_beam as beam
import csv
import json
import os
import re
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

def preprocessing(fields):
    # Each input line is a CSV row: label, pixel0, ..., pixel783.
    fields = fields.split(",")
    header = "label"
    for i in range(0, 784):
        header += ",pixel" + str(i)
    # Encode the label as a bracketed 10-element list with the label value
    # at its own index and zeros elsewhere, e.g. "[0,0,0,3,0,0,0,0,0,0],".
    label_list_str = "["
    label_list = []
    for i in range(0, 10):
        if fields[0] == str(i):
            label_list_str += str(i)
        else:
            label_list_str += "0"
        if i != 9:
            label_list_str += ","
    label_list_str += "],"
    # Append the 784 pixel values after the encoded label.
    for i in range(1, len(fields)):
        label_list_str += fields[i]
        if i != len(fields) - 1:
            label_list_str += ","
    yield label_list_str


def run(project, bucket, dataset):
    argv = [
        "--project={0}".format(project),
        "--job_name=kaggle-dataflow",
        "--save_main_session",
        "--region=asia-northeast1",
        "--staging_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
        "--temp_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
        "--max_num_workers=8",
        "--worker_region=asia-northeast3",
        "--worker_disk_type=compute.googleapis.com/projects//zones//diskTypes/pd-ssd",
        "--autoscaling_algorithm=THROUGHPUT_BASED",
        "--runner=DataflowRunner"
    ]

    result_output = 'gs://kaggle-bucket-v1/result/result.csv'
    filename = "gs://{}/train.csv".format(bucket)

    pipeline = beam.Pipeline(argv=argv)
    ptransform = (pipeline
                  | "Read from GCS" >> beam.io.ReadFromText(filename)
                  | "Kaggle data pre-processing" >> beam.FlatMap(preprocessing)
                  )

    (ptransform
     | "events:out" >> beam.io.WriteToText(
         result_output
     )
     )

    pipeline.run()

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Run pipeline on the cloud")
    parser.add_argument("--project", dest="project", help="Unique project ID", required=True)
    parser.add_argument("--bucket", dest="bucket", help="Bucket where your data were ingested", required=True)
    parser.add_argument("--dataset", dest="dataset", help="BigQuery dataset")

    args = vars(parser.parse_args())

    print("Correcting timestamps and writing to BigQuery dataset {}".format(args["dataset"]))

    run(project=args["project"], bucket=args["bucket"], dataset=args["dataset"])

Thank you for reading :)

Upvotes: 0

Views: 2263

Answers (1)

Ricco D

Reputation: 7277

The beam.io.WriteToText transform automatically splits the output into multiple files (shards) for best performance. You can explicitly set the parameter num_shards=1 if you want a single output file.

num_shards (int) – The number of files (shards) used for output. If not set, the service will decide on the optimal number of shards. Constraining the number of shards is likely to reduce the performance of a pipeline. Setting this value is not recommended unless you require a specific number of output files.

Your WriteToText call should then look like this:

(ptransform
 | "events:out" >> beam.io.WriteToText(
     result_output,
     num_shards=1
   )
 )
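
If you also want the file to end in .csv with no shard numbering in its name, WriteToText additionally takes file_name_suffix and shard_name_template parameters. A minimal sketch of my own (the output prefix below is hypothetical, and an empty shard_name_template should only be combined with num_shards=1):

(ptransform
 | "events:out" >> beam.io.WriteToText(
     "gs://{0}/result/result".format(bucket),  # hypothetical output prefix
     file_name_suffix=".csv",       # final name ends in .csv
     num_shards=1,                  # force a single output file
     shard_name_template=""         # drop the "-00000-of-00001" part
   )
 )

Keep in mind that forcing one shard funnels all writes through a single worker, so for larger outputs it may be faster to keep the default sharding and concatenate the pieces afterwards, for example with gsutil compose.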

Upvotes: 3
