Reputation:
In Apache Flink, I am not able to see the output in stdout, but my job is running successfully and data is coming in.
Upvotes: 0
Views: 2769
Reputation:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // the host and the port of the socket to read text from
        final String hostname = "192.168.1.73";
        final int port = 9000;

        // connect to the remote cluster whose JobManager runs on 192.168.1.68:6123
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createRemoteEnvironment("192.168.1.68", 6123);

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
Upvotes: 0
Reputation: 867
As you are running your job on a cluster, DataStreams are printed to the stdout of the TaskManager process. This TaskManager stdout is redirected to an .out file in the ./log/ directory of the Flink root directory. I believe that is where you will find your output.
I don't know if it is possible to change the stdout of the TaskManagers; however, a quick and dirty solution could be to write the output to a socket:
output.writeToSocket(outputHost, outputPort, new SimpleStringSchema())
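For the job above, note that SimpleStringSchema serializes Strings, so the WordWithCount stream would first have to be mapped to String. A minimal sketch, assuming a hypothetical output host and port (for example a machine where nc -lk 9001 is listening); the import path for SimpleStringSchema can differ between Flink versions:
import org.apache.flink.api.common.functions.MapFunction;
// in older Flink versions: org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.api.common.serialization.SimpleStringSchema;

// hypothetical output socket, not part of the original job
final String outputHost = "192.168.1.73";
final int outputPort = 9001;

windowCounts
        // convert each WordWithCount to a String so SimpleStringSchema can serialize it
        .map(new MapFunction<WordWithCount, String>() {
            @Override
            public String map(WordWithCount value) {
                return value.toString();
            }
        })
        .writeToSocket(outputHost, outputPort, new SimpleStringSchema());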
Upvotes: 3