Reputation: 159
My pipeline writes its output data to a file in GCS, and I would like to compress that file. TextIO can read (decompress) files that are already compressed, but as far as I can tell it cannot write compressed files. How can I compress the output file?
Upvotes: 1
Views: 1111
Reputation: 126
As Thang mentioned, this is now possible in Beam SDK version 2 by adding .withCompression(Compression.GZIP):
// Without Compression:
pcollection.apply(TextIO.write().to("/path/to/file.txt"));
// With Compression:
pcollection.apply(TextIO.write().to("/path/to/file.txt")
    .withSuffix(".txt")
    .withCompression(Compression.GZIP));
The full example given can be found in the docs
Upvotes: 0
Reputation: 137
This is currently an open feature request for Dataflow; however, the work has already been done in Beam. Once Dataflow 2.0 is released (which will be based on Beam), this should be officially supported.
That said, I have been able to write compressed GZIP files by extending the FileBasedSink class and using Jeff Payne's work on this feature in Beam.
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.FileBasedSink;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.util.MimeTypes;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// A file-based sink that writes each element on its own line into GZIP-compressed shards.
public class GZIPSink<T> extends FileBasedSink<T> {
    private final Coder<T> coder;

    GZIPSink(String baseOutputFilename, Coder<T> coder) {
        super(baseOutputFilename, ".gz");
        this.coder = coder;
    }

    @Override
    public FileBasedWriteOperation<T> createWriteOperation(PipelineOptions pipelineOptions) {
        return new GZIPWriteOperation<>(this, coder);
    }

    static class GZIPWriteOperation<T> extends FileBasedSink.FileBasedWriteOperation<T> {
        private final Coder<T> coder;

        private GZIPWriteOperation(GZIPSink<T> sink, Coder<T> coder) {
            super(sink);
            this.coder = coder;
        }

        @Override
        public FileBasedWriter<T> createWriter(PipelineOptions pipelineOptions) throws Exception {
            return new GZIPBasedWriter<>(this, coder);
        }
    }

    static class GZIPBasedWriter<T> extends FileBasedSink.FileBasedWriter<T> {
        private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
        private final Coder<T> coder;
        private GZIPOutputStream out;

        public GZIPBasedWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
            super(writeOperation);
            this.mimeType = MimeTypes.BINARY;
            this.coder = coder;
        }

        @Override
        protected void prepareWrite(WritableByteChannel channel) throws Exception {
            // Wrap the output channel in a GZIP stream and raise the deflater to maximum compression.
            out = new GZIPOutputStream(Channels.newOutputStream(channel), true) {{
                def.setLevel(def.BEST_COMPRESSION);
            }};
        }

        @Override
        public void write(T value) throws Exception {
            // Encode the element, then terminate the record with a newline.
            coder.encode(value, out, Coder.Context.OUTER);
            out.write(NEWLINE);
        }

        @Override
        public void writeFooter() throws IOException {
            // Write the GZIP trailer without closing the underlying channel.
            out.finish();
        }
    }
}
And then to actually do the write:
aStringPCollection.apply(Write.to(new GZIPSink<>("gs://path/sharded-filename", StringUtf8Coder.of())));
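To see what the writer above does to each shard without running a pipeline, here is a minimal standalone sketch that uses only the JDK. It mirrors the same steps (wrap the output in a GZIPOutputStream, write each record followed by a newline, then call finish()) and reads the result back to check the round trip. The class name GzipLinesSketch and its two helper methods are hypothetical names made up for this illustration; they are not part of Dataflow or Beam, and an in-memory byte array stands in for the GCS channel.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration class; not part of the Dataflow or Beam SDKs.
public class GzipLinesSketch {
    private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);

    // Compress the given lines into one GZIP stream, one record per line.
    public static byte[] compressLines(List<String> lines) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Same trick as in the sink above: the initializer block raises the compression level.
        GZIPOutputStream out = new GZIPOutputStream(sink, true) {{
            def.setLevel(Deflater.BEST_COMPRESSION);
        }};
        for (String line : lines) {
            out.write(line.getBytes(StandardCharsets.UTF_8));
            out.write(NEWLINE);
        }
        // finish() writes the GZIP trailer; without it the stream is truncated.
        out.finish();
        return sink.toByteArray();
    }

    // Decompress a GZIP stream and split it back into lines.
    public static List<String> decompressLines(byte[] gzipped) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(gzipped)),
                StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        List<String> input = Arrays.asList("first record", "second record");
        byte[] gzipped = compressLines(input);
        System.out.println("compressed bytes: " + gzipped.length);
        System.out.println("round trip ok: " + decompressLines(gzipped).equals(input));
    }
}
```

The output of compressLines starts with the GZIP magic bytes 0x1f 0x8b, so standard tools such as gunzip should be able to read a file containing it.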
Upvotes: 1
Reputation: 14781
TextIO only supports reading compressed files. It does not currently support writing files with compression.
More info: https://cloud.google.com/dataflow/model/text-io#reading-from-compressed-text-files
Upvotes: 1