Maher Marwani

Reputation: 61

Flink: print and write to file not working

I have this pipeline: KafkaProducer -> Topic1 -> FlinkConsumer -> Topic2 -> KafkaConsumer

I'm trying to record the time at which each record passes through each stage of the pipeline.

In the Flink Java application I did something like this:

inputstream
        // To calculate Flink input time
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.printf("source time : %d\n", System.nanoTime());
                writeDataLineByLine("flinkinput_data.csv", -1, System.nanoTime());
                return s;
            }
        })
        // Process
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String record) throws InterruptedException {
                for (int i = 0; i < 2; i++)
                    Thread.sleep(1);
                return record + " mapped";
            }
        })
        // To calculate Flink output time
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.printf("sink time : %d\n", System.nanoTime());
                writeDataLineByLine("flinkoutput_data.csv", -1, System.nanoTime());
                return s;
            }
        })
        .addSink(producer);

While this works in the mini-cluster in IntelliJ, it doesn't work on a standalone cluster. Can someone please explain why the print and write-to-CSV lines are ignored?

Upvotes: 0

Views: 164

Answers (1)

David Anderson

Reputation: 43454

Whatever the task managers write to stdout goes into files in Flink's log directory on each of the task manager nodes.
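To make that concrete, here is a minimal sketch of one way to surface the timings on a cluster: route them through Flink's SLF4J logging instead of System.out, so they land in the regular task manager log files on each node. TimestampLogger and the stage label are illustrative names, not part of the original code:

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper: logs the time at which each record passes a stage.
// Log output goes to the task manager log files on each node, which are
// easier to locate than files written with a relative path.
public class TimestampLogger implements MapFunction<String, String> {

    private static final Logger LOG = LoggerFactory.getLogger(TimestampLogger.class);

    private final String stage;

    public TimestampLogger(String stage) {
        this.stage = stage;
    }

    @Override
    public String map(String record) {
        LOG.info("{} time : {}", stage, System.nanoTime());
        return record;
    }
}

A stage could then be instrumented as, e.g., inputstream.map(new TimestampLogger("source")). The same reasoning applies to the CSV output: writeDataLineByLine apparently opens a relative path, which resolves against each task manager's working directory, so the files end up on the worker nodes rather than on the machine where the job was submitted.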

Upvotes: 2
