Reputation: 51
I have a pipeline that recursively reads many JSON files from a Google Cloud Storage (GCS) bucket, then parses each file into a record. Each record then goes through a "Python Transform" plugin for further processing (adding new fields and values), and finally it should be saved in a different GCS bucket (the sink).
All my attempts at playing with the parameters of the GCS sink, and adding a "Wrangler" transform before it, and/or adding a "CSV Formatter" transform before the "Wrangler" transform, have not helped produce a CSV file. The preview output is always correct, but when deployed, the output is not.
The file produced in my chosen path always has a name I did not choose, and its file type is always "application/octet-stream".
[Screenshot: the first attempt (full pipeline)]
This is the output, every time:
[Screenshot: deployed pipeline output as octet-stream instead of CSV, with a file name I did not choose]
How can I choose the file name, and what am I doing wrong that the output does not come out as a CSV in the GCS bucket?
Upvotes: 1
Views: 2298
Reputation: 51
At the time of writing this, after considering the comments and ideas proposed (@narendra, @Edwin, @Rally), I experimented with the different plugins, and this is how I settled on a solution:
I used the Spark Sink plugin, and the FileDelete plugin, which can be placed after a sink.
The code for the Spark Sink is simple:
def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {
  // Runtime macro placeholder resolved by Data Fusion (not used further below)
  val fillerVar = "${fillerVar}"

  // Target directory in the output bucket
  val fullpath = "gs://somebucket/output/leader_board/"

  // Merge all partitions into a single part file and append it to the target path as CSV
  df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .option("encoding", "UTF-8")
    .mode("append")
    .save(fullpath)
}
The output includes not only a CSV file, but also an empty "_SUCCESS" file. This is deleted using the FileDelete plugin.
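For reference, a possible alternative is to suppress the marker file at write time instead of deleting it afterwards. This is only a hedged sketch, assuming the standard Hadoop FileOutputCommitter is in use (custom committers may behave differently); the property would be set inside the same sink code, before the write:

// Ask the Hadoop FileOutputCommitter not to write the empty "_SUCCESS" marker at all
df.sparkSession.sparkContext.hadoopConfiguration
  .set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")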
I recognize that, right now, I cannot find a simple way to change the output file name (whether for one file or for multiple merged files) through the plugins. And since I don't know Scala/Java well enough, I couldn't figure it out in code either.
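For anyone comfortable extending the Spark Sink code, here is a minimal, untested sketch of how the rename could be attempted with the Hadoop FileSystem API, reusing the same sink signature as above; the target name leader_board.csv is an illustrative assumption:

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {
  val fullpath = "gs://somebucket/output/leader_board/"

  df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .option("encoding", "UTF-8")
    .mode("append")
    .save(fullpath)

  // Rename the "part-*" file Spark just wrote to a predictable name.
  // Note: FileSystem.rename returns false if the target already exists,
  // so a previous run's leader_board.csv would have to be removed first.
  val conf = df.sparkSession.sparkContext.hadoopConfiguration
  val fs = FileSystem.get(new java.net.URI(fullpath), conf)
  val parts: Array[FileStatus] =
    Option(fs.globStatus(new Path(fullpath + "part-*"))).getOrElse(Array.empty[FileStatus])
  parts.headOption.foreach { status =>
    fs.rename(status.getPath, new Path(fullpath + "leader_board.csv"))
  }
}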
For my purposes, I'm using Google Data Fusion to produce output to use with Google Data Studio. Data Studio can take as a data source not just individual files: you can point it to a GCS bucket path and it will read all the files therein. Therefore it doesn't bother me anymore that I can't control the filename ("part-00000-[random]").
Upvotes: 1
Reputation: 399
Currently the GCS sink plugin does not support naming the files it writes, since the data being written to the sink can be split into multiple parts. We can add a feature request for a GCS action that you can run after the sink to concatenate the files into one and specify a name there.
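In the meantime, the concatenation step can already be approximated with the Cloud Storage compose operation. Below is a rough, untested sketch using the google-cloud-storage Java client from Scala; the bucket, prefix, and target object names are assumptions for illustration, and a single compose call accepts at most 32 source objects:

import scala.collection.JavaConverters._  // scala.jdk.CollectionConverters on Scala 2.13+
import com.google.cloud.storage.{BlobInfo, Storage, StorageOptions}

object ComposeParts {
  def main(args: Array[String]): Unit = {
    // Illustrative names only; replace with your own bucket, prefix and target object
    val bucket = "somebucket"
    val prefix = "output/leader_board/"
    val target = prefix + "leader_board.csv"

    val storage: Storage = StorageOptions.getDefaultInstance.getService

    // Collect the part files Spark wrote under the prefix
    val parts = storage.list(bucket, Storage.BlobListOption.prefix(prefix))
      .iterateAll().asScala
      .map(_.getName)
      .filter(_.contains("part-"))
      .toList

    // Concatenate them into one object and set a proper CSV content type
    val request = Storage.ComposeRequest.newBuilder()
      .addSource(parts.asJava)
      .setTarget(BlobInfo.newBuilder(bucket, target).setContentType("text/csv").build())
      .build()
    storage.compose(request)
  }
}

Composing also lets you set the content type on the merged object, which would address the octet-stream issue from the question.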
Upvotes: 0
Reputation: 142
I replicated this as well, and I also cannot choose the name and the type of the file I want. Since the sink offers no content-type option, the file is written with the default name part-r-00000 and the content type application/octet-stream.
"If the Content-Type is not specified by the uploader and cannot be determined, it is set to application/octet-stream." (from the Cloud Storage documentation)
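If only the served content type is the problem, the metadata of the object the sink wrote can also be patched after the fact. A small, untested sketch using the google-cloud-storage Java client; the object name here is an assumption for illustration:

import com.google.cloud.storage.StorageOptions

object FixContentType {
  def main(args: Array[String]): Unit = {
    val storage = StorageOptions.getDefaultInstance.getService
    // Point this at the part file the sink produced (illustrative name)
    val blob = storage.get("somebucket", "output/leader_board/part-r-00000")
    if (blob != null) {
      // Rewrite only the metadata so the object is served as CSV instead of octet-stream
      blob.toBuilder.setContentType("text/csv").build().update()
    }
  }
}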
I have created a feature request for this, whose progress you can also track:
https://issuetracker.google.com/171366470
I agree with @narendra's suggested workaround to add the filenames via Spark Scala code.
Upvotes: 0