miro

Reputation: 189

Flink + Kafka + JSON - java example

I'm trying to read JSON messages from a Kafka topic with this code:

public class FlinkMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parse user parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        DataStream messageStream = env.addSource(
                new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
                , new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

        messageStream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        });

        env.execute();
    }
}

The issues are:

1) This program does not run; it fails with:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

The problem is at the line `messageStream.map(...)`.
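The exception itself suggests adding a type hint via returns(...). A minimal sketch of that hint (it only declares the map's output type; as the answers below show, the input type also has to match what the deserialization schema actually produces):

// Sketch only: the returns(...) hint declares the map's output type to work around
// type erasure; the String input type still has to match the schema's output,
// as the answers below explain.
messageStream.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return "Kafka and Flink says: " + value;
    }
}).returns(String.class);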

2) Maybe the above issue is related to the fact that the DataStream has no type parameter. But if I try:

DataStream<String> messageStream = env.addSource(...

The code does not compile: cannot resolve constructor FlinkKafkaConsumer09 ...

The pom.xml (the important part):

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>
</dependencies>

I've been looking for Flink code that uses a JSON DeserializationSchema, without success. The only thing I've found is the unit test for JSONKeyValueDeserializationSchema at this link.

Does anyone know the right way to do this?

Thanks

Upvotes: 3

Views: 13075

Answers (2)

miro

Reputation: 189

I followed Vishnu Viswanath's answer; however, JSONKeyValueDeserializationSchema raises an exception during the JSON parsing step, even for a simple JSON such as {"name":"John Doe"}.

The code that throws is:

DataStream<ObjectNode> messageStream = env.addSource(
    new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
    , new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode node) throws Exception {
        return "Kafka and Flink says: " + node.get(0);
    }
}).print();

Output:

09/05/2016 11:16:02 Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
    at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:790)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215)
    at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:52)
    at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:38)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
    at java.lang.Thread.run(Thread.java:745)

I succeeded using another deserialization schema, JSONDeserializationSchema:

DataStream<ObjectNode> messageStream = env.addSource(
        new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                new JSONDeserializationSchema(), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode value) throws Exception {
        return "Kafka and Flink says: " + value.get("key").asText();
    }
}).print();
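Note that with JSONDeserializationSchema the resulting ObjectNode is the message value itself, so for the simple JSON mentioned above the field can be read directly (a sketch, assuming the message {"name":"John Doe"}):

// Sketch: JSONDeserializationSchema deserializes only the message value,
// so fields of {"name":"John Doe"} are accessed directly on the ObjectNode.
return "Kafka and Flink says: " + value.get("name").asText();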

Upvotes: 6

vishnu viswanath

Reputation: 3854

Your error is at the line messageStream.map(new MapFunction<String, String>(). The MapFunction you defined expects an input of type String and an output of type String, but since you are using JSONKeyValueDeserializationSchema, which deserializes each Kafka record into a com.fasterxml.jackson.databind.node.ObjectNode, your MapFunction should expect an input of type ObjectNode. Try the code below.

messageStream.map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode node) throws Exception {
        return "Kafka and Flink says: " + node.get(0);
    }
});
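Note that JSONKeyValueDeserializationSchema nests the record under "key" and "value" fields (plus "metadata" when the constructor flag is true), so concrete fields are normally read through "value"; it also parses the message key, which is likely why the other answer hit a NullPointerException for messages produced without a key. A sketch of the field access, assuming the sample message {"name":"John Doe"} was produced with a JSON key:

// Sketch: the ObjectNode produced by JSONKeyValueDeserializationSchema has the shape
// {"key": ..., "value": ...} (and "metadata" when enabled), so fields live under "value".
return "Kafka and Flink says: " + node.get("value").get("name").asText();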

Upvotes: 4
