DANG BUI HUU

Reputation: 3

How to use sinkTo in Flink to write multiple files to AWS S3

I want to save the data of many customers in separate files.

The data is currently held in a DataStream. How can I use sinkTo to write multiple files to AWS S3, so that each file contains the data of one customer and the file name is the customer name?

public static void writeMultiFile(DataStream<Tuple5<String, Long, Double, String, String>> data) throws Exception {
    String currentDir = System.getProperty("user.dir");
    Path pathNew = new Path(currentDir + "/output/");

    // Controls the part-file name, but it is fixed for the whole sink;
    // it cannot vary per key.
    OutputFileConfig config = OutputFileConfig
            .builder()
            .withPartPrefix("namefile")
            .withPartSuffix(".parquet")
            .build();

    // schema is an Avro Schema defined elsewhere
    final FileSink<GenericRecord> sink = FileSink
            .forBulkFormat(pathNew, AvroParquetWriters.forGenericRecord(schema))
            .withOutputFileConfig(config)
            .build();

    data.keyBy(value -> value.f0)
        .map(new ConvertGenericRecord())
        .sinkTo(sink);
}

I need the file names to change by key. The code above can't do that: the file name must be predefined and cannot be changed dynamically per key.

Please help me!

Upvotes: 0

Views: 666

Answers (1)

David Anderson

Reputation: 43419

You can do this by implementing a BucketAssigner.

Something along these lines:

public static final class KeyBucketAssigner
        implements BucketAssigner<Event, String> {

    private static final long serialVersionUID = 987325769970523326L;

    // The bucket id determines the subdirectory the records are written into
    @Override
    public String getBucketId(final Event element, final Context context) {
        return String.valueOf(element.key);
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
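
Applied to the sink from the question, a minimal sketch might look like the following. The same idea is parameterized for GenericRecord instead of Event, and "customer" is a hypothetical field name used here for illustration:

    // Bucket assigner that routes each record by the customer name it carries;
    // "customer" is an assumed field name in the Avro schema.
    public static final class CustomerBucketAssigner
            implements BucketAssigner<GenericRecord, String> {

        @Override
        public String getBucketId(GenericRecord element, Context context) {
            return String.valueOf(element.get("customer"));
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    final FileSink<GenericRecord> sink = FileSink
            .forBulkFormat(pathNew, AvroParquetWriters.forGenericRecord(schema))
            .withBucketAssigner(new CustomerBucketAssigner()) // one bucket per customer
            .withOutputFileConfig(config)
            .build();

Note that the bucket id becomes a subdirectory under the base path; the part files inside each bucket are still named according to OutputFileConfig. So you get one directory per customer rather than a single file literally named after the customer.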

See the FileSink documentation and the BucketAssigner javadocs for details.

Upvotes: 1
