Daniel Lee
Daniel Lee

Reputation: 47

Spark Streaming + Kafka integration

I try to integrate spark and kafka in Jupyter notebook by using pyspark. Here is my work environment.

Spark version: Spark 2.2.1 Kafka version: Kafka_2.11-0.8.2.2 Spark streaming kafka jar: spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar

I added a Spark streaming kafka assembly jar file to spark-defaults.conf file.

When i start streamingContext for pyspark streaming, this error appears as can't read kafka version from MANIFEST.MF.

enter image description here

Here is my code.

from pyspark import SparkContext, SparkConf
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import sys
import os

from kafka import KafkaProducer

#Receive data handler
def handler(message):
    records = message.collect()
    for record in records:
        print(record)
        #producer.send('receive', str(res))
        #producer.flush()

producer = KafkaProducer(bootstrap_servers='slave02:9092')
sc = SparkContext(appName="SparkwithKafka")
ssc = StreamingContext(sc, 1)

#Create Kafka streaming with argv
zkQuorum = 'slave02:2181'
topic = 'send'
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic:1})
kvs.foreachRDD(handler)

ssc.start()

Upvotes: 0

Views: 1080

Answers (1)

skjagini
skjagini

Reputation: 3217

Sorry for my posting in Scala

Spark 2.2.1 with Scala 2.11 and Kafka 0.10 do all work though they are marked as experimental

The proper way to create a stream if using above libraries is to use

val kStrream =  KafkaUtils.createDirectStream(
          ssc, PreferConsistent,
          Subscribe[String, String](Array("weblogs-text"), kafkaParams, fromOffsets))

Pay attention to the dependencies for example kafka has jar files that are specific to the version of Kafka Client version and spark version.

       <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.1</version>
            <scope>provided</scope>
        </dependency>

Upvotes: 2

Related Questions