Akash Saini

Reputation: 1

Is there any way to implement zstd compression in fluentd s3 plugin?

We have a requirement to compress files with zstd before sending them to S3. I am trying to add a Zstandard compressor to the fluentd s3 plugin so that data sent to S3 by the fluentd agent is stored in zstd format instead of the default gzip. I have not found a working solution yet. Any help is appreciated.

I installed td-agent (fluentd) on a Linux system along with the zstd-ruby and zstd gems. I then created the s3_compressor_zstd.rb file and copied it to the /opt/x.x.x.x/fluent-plugin-s3-1.7.2/lib/fluent/plugin directory. When starting the fluentd agent I get the error below:

Logs of td-agent service

OS: Ubuntu 18.04.6 LTS
td-agent version: 4.4.2
fluentd version: 1.15.3
zstd version: 1.1.2.1
zstd-ruby version: 1.5.2.3

s3_compressor_zstd.rb file:

require "zstd"
module Fluent::Plugin
  class S3Output
    class ZstdCompressor < Compressor
      S3Output.register_compressor('zstd', self)
      def initialize(options = {})
        begin
          require "zstd"
        rescue LoadError
          raise Fluent::ConfigError, "Install zstd-ruby gem before using zstd compressor"
        end
      end
      def ext
        ".zst"
      end
      def compress(data)
        compressed_data = ''
        Zstd::Writer.open(StringIO.new(compressed_data)) do |compressed_stream|
          compressed_stream.write(data)
        end
        compressed_data
      end
    end
  end
end

Upvotes: 0

Views: 445

Answers (1)

Azeem

Reputation: 14657

According to the "Use your (de)compression algorithm" section of the fluent-plugin-s3 README, compress takes two arguments, whereas your implementation defines it with only one:

# chunk is buffer chunk. tmp is destination file for upload
def compress(chunk, tmp)
  # call command or something
end

You need to update your implementation accordingly.
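
To make the two-argument shape concrete, here is a minimal sketch that runs without fluentd installed. FakeChunk and SketchCompressor are hypothetical stand-ins (not part of fluent-plugin-s3), and stdlib Zlib is only a placeholder for the real compression call; the point is that compress reads from chunk and writes the compressed bytes into tmp rather than returning them.

```ruby
require 'stringio'
require 'zlib'

# Hypothetical stand-in for a Fluentd buffer chunk; only write_to is modelled.
class FakeChunk
  def initialize(data)
    @data = data
  end

  # Copies the buffered bytes into any IO-like object.
  def write_to(io)
    io.write(@data)
  end
end

# Illustrative compressor following the compress(chunk, tmp) shape.
# Zlib is a placeholder for the actual algorithm (e.g. zstd).
class SketchCompressor
  def compress(chunk, tmp)
    raw = StringIO.new
    chunk.write_to(raw)                           # read everything from the chunk
    tmp.write(Zlib::Deflate.deflate(raw.string))  # write compressed bytes to tmp
  end
end

# Usage: a binary StringIO plays the role of the temporary upload file.
tmp = StringIO.new(String.new)
SketchCompressor.new.compress(FakeChunk.new("line 1\nline 2\n"), tmp)
puts "compressed #{tmp.string.bytesize} bytes"
```

In the real plugin, tmp is the temporary file that gets uploaded, so nothing needs to be returned from compress.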

Check the implementation of the existing supported compressors here:

Here's a thread that discusses snappy compressor (might be helpful):


UPDATE

Using the zstd gem, the compressor might look like this (not tested):

require 'zstd'

module Fluent::Plugin
  class S3Output
    class ZstdCompressor < Compressor
      S3Output.register_compressor('zstd', self)

      def initialize(options = {})
        # Let the base Compressor run its own setup (it receives the same options)
        super
        begin
          require 'zstd'
        rescue LoadError
          raise Fluent::ConfigError, 'zstd gem not installed, run: gem install zstd'
        end
      end

      def ext
        'zst'.freeze
      end

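      # chunk is the buffer chunk; tmp is the destination file for upload
      def compress(chunk, tmp)
        s = StringIO.new
        chunk.write_to(s)
        # Zstd.new.compress is meant for the zstd gem; the zstd-ruby gem
        # exposes Zstd.compress(data) instead (after require 'zstd-ruby')
        tmp.write(Zstd.new.compress(s.string))
      end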
    end
  end
end

UPDATE 26/01/2025

The plugin itself (https://github.com/fluent/fluent-plugin-s3) has supported zstd compression since Nov 6, 2024.

PR: https://github.com/fluent/fluent-plugin-s3/pull/439
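
With a plugin release that includes that change, picking the compressor should come down to the store_as parameter of the s3 output. The fragment below is a sketch only: it assumes the built-in compressor is registered under the name zstd (the same name the custom compressor above registers), and the tag pattern, bucket, region, and paths are placeholders.

```
<match app.**>
  @type s3
  # placeholder bucket, region, and key prefix
  s3_bucket example-bucket
  s3_region us-east-1
  path logs/
  # assumed value; selects the zstd compressor instead of the default gzip
  store_as zstd
  <buffer time>
    @type file
    path /var/log/td-agent/s3
    timekey 3600
  </buffer>
</match>
```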

Upvotes: 1
