Reputation: 530
I have an Apache Flink application where I want to filter the data by Country: it is read from topic v01, and the filtered data should be written to topic v02. For testing purposes I tried to convert everything to uppercase first.
My Code:
package org.example;

import org.apache.avro.Schema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class KafkaRead {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.100.100:9092");
        properties.setProperty("group.id", "luft");

        String schemaRegistryUrl = "http://192.168.100.100:8081";
        String valueSchema = "{\"connect.name\":\"com.github.jcustenborder.kafka.connect.model.Value\",\"fields\":[{\"default\":null,\"name\":\"Datum\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"Country\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"City\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"Specie\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"count\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"min\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"max\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"median\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"variance\",\"type\":[\"null\",\"string\"]}],\"name\":\"Value\",\"namespace\":\"com.github.jcustenborder.kafka.connect.model\",\"type\":\"record\"}";
        Schema schema = new Schema.Parser().parse(valueSchema);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>("v01",
                ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryUrl), properties);
        kafkaConsumer.setStartFromEarliest();
        DataStream<String> streamIn = env.addSource(kafkaConsumer);

        FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer("v02",
                ConfluentRegistryAvroSerializationSchema.forGeneric("v02-value", schema, schemaRegistryUrl), properties);

        DataStream<String> streamOut = streamIn.map(new MapFunction<String, String>() { // <-- Error in this line
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        });

        streamOut.addSink(kafkaProducer);
        env.execute("Flink Streaming In/Out Kafka");
    }
}
When executing, I get the following error:
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.String
at org.example.KafkaRead$1.map(KafkaRead.java:45)
It works without problems if I don't use a map function and simply use my input as my output:
DataStream<String> streamOut = streamIn;
For context:
The data that gets read in the first place looks like this:
Date,Country,City,Specie,count,min,max,median,variance
2020-03-24,DE,Hamburg,humidity,288,26.0,54.0,36.5,966.48
2020-03-26,DE,Hamburg,humidity,288,25.0,71.5,44.0,1847.14
2020-04-01,DE,Hamburg,humidity,288,61.0,83.0,75.0,418.07
The CSV files are read into Kafka via the SpoolDirCsvSourceConnector; the connector automatically generates a schema for topic v01, which is saved as Avro. In the next step I want to analyse the data with filters inside Flink, for example filtering by country, and then write it back to v02.
Complete Error:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.String
at org.example.KafkaRead$1.map(KafkaRead.java:45)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
Upvotes: 0
Views: 2997
Reputation: 3874
Just to extend the comment that has been added: if You use ConfluentRegistryAvroDeserializationSchema.forGeneric, the data produced by the consumer isn't really String but rather GenericRecord. So, the moment You try to use it in Your map that expects String, it will fail, because Your DataStream is not DataStream<String> but rather DataStream<GenericRecord>.
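A minimal sketch of how the source could be typed to match what the deserialization schema actually produces (same topic, schema and properties as in the question; this is just one way to write it):

import org.apache.avro.generic.GenericRecord;

FlinkKafkaConsumer<GenericRecord> kafkaConsumer = new FlinkKafkaConsumer<>(
        "v01",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryUrl),
        properties);
kafkaConsumer.setStartFromEarliest();
// The element type now matches the records the deserialization schema emits.
DataStream<GenericRecord> streamIn = env.addSource(kafkaConsumer);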
Now, it works if You remove the map only because You haven't specified the type when defining Your FlinkKafkaConsumer and Your FlinkKafkaProducer, so Java will just try to cast every object to the required type. Your FlinkKafkaProducer is actually FlinkKafkaProducer<GenericRecord>, so there will be no problem there and thus it will work as it should.
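To see why the raw types hide the mismatch until runtime, here is a minimal, Flink-independent illustration (a hypothetical RawTypeDemo class, purely for demonstration):

import java.util.ArrayList;
import java.util.List;

public class RawTypeDemo {
    public static void main(String[] args) {
        List raw = new ArrayList();     // raw type: no compile-time element check
        raw.add(new Object());          // any object can be added
        List<String> strings = raw;     // compiles with only an "unchecked" warning
        String s = strings.get(0);      // ClassCastException at runtime, just like in the map
    }
}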
In this particular case You don't seem to need Avro at all, since the data is just raw CSV.
UPDATE:
Seems that You are actually processing Avro; in this case You need to change the type of Your DataStream<String> to DataStream<GenericRecord>, and all the functions You are going to write will work with GenericRecord, not String.
So, You need something like:
.map(new MapFunction<GenericRecord, T>(){...})
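For example, the country filter from the question could look roughly like this on a GenericRecord stream (a sketch; the field name "Country" and the value "DE" are taken from the sample data in the question):

// requires: import org.apache.flink.api.common.functions.FilterFunction;
//           import org.apache.avro.generic.GenericRecord;
DataStream<GenericRecord> streamOut = streamIn
        .filter(new FilterFunction<GenericRecord>() {
            @Override
            public boolean filter(GenericRecord record) {
                Object country = record.get("Country");
                // Fields in the generated schema are nullable unions, so guard against null.
                return country != null && "DE".equals(country.toString());
            }
        });
streamOut.addSink(kafkaProducer);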
Upvotes: 1