Reputation: 12371
I'm using Flink to process my streaming data.
The stream comes from other middleware, such as Kafka or Pravega.
Suppose Pravega is sending a stream of words: hello world my name is...
What I need is a three-step process: map each word to a MyJson object, map each MyJson to a String, and write each String to its own file. For example, for the stream hello world my name is, I should get five files.
Here is my code:
// init Pravega connector
PravegaDeserializationSchema<String> adapter = new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>());
FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
        .withPravegaConfig(pravegaConfig)
        .forStream(stream)
        .withDeserializationSchema(adapter)
        .build();
// map stream to MyJson
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
        .map(new MapFunction<String, MyJson>() {
            @Override
            public MyJson map(String s) throws Exception {
                MyJson myJson = JSON.parseObject(s, MyJson.class);
                return myJson;
            }
        });
// map MyJson to String
DataStream<String> valueInJson = jsonStream
        .map(new MapFunction<MyJson, String>() {
            @Override
            public String map(MyJson myJson) throws Exception {
                return myJson.toString();
            }
        });
// output
valueInJson.print();
This code writes all of the results to the Flink log files.
My question is: how can I write each word to its own output file?
Upvotes: 0
Views: 945
Reputation: 43419
I think the easiest way to do this would be with a custom sink.
stream.addSink(new WordFileSink());

public static class WordFileSink implements SinkFunction<String> {
    @Override
    public void invoke(String value, Context context) {
        // generate a unique name for the new file and open it
        // write the word to the file
        // close the file
    }
}
Note that this implementation won't necessarily provide exactly once behavior. You might want to take care that the file naming scheme is both unique and deterministic (rather than depending on processing time), and be prepared for the case that the file may already exist.
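To make the naming advice concrete, here is a minimal sketch of the file-writing part in plain Java, independent of Flink. The class WordFileWriter, its methods, and the recordId parameter are all hypothetical names introduced for illustration. It assumes each record carries some stable identifier (for example an event timestamp or a position in the stream), which the question does not specify. The file name is derived only from that identifier and the word, and an existing file is overwritten, so replaying the same record after a failure produces the same file rather than a duplicate.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Locale;

// Hypothetical helper, not part of Flink or Pravega.
final class WordFileWriter {
    private WordFileWriter() {}

    // Deterministic name: zero-padded record id plus a sanitized copy of the word.
    // The same (recordId, word) pair always maps to the same file name.
    static String fileNameFor(long recordId, String word) {
        String safe = word.replaceAll("[^A-Za-z0-9_-]", "_");
        return String.format(Locale.ROOT, "%020d-%s.txt", recordId, safe);
    }

    // Writes the word to its own file under dir and returns the file's path.
    // TRUNCATE_EXISTING makes a replayed record overwrite its earlier file.
    static Path writeWord(Path dir, long recordId, String word) throws IOException {
        Files.createDirectories(dir);
        Path target = dir.resolve(fileNameFor(recordId, word));
        Files.write(target, word.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.WRITE);
        return target;
    }
}
```

Inside the sink, invoke could then call something like WordFileWriter.writeWord(outputDir, recordId, value), where outputDir and recordId are again assumptions about how the job is configured. This only makes the writes idempotent; it does not by itself give exactly-once guarantees.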
Upvotes: 1