hasha

Reputation: 294

Spark Streaming with Kafka: AvroRuntimeException in the presence of Latin characters

I am trying to read data from a Kafka topic, serialized in Avro format, in a Spark Streaming application.

I am getting the exception below when converting from byte[] to GenericRecord. I printed the length of the byte array and it shows 957.

When I convert the byte[] to a String, I can see the record. I am not sure why the "Malformed data" exception is thrown here. I see that there are some Latin characters in the record.

I have gone through many posts, but I didn't find a proper solution.

I was using the Twitter Bijection API to serialize the data in Avro format.

There are some posts that suggest using DatumReader and DatumWriter, but that didn't work out either.
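For context, the producer side roughly follows the usual Bijection pattern (a minimal sketch, not my exact producer code; the field value is illustrative and the schema is the one shown further down):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class BijectionSketch {
    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
                "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"baseschema\","
              + " \"fields\": [{\"name\": \"srcsystemcd\", \"type\": \"string\"}]}");

        // Injection gives a GenericRecord <-> byte[] codec for the Avro binary encoding.
        Injection<GenericRecord, byte[]> injection = GenericAvroCodecs.toBinary(schema);

        GenericRecord record = new GenericData.Record(schema);
        record.put("srcsystemcd", "CRT");

        byte[] bytes = injection.apply(record);                // serialize for the Kafka topic
        GenericRecord back = injection.invert(bytes).get();    // deserialize on the consumer side
        System.out.println(back.get("srcsystemcd"));
    }
}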

[ERROR] 07-13-2018 08:35:41,793 com.example.DataTransformationStarter main 154- Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at com.example.ProcessorFactory.getSourceType(ProcessorFactory.java:117)
    at com.example.ProcessorFactory.getProcessor(ProcessorFactory.java:48)
    at com.example.processRecord(RddMicroBatchProcessor.java:166)
    at com.example.lambda$processEachBatch$65712684$1(RddMicroBatchProcessor.java:64)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Also, I am using the same schema for serialization and deserialization.

private static SourceType getSourceType(byte[] scrmRecord) {
    Schema.Parser parser = new Schema.Parser();
    // Schema schema = parser.parse(MDMCommonUtils.getBaseAvroSchema());
    Schema schema = parser.parse(MDMCommonUtils.getCRTAvroSchema());

    // Earlier attempt with SpecificDatumReader, as suggested in other posts:
    // DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema);
    // Decoder decoder = DecoderFactory.get().binaryDecoder(scrmRecord, null);
    // GenericRecord record = null;
    // try {
    //     record = reader.read(null, decoder);
    // } catch (IOException e) {
    //     e.printStackTrace();
    // }
    // return SourceType.fromString(record.get("srcsystemcd") != null ? record.get("srcsystemcd").toString() : "");

    // Current attempt: decode the raw Kafka bytes against the schema with a GenericDatumReader.
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(scrmRecord, null);

    GenericRecord record = null;
    try {
        record = datumReader.read(null, decoder);   // fails here with "Malformed data. Length is negative"
    } catch (IOException e) {
        e.printStackTrace();
    }

    // Avro returns string fields as org.apache.avro.util.Utf8; toString() gives the Java String.
    Object srcSystemCd = record.get("srcsystemcd");
    return SourceType.fromString(srcSystemCd != null ? srcSystemCd.toString() : "");
}

Schema:
{ "namespace": "example.avro",

  "type": "record",
  "name": "baseschema",
  "fields": [
    {"name": "srcsystemcd", "type": "string"}
  ]
}

The source record has some Latin characters.

{"srcsystemcd":"T12","srcupdatedt":"2011-02-27 10:01:40.0","pkeysrcobject":"1234567","modeind":"test1","srcid":"CRT","svtid":"","srcstatuscd":"active","geocd":"ABCD","regioncd":"ABDCE","channelid":"NA","fl":"N","roletypeid":"12","hqfl":"Y","websiteurl":"","emailaddr":"","phonenum":"","faxnum":"","matchnm":"Základnín","dbanm":"","legalnm":"Základnín","deptnm":"","addr1txt":"abcde 11","addr2txt":"","storenum":"","citynm":"abcde","districtnm":"","countynm":"","stateprovincecd":"","stateprovincenm":"","postalcd":"1234","countryiso2cd":"CZ","countryiso3cd":"CZE","altdbanm":"","altlegalnm":"","altdeptnm":"","altaddr1txt":"","altaddr2txt":"","altstorenum":"","altcitynm":"","altdistrictnm":"","altcountynm":"","altstateprovincecd":"","altstateprovincenm":"","altpostalcd":"","altcountryiso2cd":"","altcountryiso3cd":"","altlangcd":"","recdeletefl":"N"}

Is this exception caused by the presence of Latin characters? Any pointers and help would really be appreciated.

Upvotes: 0

Views: 776

Answers (1)

hasha

Reputation: 294

I think the issue here is not about Latin characters. The issue is a mismatch between the serializer and deserializer used.

The data pushed to the topic was serialized using StringSerializer, but it was deserialized using ByteArrayDeserializer. This mismatch is causing the issue.
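A minimal sketch of matching settings (bootstrap server and group id values are placeholders): the producer that writes the Avro byte[] should use ByteArraySerializer for the value, and the Spark Streaming consumer should use ByteArrayDeserializer, so the bytes reach the Avro decoder untouched:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class MatchingSerdeConfig {
    public static void main(String[] args) {
        // Producer side: send the Avro-encoded byte[] as-is.
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");   // placeholder
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Consumer side (kafkaParams for the Spark Streaming direct stream):
        // the value deserializer must match what the producer wrote.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");     // placeholder
        kafkaParams.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        kafkaParams.put("group.id", "avro-consumer");                // placeholder
    }
}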

Upvotes: 0
