sirdan

Reputation: 1028

KafkaUtils.createDirectStream does not take right parameters - Spark Streaming + Kafka

I have an application which sends serialized Twitter data to a Kafka topic. All good so far.

The consumer application should read and deserialize that data. When I call KafkaUtils.createDirectStream, I believe I am passing the right parameters (as you can see in the thrown error below), so I can't understand why it doesn't compile:

The method createDirectStream(JavaStreamingContext, Class&lt;K&gt;, Class&lt;V&gt;, Class&lt;KD&gt;, Class&lt;VD&gt;, Map&lt;String,String&gt;, Set&lt;String&gt;) in the type KafkaUtils is not applicable for the arguments (JavaStreamingContext, Class&lt;String&gt;, Class&lt;Status&gt;, Class&lt;StringDeserializer&gt;, Class&lt;StatusDeserializer&gt;, Map&lt;String,String&gt;, Set&lt;String&gt;)

Checking the Spark Javadoc, my params still seem right to me.

My code is:

Set<String> topics = new HashSet<>();
topics.add("twitter-test");
JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(duration));
Map<String, String> props = new HashMap<>();
//some properties...
JavaPairInputDStream messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        Status.class,
        org.apache.kafka.common.serialization.StringDeserializer.class,
        stream_data.StatusDeserializer.class,
        props,
        topics);

Status serializer code:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import twitter4j.Status;

public class StatusSerializer implements Serializer<Status> {

    @Override
    public byte[] serialize(String topic, Status status) {
        try {
            // Java-serialize the Status object into a byte array
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(status);
            oos.close();
            return baos.toByteArray();
        } catch (IOException e) {
            return new byte[0];
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }
}
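For reference, here is a minimal sketch of the Java-serialization round trip a serializer/deserializer pair like this implies. `RoundTrip`, `toBytes`, and `fromBytes` are illustrative names (not from the question), and a plain `String` stands in for `twitter4j.Status`; any `Serializable` type works the same way:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RoundTrip {

    // Mirrors what StatusSerializer.serialize does: object -> bytes.
    static byte[] toBytes(Serializable o) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(o);
        }
        return baos.toByteArray();
    }

    // The inverse a custom deserializer would perform: bytes -> object.
    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        String original = "serialized tweet payload";
        Object restored = fromBytes(toBytes(original));
        System.out.println(restored.equals(original)); // prints "true"
    }
}
```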

Upvotes: 1

Views: 763

Answers (1)

Ishan Kumar

Reputation: 2082

Looks like the issue is with `stream_data.StatusDeserializer.class`. Can you please share the code of that custom deserializer class? Also, please have a look at Kafka Consumer for Spark written in Scala for Kafka API 0.10: custom AVRO deserializer.

Include the following in the kafkaParams arguments:

key.deserializer -> classOf[StringDeserializer]
value.deserializer -> classOf[StatusDeserializer]
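Assuming the project can move to the spark-streaming-kafka-0-10 connector (where deserializers are passed through kafkaParams rather than as method arguments, as suggested above), the wiring might look like this in Java; the `bootstrap.servers` and `group.id` values are placeholders:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

// Deserializers go into the consumer config map, not the method signature.
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092"); // placeholder
kafkaParams.put("group.id", "twitter-consumer");        // placeholder
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", stream_data.StatusDeserializer.class);

JavaInputDStream<ConsumerRecord<String, Status>> stream =
        KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, Status>Subscribe(topics, kafkaParams));
```

With this API the stream yields `ConsumerRecord` objects, so the value is accessed via `record.value()` rather than as the second element of a pair.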

Upvotes: 2
