Reputation: 740
I am using Dataflow to pre-process batch data.
The job reads the input from Google Cloud Storage (GCS), processes it in Dataflow, and uploads the result back to GCS.
After the job finished, I checked GCS and found the output split across several files:
result-001.csv
result-002.csv
result-003.csv
The output is split and stored like this. Is there a way to combine these files into one?
#-*- coding: utf-8 -*-
import apache_beam as beam
import csv
import json
import os
import re
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
def preprocessing(fields):
    fields = fields.split(",")
    header = "label"
    for i in range(0, 784):
        header += (",pixel" + str(i))
    label_list_str = "["
    label_list = []
    for i in range(0, 10):
        if fields[0] == str(i):
            label_list_str += str(i)
        else:
            label_list_str += "0"
        if i != 9:
            label_list_str += ","
    label_list_str += "],"
    for i in range(1, len(fields)):
        label_list_str += fields[i]
        if i != len(fields) - 1:
            label_list_str += ","
    yield label_list_str
def run(project, bucket, dataset):
    argv = [
        "--project={0}".format(project),
        "--job_name=kaggle-dataflow",
        "--save_main_session",
        "--region=asia-northeast1",
        "--staging_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
        "--temp_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
        "--max_num_workers=8",
        "--worker_region=asia-northeast3",
        "--worker_disk_type=compute.googleapis.com/projects//zones//diskTypes/pd-ssd",
        "--autoscaling_algorithm=THROUGHPUT_BASED",
        "--runner=DataflowRunner",
    ]
    result_output = 'gs://kaggle-bucket-v1/result/result.csv'
    filename = "gs://{}/train.csv".format(bucket)
    pipeline = beam.Pipeline(argv=argv)
    ptransform = (pipeline
        | "Read from GCS" >> beam.io.ReadFromText(filename)
        | "Kaggle data pre-processing" >> beam.FlatMap(preprocessing)
    )
    (ptransform
        | "events:out" >> beam.io.WriteToText(
            result_output
        )
    )
    pipeline.run()
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Run pipeline on the cloud")
    parser.add_argument("--project", dest="project", help="Unique project ID", required=True)
    parser.add_argument("--bucket", dest="bucket", help="Bucket where your data were ingested", required=True)
    parser.add_argument("--dataset", dest="dataset", help="BigQuery dataset")
    args = vars(parser.parse_args())
    print("Correcting timestamps and writing to BigQuery dataset {}".format(args["dataset"]))
    run(project=args["project"], bucket=args["bucket"], dataset=args["dataset"])
Thank you for reading :)
Upvotes: 0
Views: 2263
Reputation: 7277
beam.io.WriteToText automatically splits (shards) its output across several files for better write performance. If you want exactly one file, pass num_shards=1 explicitly. The documentation describes the parameter as follows:
num_shards (int) – The number of files (shards) used for output. If not set, the service will decide on the optimal number of shards. Constraining the number of shards is likely to reduce the performance of a pipeline. Setting this value is not recommended unless you require a specific number of output files.
Your write step should look like this:
(ptransform
    | "events:out" >> beam.io.WriteToText(
        result_output, num_shards=1
    )
)
Upvotes: 3