Reputation: 21
I'm trying to write the messages I receive from Kafka to Parquet files with the sink below, but I'm not sure this is the right approach. Could you help me with this?
private static SinkFunction<String> createFileSink(String outputPath) {
    final StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(
            DefaultRollingPolicy.builder()
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                .withMaxPartSize(1024 * 1024)
                .build())
        .build();
    return sink;
}
Upvotes: 0
Views: 608
Reputation: 658
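You should use a bulk-encoded format to write Parquet, i.e. StreamingFileSink.forBulkFormat(...) instead of forRowFormat(...). Row-encoded formats are meant for record-by-record output such as plain text, CSV, or JSON.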
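For illustration, here is a minimal sketch of what the bulk-format version of your sink might look like. It assumes a Flink 1.x setup with the flink-parquet module and its Avro/Parquet dependencies on the classpath. The class names ParquetSinkSketch and KafkaMessage and the single payload field are hypothetical, so replace them with whatever record type your messages actually map to. As far as I know, newer Flink releases offer FileSink and AvroParquetWriters for the same purpose.

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetSinkSketch {

    // Hypothetical record type: Parquet needs a schema, so the raw String
    // from Kafka is wrapped in a POJO whose fields become Parquet columns.
    public static class KafkaMessage {
        public String payload;

        public KafkaMessage() {
            // No-arg constructor so the class qualifies as a Flink POJO.
        }

        public KafkaMessage(String payload) {
            this.payload = payload;
        }
    }

    // Builds a bulk-encoded sink; the Avro schema is derived from the POJO
    // by reflection. No DefaultRollingPolicy is set here, because bulk
    // formats roll their part files on checkpoints.
    public static StreamingFileSink<KafkaMessage> createParquetSink(String outputPath) {
        return StreamingFileSink
            .forBulkFormat(
                new Path(outputPath),
                ParquetAvroWriters.forReflectRecord(KafkaMessage.class))
            .build();
    }
}
```

To use it, you would map the Kafka stream to the record type first, for example stream.map(KafkaMessage::new).addSink(ParquetSinkSketch.createParquetSink(outputPath)). Checkpointing has to be enabled on the execution environment, otherwise the part files are never finalized.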
Upvotes: 2