Reputation: 189
I'm trying to get a JSON from a Kafka topic with this code:
public class FlinkMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parse user parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        DataStream messageStream = env.addSource(
                new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                        new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

        messageStream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        });

        env.execute();
    }
}
The issues are:

1) The program does not run; it fails with:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

The problem is at the line `messageStream.map(...`.

2) Maybe the above issue is related to the fact that `DataStream` has no type parameter. But if I try:

DataStream<String> messageStream = env.addSource(...

the code does not compile: cannot resolve constructor FlinkKafkaConsumer09 ...
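For context, the InvalidTypesException points at Java's type erasure: generic type parameters are not available at runtime, so when the stream is declared as a raw `DataStream`, Flink has nothing from which to infer the map function's input and output types. A minimal, Flink-free illustration of erasure (the class name here is my own, for the sketch only):

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();

        // Both lists share the exact same runtime class: the type
        // parameters <String> and <Integer> were erased by the compiler,
        // so they cannot be recovered by reflection alone.
        System.out.println(strings.getClass() == ints.getClass()); // true
    }
}
```

This is why the exception offers `returns(...)` and `ResultTypeQueryable` as escape hatches: they are explicit hints for types that erasure has removed.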
The pom.xml (the relevant part):
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
I've been looking for Flink code that uses a JSON DeserializationSchema, without success; I've only found the unit test for JSONKeyValueDeserializationSchema
at this link.
Does anyone know the right way to do this?
Thanks
Upvotes: 3
Views: 13075
Reputation: 189
I followed Vishnu Viswanath's answer; however, JSONKeyValueDeserializationSchema raised an exception during the JSON parsing step, even for a simple JSON such as {"name":"John Doe"}.
The code that throws is:
DataStream<ObjectNode> messageStream = env.addSource(
        new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode node) throws Exception {
        return "Kafka and Flink says: " + node.get(0);
    }
}).print();
Output:
09/05/2016 11:16:02 Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:790)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215)
at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:52)
at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:38)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
at java.lang.Thread.run(Thread.java:745)
I succeeded using another deserialization schema, JSONDeserializationSchema. (The NullPointerException inside JsonFactory.createParser suggests the consumed records had no key for JSONKeyValueDeserializationSchema to parse.)
DataStream<ObjectNode> messageStream = env.addSource(
        new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                new JSONDeserializationSchema(), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode value) throws Exception {
        return "Kafka and Flink says: " + value.get("key").asText();
    }
}).print();
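A side note on the ObjectNode the deserializer produces: it is Jackson's tree model, and object fields are addressed by name, not by index, so `node.get(0)` on an object node returns null rather than a field. A minimal standalone Jackson sketch of the two lookups (assuming jackson-databind on the classpath; the class name is my own):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ObjectNodeDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode node = mapper.readTree("{\"name\":\"John Doe\"}");

        // Object fields are looked up by name; get(int) is meant for
        // array nodes and yields null on an object node.
        System.out.println(node.get("name").asText()); // John Doe
        System.out.println(node.get(0));               // null
    }
}
```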
Upvotes: 6
Reputation: 3854
Your error is at the line `messageStream.map(new MapFunction<String, String>()`. The MapFunction you defined expects an input of type String and produces an output of type String, but since you are using a JSONKeyValueDeserializationSchema, which converts the raw message into a com.fasterxml.jackson.databind.node.ObjectNode, your MapFunction should expect an input of that same type, ObjectNode. Try the code below.
messageStream.map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode node) throws Exception {
        return "Kafka and Flink says: " + node.get(0);
    }
});
Upvotes: 4