Reputation: 602
I am trying to copy specific files from Bucket A to Bucket B. Bucket A is structured (directories), whereas Bucket B will have no directories. The challenge is that I need to name my files based on their original filename. Normally, I would create a custom filename policy and modify it as necessary. However, the only way I know to access the original filename is by passing through each element and pulling its metadata. How can I gain access to each element within TextIO.write?
I've considered creating a transform before TextIO.write that takes in a PCollection of elements and outputs a PCollection of KVs where the key is the original filename and the value is the element (similar to this example). However, if I do that, how does my writer know how to write a KV?
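For the key in that KV, one option is to read via FileIO.readMatches() (whose ReadableFile exposes the full object path through its metadata) and strip the directory portion. A minimal sketch of that string handling; the class and method names here are hypothetical:

```java
// Hypothetical helper: turn a full object path (e.g. the resource id from
// FileIO.ReadableFile#getMetadata()) into a flat file name with no directories.
public class FileNames {
    // "gs://bucket-a/dir/sub/data.txt" -> "data.txt"
    public static String baseName(String resourcePath) {
        int slash = resourcePath.lastIndexOf('/');
        return slash < 0 ? resourcePath : resourcePath.substring(slash + 1);
    }
}
```

A function like this could then serve as the key-extraction step when building the KV<filename, element> pairs.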
I was able to get a hacky version of this working by using writeDynamic and partitioning by each element's filename in a SerializableFunction. Then I could pass the partition type through to my filename policy and, in turn, achieve my desired result. That said, this seems far from efficient and writeDynamic wasn't designed for this, since I don't actually need to partition anything.
Upvotes: 1
Views: 1694
Reputation: 7058
When using writeDynamic, the by method specifies the criterion used to partition the incoming data to its corresponding destination. For example, if this is decided upon the key of the KV pair, we can use .by(KV::getKey), and the destination file name can be tuned thanks to .withNaming.
In addition, with the via method we can provide a function to be applied to each partition, as explained here. In this case we want to use the keys to select the destination, but we do not want to write them into the output files. Therefore, we can write the value and omit the key with .via(Contextful.fn(KV::getValue), TextIO.sink()).
Whereas by accepts a SerializableFunction as a parameter, the via method requires a Contextful<Contextful.Fn<UserT,OutputT>> outputFn. That's why I wrap KV::getValue in Contextful.fn(). In some examples, like this template, it can be useful to provide context such as a required side input, but here I just want to pass the function.
Code snippet (more details here)
p.apply("Create Data", Create.of(
        KV.of("one", "this is row 1"),
        KV.of("two", "this is row 2"),
        KV.of("three", "this is row 3"),
        KV.of("four", "this is row 4")))
 .apply(FileIO.<String, KV<String, String>>writeDynamic()
     .by(KV::getKey)                                   // route each element by its key
     .withDestinationCoder(StringUtf8Coder.of())       // coder for the destination type
     .via(Contextful.fn(KV::getValue), TextIO.sink())  // write only the value, not the key
     .to(output)
     .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));
Upvotes: 3
Reputation: 1725
Here are a few approaches you may wish to consider, depending on whether you are trying to do a one-off copy or build a system that does this repeatedly:
If you are just trying to copy files around, you may not need Dataflow at all: you can use gsutil to copy the files.
If you just need to copy files without modification and still want to use Dataflow, you could invoke gsutil from within Dataflow yourself.
If you need to transform each file, you may want to write transforms that operate on a whole file, reading it in entirely, modifying it, and writing it out in a custom ParDo. Example
As an alternative to Dataflow, you can use Google Cloud Functions to trigger whenever a GCS file is created.
Note: TextIO and FileIO are record-based transforms, not file-based transforms. They pull a file apart into records to achieve parallelism, so the original filenames and the order of records are not really maintained. I see you have tried to maintain the filename with a KV, but as you mentioned, FileIO does not allow you to pass in a filename with each record.
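For the pure-copy case above, the gsutil route can itself flatten the structure, since the ** wildcard in gsutil cp matches objects at any depth and names each destination object after the final path component. A sketch, with placeholder bucket names:

```shell
# Flatten-copy: ** matches objects at any depth in bucket-a, and gsutil cp
# names each destination object after the last path component, so the
# directory structure is not reproduced in bucket-b. -m parallelizes.
# (Bucket names are placeholders.)
gsutil -m cp 'gs://bucket-a/**' gs://bucket-b
```

Beware that objects in different directories sharing the same base name will collide under this scheme.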
Upvotes: 1