Tadayasu Yotsu

Reputation: 159

How to compress output file in Dataflow Java SDK?

My pipeline stores its output data as a file in GCS, and I would like that file to be compressed. TextIO can read files that are already compressed, but I don't think it can write compressed files. How can I compress the output file?

Upvotes: 1

Views: 1111

Answers (3)

Will

Reputation: 126

As Thang mentioned, this is now possible in Beam SDK 2.x by adding .withCompression(Compression.GZIP) to the TextIO write:

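import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;

// Without compression:
pcollection.apply(TextIO.write().to("/path/to/file.txt"));

// With compression. The builder chain must stay inside apply(...),
// so its closing parenthesis comes after withCompression(...):
pcollection.apply(TextIO.write().to("/path/to/file")
      .withSuffix(".txt")
      .withCompression(Compression.GZIP));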

The full example can be found in the Beam TextIO documentation.
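The compressed shards are ordinary GZIP files, so you can sanity-check a shard after downloading it from GCS with a small program that uses only the JDK. What follows is a sketch and is not part of Beam. The class name GzipShardCheck is hypothetical, and the local file it creates stands in for a real downloaded shard. The check has two parts: it looks for the GZIP magic bytes 0x1f 0x8b, and it decompresses the file to read its lines back.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: checks that a text shard is GZIP-compressed and reads it back.
public class GzipShardCheck {
    // Every GZIP stream starts with the two magic bytes 0x1f 0x8b (RFC 1952).
    static boolean looksGzipped(Path file) throws IOException {
        try (InputStream in = Files.newInputStream(file)) {
            return in.read() == 0x1f && in.read() == 0x8b;
        }
    }

    // Decompresses a GZIP text file and returns its lines, decoded as UTF-8.
    static List<String> readGzipLines(Path file) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(file)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a downloaded shard: write a small GZIP text file locally.
        Path shard = Files.createTempFile("file-00000-of-00001", ".txt.gz");
        try (Writer w = new OutputStreamWriter(
                new GZIPOutputStream(Files.newOutputStream(shard)), StandardCharsets.UTF_8)) {
            w.write("first line\nsecond line\n");
        }
        System.out.println(looksGzipped(shard));   // true
        System.out.println(readGzipLines(shard));  // [first line, second line]
        Files.delete(shard);
    }
}
```

Running `gunzip -c` on the downloaded shard gives the same result from the command line. The Java version is useful when the check has to run inside a test.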

Upvotes: 0

Thang

Reputation: 137

This is currently an open feature request for Dataflow; however, the work has already been done in Beam. Once Dataflow 2.0 is released (it will be based on Beam), this should be officially supported.

That said, I have been able to write GZIP-compressed files by extending the FileBasedSink class and building on Jeff Payne's work on this feature in Beam.

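import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.FileBasedSink;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.util.MimeTypes;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;
import java.util.zip.GZIPOutputStream;

// Writes each element, encoded with the given coder, as one line of a GZIP-compressed ".gz" file.
public class GZIPSink<T> extends FileBasedSink<T> {
    private final Coder<T> coder;

    GZIPSink(String baseOutputFilename, Coder<T> coder) {
        super(baseOutputFilename, ".gz");
        this.coder = coder;
    }

    @Override
    public FileBasedWriteOperation<T> createWriteOperation(PipelineOptions pipelineOptions) {
        return new GZIPWriteOperation<>(this, coder);
    }

    static class GZIPWriteOperation<T> extends FileBasedSink.FileBasedWriteOperation<T> {
        private final Coder<T> coder;

        private GZIPWriteOperation(GZIPSink<T> sink, Coder<T> coder) {
            super(sink);
            this.coder = coder;
        }

        @Override
        public FileBasedWriter<T> createWriter(PipelineOptions pipelineOptions) throws Exception {
            return new GZIPBasedWriter<>(this, coder);
        }
    }

    static class GZIPBasedWriter<T> extends FileBasedSink.FileBasedWriter<T> {
        private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
        private final Coder<T> coder;
        private GZIPOutputStream out;

        public GZIPBasedWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
            super(writeOperation);
            this.mimeType = MimeTypes.BINARY; // compressed output is binary, not text
            this.coder = coder;
        }

        @Override
        protected void prepareWrite(WritableByteChannel channel) throws Exception {
            // Wrap the output channel in a GZIP stream set to maximum compression.
            out = new GZIPOutputStream(Channels.newOutputStream(channel), true) {{
                def.setLevel(Deflater.BEST_COMPRESSION);
            }};
        }

        @Override
        public void write(T value) throws Exception {
            // One record per line: the encoded value followed by a newline.
            coder.encode(value, out, Coder.Context.OUTER);
            out.write(NEWLINE);
        }

        @Override
        public void writeFooter() throws IOException {
            // Write the GZIP trailer without closing the channel; the base class closes it.
            out.finish();
        }
    }
}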

And then to actually do the write:

aStringPCollection.apply(Write.to(new GZIPSink<String>("gs://path/sharded-filename", StringUtf8Coder.of())));
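The core of GZIPBasedWriter can be tried without Dataflow. The sketch below is a JDK-only stand-in that mirrors the prepareWrite/write/writeFooter sequence against a plain WritableByteChannel and then decompresses the result. Two parts are assumptions: the class name GzipRecordWriter is hypothetical, and String values are encoded directly as UTF-8 in place of a Coder. The direct encoding is meant to approximate what StringUtf8Coder does in the OUTER context. This is not the Dataflow API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical JDK-only stand-in for GZIPBasedWriter (no Dataflow classes involved).
public class GzipRecordWriter {
    private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
    private GZIPOutputStream out;

    // Mirrors prepareWrite(): wrap the channel in a GZIP stream at maximum compression.
    void prepareWrite(WritableByteChannel channel) throws IOException {
        out = new GZIPOutputStream(Channels.newOutputStream(channel), true) {
            {
                def.setLevel(Deflater.BEST_COMPRESSION);
            }
        };
    }

    // Mirrors write(): UTF-8 bytes of the record, then a newline (assumed Coder stand-in).
    void write(String value) throws IOException {
        out.write(value.getBytes(StandardCharsets.UTF_8));
        out.write(NEWLINE);
    }

    // Mirrors writeFooter(): emit the GZIP trailer but leave the channel open.
    void writeFooter() throws IOException {
        out.finish();
    }

    // Test helper: decompress a complete GZIP byte array into a UTF-8 string.
    static String gunzip(byte[] compressed) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        GzipRecordWriter writer = new GzipRecordWriter();
        writer.prepareWrite(Channels.newChannel(sink));
        writer.write("alpha");
        writer.write("beta");
        writer.writeFooter();
        System.out.print(gunzip(sink.toByteArray())); // prints "alpha" and "beta" on separate lines
    }
}
```

writeFooter() calls finish() rather than close(). finish() completes the GZIP stream but leaves the underlying channel open, and in the real sink the FileBasedWriter is responsible for closing that channel.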

Upvotes: 1

Graham Polley

Reputation: 14781

TextIO only supports reading compressed files. It does not support writing files with compression.

https://cloud.google.com/dataflow/model/text-io#reading-from-compressed-text-files

TextIO does not currently support writing to compressed files.

More info:

Upvotes: 1
