Reputation: 2165
Hello, I am very confused by the dynamic file destinations API, and there are no docs, so here I am.
The situation: I have a PCollection containing events that belong to different partitions. I want to split them up and write each partition to a different folder in GCS.
Here is what I have.
Dynamic destination object:
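import org.apache.beam.sdk.io.{DefaultFilenamePolicy, FileBasedSink, FileSystems}
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations
import org.apache.beam.sdk.options.ValueProvider
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.write // assumes json4s-jackson; json4s-native works the same way

class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {
  // Destination key for each element
  override def getDestination(element: Event): String = {
    element.partition // this returns a string which is a gcs folder path
  }
  // Maps a destination to files named <prefix>/<destination>/part-<shard info>.jsonl
  override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
    println(destination)
    val overallPrefix = s"$prefix/$destination/part-"
    DefaultFilenamePolicy.fromStandardParameters(
      ValueProvider.StaticValueProvider.of(
        FileSystems.matchNewResource(overallPrefix, false)),
      null, ".jsonl", true)
  }
  // Serialises each record to one JSON line
  override def formatRecord(record: Event): String = {
    implicit val f = DefaultFormats
    write(record.toDataLakeFormat())
  }
  override def getDefaultDestination: String = "default"
}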
I believe this is the correct logic: I ask each element what its destination partition is, that gets passed into getFilenamePolicy, and from there a file name is built. To format the record I just convert it to JSON.
The issue is integrating this with TextIO. I tried this:
TextIO
  .write()
  .withWindowedWrites()
  .withTempDirectory(tempDir)
  .to(new GCSDestinationString("gs://bucket"))
but it requires the input type to be String. Technically this could work, but I would have to deserialise multiple times. I found this in the TextIO docs on dynamic destinations:
Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.
So let's try that:
TextIO
  .writeCustomType[Event]()
  .withWindowedWrites()
  .withTempDirectory(tempDir)
  .to(new GCSDestinationString("gs://bucket"))
This still doesn't compile, because writeCustomType returns TypedWrite<UserT, Void>, which has the knock-on effect of requiring the second type parameter of my dynamic destination object to be Void. Clearly I need it to be a String, or at least something other than Void.
I'm clearly missing something.
Upvotes: 5
Views: 2376
Reputation: 2923
Following @jkff's advice on the Apache Beam mailing list, I managed to get it compiling and working this way:
val write = TextIO.writeCustomType[Event]
  .asInstanceOf[TextIO.TypedWrite[Event, String]]
  .to(new MyDynamicDestinations(baseDir))
Although, after doing it that way, I realised it is more convenient to use DefaultFilenamePolicy.Params instead of a String as the destination type. Let me know if you want more information about that bit.
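A minimal sketch of the Params-based variant described above, assuming Beam 2.2.0 (hence the same cast) and a hypothetical Event with partition and json fields. ParamsDestinations, paramsFor, and the part/.jsonl naming are illustrative names, not taken from the answer. With Params as the destination type, each destination already carries its full base filename and suffix, so getFilenamePolicy only has to pass it to DefaultFilenamePolicy.fromParams. Overriding getDestinationCoder with DefaultFilenamePolicy.ParamsCoder is an assumption about what Beam needs in order to encode the destination.

```scala
import org.apache.beam.sdk.coders.Coder
import org.apache.beam.sdk.io.{DefaultFilenamePolicy, FileBasedSink, FileSystems, TextIO}
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params
import org.apache.beam.sdk.values.PCollection

// Hypothetical stand-in for the question's Event: a partition name plus a pre-rendered JSON line.
case class Event(partition: String, json: String)

// Destinations keyed by DefaultFilenamePolicy.Params instead of by String.
class ParamsDestinations(baseDir: String)
    extends FileBasedSink.DynamicDestinations[Event, Params, String] {

  override def getDestination(element: Event): Params =
    ParamsDestinations.paramsFor(baseDir, element.partition)

  override def getDefaultDestination: Params =
    ParamsDestinations.paramsFor(baseDir, "default")

  // The Params already hold the full prefix and suffix, so no string building happens here.
  override def getFilenamePolicy(destination: Params): FileBasedSink.FilenamePolicy =
    DefaultFilenamePolicy.fromParams(destination)

  // Assumption: supply Beam's coder for Params explicitly rather than relying on inference.
  override def getDestinationCoder: Coder[Params] = DefaultFilenamePolicy.ParamsCoder.of()

  override def formatRecord(record: Event): String = record.json
}

object ParamsDestinations {
  // Hypothetical helper: files end up as <baseDir>/<partition>/part-<shard info>.jsonl
  def paramsFor(baseDir: String, partition: String): Params =
    new Params()
      .withBaseFilename(FileSystems.matchNewResource(s"$baseDir/$partition/part", false))
      .withSuffix(".jsonl")

  def write(events: PCollection[Event], baseDir: String, tempDir: String): Unit = {
    // The cast works around the writeCustomType().to(DynamicDestinations) type typo in Beam 2.2.0.
    val transform = TextIO.writeCustomType[Event]()
      .asInstanceOf[TextIO.TypedWrite[Event, Params]]
      .to(new ParamsDestinations(baseDir))
      .withTempDirectory(FileSystems.matchNewResource(tempDir, true))
    events.apply("WriteByPartition", transform)
  }
}
```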
Upvotes: 1
Reputation: 17913
Oh man, this is embarrassing. It turns out writeCustomType().to(DynamicDestinations) was not tested and we didn't notice it, but it had a typo in the type signature. PR https://github.com/apache/beam/pull/4319 is in review. You'll still need 2.3.0-SNAPSHOT to pick it up, though, in which case I would recommend just using FileIO.write() instead.
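For reference, a minimal sketch of the FileIO route on Beam 2.3.0 or later, using FileIO.writeDynamic(), the dynamic-destination form of FileIO.write(). The Event type, FileIODynamicSketch, and the part/.jsonl naming are hypothetical. The sketch assumes that a '/' inside the naming prefix resolves to a sub-folder of the base directory, and that the input PCollection already has a coder for Event. For an unbounded (streaming) input you will most likely also need .withNumShards(...).

```scala
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.io.{FileIO, TextIO}
import org.apache.beam.sdk.io.FileIO.Write.FileNaming
import org.apache.beam.sdk.transforms.{Contextful, SerializableFunction}
import org.apache.beam.sdk.values.PCollection

// Hypothetical event type: partition names the sub-folder, json is the line to write.
case class Event(partition: String, json: String)

object FileIODynamicSketch {
  // Destination key: the partition string of each element.
  val partitionOf: SerializableFunction[Event, String] =
    new SerializableFunction[Event, String] {
      override def apply(e: Event): String = e.partition
    }

  // Converts each element into the line that TextIO.sink() writes.
  val toLine: SerializableFunction[Event, String] =
    new SerializableFunction[Event, String] {
      override def apply(e: Event): String = e.json
    }

  // One FileNaming per destination, relative to the base directory given to .to(...).
  val namingFor: SerializableFunction[String, FileNaming] =
    new SerializableFunction[String, FileNaming] {
      override def apply(partition: String): FileNaming =
        FileIO.Write.defaultNaming(s"$partition/part", ".jsonl")
    }

  def write(events: PCollection[Event], baseDir: String): Unit =
    events.apply(
      "WriteByPartition",
      FileIO.writeDynamic[String, Event]()
        .by(partitionOf)
        .via(Contextful.fn(toLine), TextIO.sink())
        .to(baseDir)
        .withNaming(namingFor)
        .withDestinationCoder(StringUtf8Coder.of()))
}
```

Explicit anonymous SerializableFunction classes are used instead of Scala lambdas so the choice among the overloaded by, via, and withNaming methods does not depend on Scala's SAM conversion.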
Upvotes: 3
Reputation: 2165
It doesn't seem to compile in Scala, but I was able to get the behaviour I wanted with a similar API after digging around:
var outputTransform =
  TextIO
    .writeCustomType[T]()
    .withFormatFunction(outputFormatter)
    .withNumShards(shards)
    .withTempDirectory(tempDir)
    .withCompression(compression)
if (windowedWrites) {
  outputTransform = outputTransform.withWindowedWrites()
}
// Routes each element via its DefaultFilenamePolicy.Params; emptyDestination is used when there is no data
outputTransform.to(outputFileNamePolicyMapping, emptyDestination)
where outputFormatter is a SerializableFunction from T to String, and outputFileNamePolicyMapping is a SerializableFunction from T to DefaultFilenamePolicy.Params.
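A sketch of what outputFormatter and outputFileNamePolicyMapping could look like for the TextIO.to(SerializableFunction, Params) overload used above. It assumes a hypothetical Event with partition and json fields in place of T. ParamsMappingSketch, paramsFor, and the "empty" fallback partition are illustrative names, not from the answer.

```scala
import org.apache.beam.sdk.io.{FileSystems, TextIO}
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.values.PCollection

// Hypothetical element type standing in for T.
case class Event(partition: String, json: String)

object ParamsMappingSketch {
  // outputFormatter: T => String, passed to withFormatFunction
  val outputFormatter: SerializableFunction[Event, String] =
    new SerializableFunction[Event, String] {
      override def apply(e: Event): String = e.json
    }

  // Hypothetical helper: base filename <baseDir>/<partition>/part with a .jsonl suffix.
  def paramsFor(baseDir: String, partition: String): Params =
    new Params()
      .withBaseFilename(FileSystems.matchNewResource(s"$baseDir/$partition/part", false))
      .withSuffix(".jsonl")

  // outputFileNamePolicyMapping: T => DefaultFilenamePolicy.Params
  def outputFileNamePolicyMapping(baseDir: String): SerializableFunction[Event, Params] =
    new SerializableFunction[Event, Params] {
      override def apply(e: Event): Params = paramsFor(baseDir, e.partition)
    }

  def write(events: PCollection[Event], baseDir: String, tempDir: String): Unit =
    events.apply(
      "WriteByPartition",
      TextIO.writeCustomType[Event]()
        .withFormatFunction(outputFormatter)
        .withTempDirectory(FileSystems.matchNewResource(tempDir, true))
        // Second argument is the destination used when the input is empty.
        .to(outputFileNamePolicyMapping(baseDir), paramsFor(baseDir, "empty")))
}
```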
Upvotes: 1