Reputation: 1
We have a requirement to compress files with zstd and send them to S3. I am trying to implement a zstandard compression plugin for the fluentd s3 plugin, so that data sent to S3 by the fluentd agent is stored in zstd format rather than the default gzip. I have not been able to find a solution yet. Any help is appreciated.
I installed td-agent (fluentd) on a Linux system and installed the zstd-ruby and zstd gems. I then created the compressor s3_compressor_zstd.rb and copied it to the /opt/x.x.x.x/fluent-plugin-s3-1.7.2/lib/fluent/plugin directory. While starting the fluentd agent I am getting the below error:
OS: Ubuntu 18.04.6 LTS
td-agent version: 4.4.2
fluentd version: 1.15.3
zstd version: 1.1.2.1
zstd-ruby version: 1.5.2.3
s3_compressor_zstd.rb file:
require "zstd"

module Fluent::Plugin
  class S3Output
    class ZstdCompressor < Compressor
      S3Output.register_compressor('zstd', self)

      def initialize(options = {})
        begin
          require "zstd"
        rescue LoadError
          raise Fluent::ConfigError, "Install zstd-ruby gem before using zstd compressor"
        end
      end

      def ext
        ".zst"
      end

      def compress(data)
        compressed_data = ''
        Zstd::Writer.open(StringIO.new(compressed_data)) do |compressed_stream|
          compressed_stream.write(data)
        end
        compressed_data
      end
    end
  end
end
Upvotes: 0
Views: 445
Reputation: 14657
According to the "Use your (de)compression algorithm" section of the fluent-plugin-s3 documentation, compress takes two arguments, not one:
# chunk is buffer chunk. tmp is destination file for upload
def compress(chunk, tmp)
  # call command or something
end
You need to update your implementation accordingly.
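To make the two-argument contract concrete outside of Fluentd, here is a minimal sketch. It assumes only what the snippet above states: the chunk can write itself into an IO via write_to, and tmp is a writable file. FakeChunk and roundtrip are hypothetical helpers invented for this illustration, and Zlib from the standard library stands in for zstd so the sketch runs without any extra gems.

```ruby
require 'stringio'
require 'tempfile'
require 'zlib'

# Hypothetical stand-in for a Fluentd buffer chunk; only write_to(io) is modelled.
class FakeChunk
  def initialize(data)
    @data = data
  end

  def write_to(io)
    io.write(@data)
  end
end

# Same shape as a compressor's compress(chunk, tmp): drain the chunk into memory,
# then write the compressed bytes into tmp instead of returning them.
# Zlib.gzip is a placeholder here; a zstd compressor would call its own library.
def compress(chunk, tmp)
  buffer = StringIO.new
  chunk.write_to(buffer)
  tmp.binmode
  tmp.write(Zlib.gzip(buffer.string))
  tmp.flush
end

# Hypothetical helper: compress into a temp file, then read it back and decompress.
def roundtrip(text)
  Tempfile.create('s3-compress-sketch') do |tmp|
    compress(FakeChunk.new(text), tmp)
    tmp.rewind
    Zlib.gunzip(tmp.read)
  end
end
```

The point to notice is that compress returns nothing useful: the result is whatever ends up in tmp. A method that takes a single data argument and returns a string, as in the question, does not match that contract.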
Check how the existing supported compressors are implemented in the fluent-plugin-s3 repository.
There is also a thread that discusses a snappy compressor, which might be helpful.
UPDATE
Using the zstd gem, the compressor implementation might look like this (not tested):
require 'zstd'

module Fluent::Plugin
  class S3Output
    class ZstdCompressor < Compressor
      S3Output.register_compressor('zstd', self)

      def initialize(options = {})
        begin
          require 'zstd'
        rescue LoadError
          raise Fluent::ConfigError, 'zstd gem not installed, run: gem install zstd'
        end
      end

      def ext
        'zst'.freeze
      end

      def compress(chunk, tmp)
        s = StringIO.new
        chunk.write_to(s)
        tmp.write(Zstd.new.compress(s.string))
      end
    end
  end
end
UPDATE 26/01/2025
The fluent-plugin-s3 plugin (https://github.com/fluent/fluent-plugin-s3) has supported zstd compression natively since Nov 6, 2024.
PR: https://github.com/fluent/fluent-plugin-s3/pull/439
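With a plugin version that includes this PR, a custom compressor file should no longer be needed. A configuration along the following lines is a sketch of how it might be enabled. It assumes the built-in compressor is selected with store_as zstd (the same name registered in the code above) and that the zstd-ruby gem is installed; the tag pattern, bucket, region, and paths are placeholders, so check the plugin README for the exact option values.

```
<match app.**>
  @type s3
  # placeholder bucket, region and key prefix
  s3_bucket my-log-bucket
  s3_region us-east-1
  path logs/
  # assumed value for the built-in zstd compressor
  store_as zstd
  <buffer time>
    @type file
    path /var/log/td-agent/buffer/s3
    timekey 3600
    timekey_wait 10m
  </buffer>
</match>
```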
Upvotes: 1