kanataki

Reputation: 433

How to load Kafka topic data into a Spark Dstream in Python

I am using Spark 3.0.0 with Python. I have a test_topic in Kafka that I am producing to from a CSV file.

The code below is consuming from that topic into Spark but I read somewhere that it needs to be in a DStream before I can do any ML on it.

import json
from json import loads
from kafka import KafkaConsumer
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "test")
ssc = StreamingContext(sc, 1)

consumer = KafkaConsumer('test_topic',
                    bootstrap_servers =['localhost:9092'],
                    api_version=(0, 10))

Consumer returns a <kafka.consumer.group.KafkaConsumer at 0x13bf55b0>

How do I edit the above code to give me a DStream?

I am fairly new so kindly point out any silly mistakes made.

EDIT: Below is my producer code:

import csv
from json import dumps
from kafka import KafkaProducer
from time import sleep

# Serialize each row dict to UTF-8 encoded JSON before it is sent
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

with open('test_data.csv') as file:
    reader = csv.DictReader(file, delimiter=';')
    for row in reader:
        # The value_serializer above turns the row into bytes
        producer.send('test_topic', row)
        sleep(2)
        print('Message sent ', row)

Upvotes: 1

Views: 2521

Answers (3)

abhishek kumar

Reputation: 379

You need the org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 package to run it. If you pass it to spark-submit with the --packages option, spark-submit will download the related jars.
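As a rough sketch, the same package can also be requested from Python through the spark.jars.packages setting, as an alternative to passing it on the spark-submit command line. The helper names kafka_package and build_session and the app name are made up for illustration, and the Scala and Spark versions are assumed to match the installation from the question.

```python
def kafka_package(scala_version="2.12", spark_version="3.0.0"):
    """Build the Maven coordinate of the Structured Streaming Kafka connector."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def build_session(app_name="kafka_test"):
    """Create a SparkSession that resolves the Kafka connector at startup."""
    # Imported here so kafka_package() can be used without pyspark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)
        # Has to be set before the first session (and its JVM) is created.
        .config("spark.jars.packages", kafka_package())
        .getOrCreate()
    )
```

Calling build_session() in a fresh Python process should then give a session in which format("kafka") is available, provided the machine can reach a Maven repository.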

Upvotes: 1

tricky

Reputation: 1553

It has been a long time since I last used Spark, but let me help you!

First, as you are using Spark 3.0.0, you can use Spark Structured Streaming; the API is much easier to use because it is based on DataFrames. As you can see in the docs, there is an integration guide for Kafka with PySpark in Structured Streaming mode.

It would be as simple as this query:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test_topic") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Then you can feed this DataFrame into ML pipelines to apply the ML techniques and models you need. As you can see in this Databricks notebook, they have some examples of Structured Streaming with ML. It is written in Scala, but it is a good source of inspiration. You can combine it with the PySpark ML docs to translate it into Python.
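One way to connect the streaming DataFrame to an ML model could look like the sketch below. It rests on several assumptions that are not in the question: the CSV columns are called feature_a and feature_b, a PipelineModel (for example a VectorAssembler plus a classifier) has already been fitted on static data and saved under a hypothetical path, and every stage of that pipeline supports streaming input. Because csv.DictReader yields strings, the JSON values arrive as strings and are cast to DOUBLE here.

```python
FEATURE_COLUMNS = ["feature_a", "feature_b"]  # hypothetical CSV column names


def json_field_exprs(columns):
    """Build SQL expressions that pull numeric fields out of the Kafka value."""
    return [
        f"CAST(get_json_object(CAST(value AS STRING), '$.{name}') AS DOUBLE) AS {name}"
        for name in columns
    ]


def score_stream(df, model_path="/tmp/test_model"):
    """Apply a previously fitted pipeline to the streaming DataFrame.

    df is the DataFrame returned by readStream...load(); model_path is a
    hypothetical location where PipelineModel.save() was called earlier.
    """
    # Imported lazily so json_field_exprs() works without pyspark installed.
    from pyspark.ml import PipelineModel

    features = df.selectExpr(*json_field_exprs(FEATURE_COLUMNS))
    model = PipelineModel.load(model_path)
    # transform() on a streaming DataFrame returns another streaming DataFrame.
    return model.transform(features)
```

The result of score_stream(df) still has to be started with writeStream, for instance to the console sink, before any predictions show up.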

EDIT: The actual STEPS to follow in order to make it work between PySpark and Kafka

1 - Kafka Setup

So first I set up my local Kafka:

wget https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
tar -xzf kafka_2.11-0.10.2.0.tgz

I open 4 shells to run the zookeeper / server / create_topic / write_topic scripts:

  • Zookeeper
cd kafka_2.11-0.10.2.0
bin/zookeeper-server-start.sh config/zookeeper.properties
  • Server
cd kafka_2.11-0.10.2.0
bin/kafka-server-start.sh config/server.properties
  • Create topic and check creation
cd kafka_2.11-0.10.2.0
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181
  • Test messages in the topic (write them interactively in the shell for testing purposes):
cd kafka_2.11-0.10.2.0
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

2 - PySpark Setup

Getting the additional jars

Now that we have set up Kafka, we will set up PySpark by downloading some specific jars:

  • spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.12/3.0.0/spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar
  • spark-sql-kafka-0-10_2.12-3.0.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.0/spark-sql-kafka-0-10_2.12-3.0.0.jar
  • commons-pool2-2.8.0.jar
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.8.0/commons-pool2-2.8.0.jar
  • kafka-clients-0.10.2.2.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.2/kafka-clients-0.10.2.2.jar

Run the PySpark shell command

Don't forget to specify the folder path for each jar if you're not in the jars folder when you execute the pyspark command.

PYSPARK_PYTHON=python3 $SPARK_HOME/bin/pyspark --jars spark-sql-kafka-0-10_2.12-3.0.0.jar,spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar,kafka-clients-0.10.2.2.jar,commons-pool2-2.8.0.jar

3 - Run the PySpark code

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .load()

query = df \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()
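In the pyspark shell the query keeps running in the background. If the same code is run as a standalone script, the driver would exit right after start() unless it blocks on the query, so a script version might look like the sketch below. The names KAFKA_OPTIONS, run and main and the app name are illustrative, and the startingOffsets entry is an optional assumption added so that messages already in the topic are read too (the default for streaming queries is latest).

```python
KAFKA_OPTIONS = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "test",
    # Optional assumption: also read messages produced before the query started.
    "startingOffsets": "earliest",
}


def run(spark, options=KAFKA_OPTIONS):
    """Print the topic's key/value pairs to the console until interrupted."""
    df = spark.readStream.format("kafka").options(**options).load()
    query = (
        df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .writeStream
        .format("console")
        .start()
    )
    # Block the driver; without this a script would exit immediately.
    query.awaitTermination()


def main():
    # Imported lazily so the options above can be inspected without pyspark.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("kafka_console").getOrCreate()
    run(spark)
```

Saved to a file with a final main() call, this would be launched through spark-submit with the same --jars list as the pyspark command above.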

Cheers

Upvotes: 3

Idan

Reputation: 1

You need to use the KafkaUtils.createDirectStream method. Note that the pyspark.streaming.kafka module only exists in Spark 2.x; it was removed in Spark 3.0.0.

Here is a code sample from the official Spark documentation:

from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
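A slightly fuller sketch of that call, assuming a Spark 2.x installation with the spark-streaming-kafka-0-8 package on the classpath. The helper names parse_record and build_stream are made up for illustration; the topic and broker defaults are taken from the question, and the message values are assumed to be the JSON strings written by the producer there.

```python
import json


def parse_record(record):
    """Turn one (key, value) pair from the DStream into a dict."""
    _key, value = record
    return json.loads(value)


def build_stream(ssc, topic="test_topic", brokers="localhost:9092"):
    """Return a DStream of dicts read from the given Kafka topic."""
    # Only importable on Spark 2.x with the Kafka 0.8 streaming integration.
    from pyspark.streaming.kafka import KafkaUtils

    stream = KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": brokers}
    )
    # Each element is a (key, value) tuple; values are decoded as UTF-8 by default.
    return stream.map(parse_record)
```

With the ssc from the question, build_stream(ssc).pprint() followed by ssc.start() and ssc.awaitTermination() would print each batch of parsed rows.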

Upvotes: 0
