Reputation: 424
I want to store Kafka data in HDFS in Parquet format using Flink. I am trying to follow the Flink documentation, but it is not working.
I cannot find any proper documentation on how to store the data as a Parquet file.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(100);

final List<Datum> data = Arrays.asList(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));

DataStream<Datum> stream = env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Datum.class));

stream.addSink(
        StreamingFileSink.forBulkFormat(
                Path.fromLocalFile(new File("path")),
                ParquetAvroWriters.forReflectRecord(Datum.class))
        .build());

env.execute();
I have created a serializable class:
public static class Datum implements Serializable {

    public String a;
    public int b;

    public Datum() {
    }

    public Datum(String a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Datum datum = (Datum) o;
        return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == null);
    }

    @Override
    public int hashCode() {
        int result = a != null ? a.hashCode() : 0;
        result = 31 * result + b;
        return result;
    }
}
The above code is not writing any data to a file; it just keeps creating many files.
Can anyone point me to proper documentation or working code?
Upvotes: 2
Views: 918
Reputation: 3634
As written in the documentation of StreamingFileSink:

IMPORTANT: Checkpointing needs to be enabled when using the StreamingFileSink. Part files can only be finalized on successful checkpoints. If checkpointing is disabled, part files will forever stay in the in-progress or pending state and cannot be safely read by downstream systems.
To enable it, just use
env.enableCheckpointing(1000);
You have quite a few options to tweak it.
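For example, here is a quick sketch of some of the knobs on CheckpointConfig, assuming env is the StreamExecutionEnvironment from above (the specific values are arbitrary examples, not recommendations):

// Obtain the checkpoint configuration of the environment.
CheckpointConfig config = env.getCheckpointConfig();
// Exactly-once is the default checkpointing mode.
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Leave at least 500 ms between the end of one checkpoint and the start of the next.
config.setMinPauseBetweenCheckpoints(500);
// Abort a checkpoint if it does not complete within one minute.
config.setCheckpointTimeout(60000);
// Do not allow concurrent checkpoints.
config.setMaxConcurrentCheckpoints(1);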
Here is a complete example:

final List<Address> data = Arrays.asList(
        new Address(1, "a", "b", "c", "12345"),
        new Address(2, "p", "q", "r", "12345"),
        new Address(3, "x", "y", "z", "12345"));

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(100);

DataStream<Address> stream = env.addSource(
        new FiniteTestSource<>(data), TypeInformation.of(Address.class));

// "folder" is a java.io.File pointing at the output directory;
// Address is an Avro specific record, so forSpecificRecord can derive its schema.
stream.addSink(
        StreamingFileSink.forBulkFormat(
                Path.fromLocalFile(folder),
                ParquetAvroWriters.forSpecificRecord(Address.class))
        .build());

env.execute();
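To wire this up to the original Kafka-to-HDFS question, the test source and local path can be swapped for a Kafka consumer and an HDFS path. A rough sketch: the topic name, bootstrap servers, HDFS URI, and AddressDeserializationSchema below are all placeholders you would replace with your own.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder brokers
props.setProperty("group.id", "parquet-writer");           // placeholder group id

// AddressDeserializationSchema is a hypothetical DeserializationSchema<Address>
// that turns Kafka record bytes into Address objects.
DataStream<Address> stream = env.addSource(
        new FlinkKafkaConsumer<>("addresses", new AddressDeserializationSchema(), props));

stream.addSink(
        StreamingFileSink.forBulkFormat(
                new Path("hdfs://namenode:8020/data/addresses"),  // placeholder HDFS path
                ParquetAvroWriters.forSpecificRecord(Address.class))
        .build());

env.execute();

Checkpointing still has to be enabled exactly as above; otherwise the Parquet part files will never leave the in-progress state.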
Upvotes: 2