Likith Kailas

Reputation: 11

Flink streaming - latency and throughput detection

I am trying to run a Flink streaming job and want to measure the throughput and latency of the streaming process. I have started the Kafka broker and am receiving messages from Kafka. How do I count messages per second (throughput)? (Something like rdd.count(): is there a similar method to get the count of incoming messages?)

(Complete scenario: I send each message from the producer as a JSON object, adding some information such as a name (a string) and System.currentTimeMillis(). During streaming, how do I obtain the sent JSON object from messageStream (a DataStream)?)

Thanks in advance.

CODE :

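import org.apache.flink.api.common.restartstrategy.RestartStrategies;
// SimpleStringSchema's package differs between Flink versions (older releases: org.apache.flink.streaming.util.serialization)
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

/**
 * Read Strings from Kafka and print them to standard out.
 */
public class ReadFromKafka { // class name is illustrative; the snippet originally showed only main()

    public static void main(String[] args) throws Exception {
        // Only needed on Windows: tells Hadoop where the winutils binaries live
        System.setProperty("hadoop.home.dir", "c:/winutils/");
        // parse input arguments
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);

        if (parameterTool.getNumberOfParameters() < 4) {
            System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
                    "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
            return;
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface

        // Each Kafka record value is deserialized as a plain String (here: the JSON text)
        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer010<>(
                        parameterTool.getRequired("topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        messageStream.print();

        env.execute();
    }
}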

Upvotes: 0

Views: 1777

Answers (2)

Biplob Biswas

Reputation: 1881

The Flink web UI exposes several built-in metrics from which you can calculate the number of events per second and similar figures.
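As a rough sketch of how a per-second figure can be derived from a cumulative counter (for example the `numRecordsIn` record count shown in the UI): sample the counter twice and divide the difference by the elapsed time. The class and method names below are hypothetical, and how you read the counter values is left out.

```java
/**
 * Hypothetical helper: derives throughput (events per second) from two
 * samples of a cumulative counter taken at two points in time.
 */
class CounterRate {

    static double eventsPerSecond(long countAtStart, long startMillis,
                                  long countAtEnd, long endMillis) {
        long elapsedMillis = endMillis - startMillis;
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("end sample must be later than start sample");
        }
        // Difference in events divided by elapsed time, scaled from ms to seconds
        return (countAtEnd - countAtStart) * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        // 12,500 records counted over a 5-second interval
        System.out.println(eventsPerSecond(10_000L, 0L, 22_500L, 5_000L)); // prints 2500.0
    }
}
```

Averaging over a longer interval smooths out bursts; a short interval reacts faster but is noisier.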

You can also register your own metrics that compute whatever figures you need; these are displayed in the Flink UI as well.
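For the scenario in the question (the producer embeds System.currentTimeMillis() in each JSON message), a custom metric could compute per-message latency as the consumer's current time minus the embedded send time. The sketch below is plain Java under these assumptions: the field name `sentAt` and the class name are hypothetical, and the regex-based extraction is only a toy stand-in for a real JSON parser such as Jackson. In a Flink job this logic would typically sit in a `RichMapFunction` that updates a registered metric. The numbers are only meaningful if the producer and consumer clocks are synchronized.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical latency tracker for JSON messages that carry their own send time.
 * Field extraction only handles flat JSON with a numeric field.
 */
class EmbeddedTimestampLatency {

    private long count;
    private long totalLatencyMillis;
    private long maxLatencyMillis = Long.MIN_VALUE; // stays MIN_VALUE until something is recorded

    /** Extracts a numeric field such as "sentAt": 1000 from a flat JSON string. */
    static long extractLong(String json, String field) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*(-?\\d+)");
        Matcher m = p.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("field not found: " + field);
        }
        return Long.parseLong(m.group(1));
    }

    /** Records one message received at receivedAtMillis and returns its latency in ms. */
    long record(String json, long receivedAtMillis) {
        long latency = receivedAtMillis - extractLong(json, "sentAt");
        count++;
        totalLatencyMillis += latency;
        maxLatencyMillis = Math.max(maxLatencyMillis, latency);
        return latency;
    }

    double averageLatencyMillis() {
        return count == 0 ? 0.0 : (double) totalLatencyMillis / count;
    }

    long maxLatencyMillis() {
        return maxLatencyMillis;
    }

    public static void main(String[] args) {
        EmbeddedTimestampLatency tracker = new EmbeddedTimestampLatency();
        tracker.record("{\"name\":\"a\",\"sentAt\":1000}", 1040L); // 40 ms
        tracker.record("{\"name\":\"b\",\"sentAt\":2000}", 2060L); // 60 ms
        // prints "50.0 ms avg, 60 ms max"
        System.out.println(tracker.averageLatencyMillis() + " ms avg, "
                + tracker.maxLatencyMillis() + " ms max");
    }
}
```

In a real job, the receive time would be System.currentTimeMillis() at the point in the pipeline you want to measure (for example just before the sink).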

Lastly, for latency specifically, you can try what is described in the latency-tracking documentation, and similarly you can measure throughput using meters.
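Flink's `Meter` metric type exposes `markEvent()` and `getRate()`; inside a job you would register one from a rich function via `getRuntimeContext().getMetricGroup().meter(name, meter)` and call `markEvent()` once per record. The plain-Java class below is a hypothetical stand-in (not Flink's own implementation) that illustrates the idea behind a throughput meter: count events per one-second slot and report the average rate over the last N completed seconds. Time is passed in explicitly so the behaviour is deterministic, and event times are assumed to be non-decreasing.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sliding-window throughput meter. Events are counted per
 * one-second slot; getRate() averages over the last windowSeconds completed seconds.
 */
class SlidingRateMeter {

    private final int windowSeconds;
    private final Deque<long[]> slots = new ArrayDeque<>(); // each entry: {secondIndex, count}

    SlidingRateMeter(int windowSeconds) {
        if (windowSeconds <= 0) {
            throw new IllegalArgumentException("windowSeconds must be > 0");
        }
        this.windowSeconds = windowSeconds;
    }

    /** Counts one event at the given time (epoch millis); times must not go backwards. */
    void markEvent(long nowMillis) {
        long second = Math.floorDiv(nowMillis, 1000L);
        long[] last = slots.peekLast();
        if (last != null && last[0] == second) {
            last[1]++;
        } else {
            slots.addLast(new long[] {second, 1L});
        }
        evict(second);
    }

    /** Average events per second over the windowSeconds seconds before the current one. */
    double getRate(long nowMillis) {
        long current = Math.floorDiv(nowMillis, 1000L);
        evict(current);
        long total = 0;
        for (long[] slot : slots) {
            if (slot[0] < current) {
                total += slot[1]; // only completed seconds count
            }
        }
        return (double) total / windowSeconds;
    }

    /** Drops slots that fall before the window preceding currentSecond. */
    private void evict(long currentSecond) {
        while (!slots.isEmpty() && slots.peekFirst()[0] < currentSecond - windowSeconds) {
            slots.removeFirst();
        }
    }

    public static void main(String[] args) {
        SlidingRateMeter meter = new SlidingRateMeter(2);
        meter.markEvent(0L);
        meter.markEvent(100L);
        meter.markEvent(900L);   // three events in second 0
        meter.markEvent(1_500L); // one event in second 1
        System.out.println(meter.getRate(2_000L)); // prints 2.0 (4 events over 2 seconds)
    }
}
```

Reporting only completed seconds avoids under-counting the partially elapsed current second.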

Upvotes: 1

David Anderson

Reputation: 43707

This benchmarking application might be a good place to start. The documentation on latency tracking and on the metrics exposed by Flink's Kafka connector is also worth reading.

Upvotes: 0
