Reputation: 433
I am using Spark 3.0.0 with Python.
I have a test_topic in Kafka that I am producing to from a CSV.
The code below consumes from that topic into Spark, but I read somewhere that it needs to be in a DStream before I can do any ML on it.
import json
from json import loads
from kafka import KafkaConsumer
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "test")
ssc = StreamingContext(sc, 1)
consumer = KafkaConsumer('test_topic',
                         bootstrap_servers=['localhost:9092'],
                         api_version=(0, 10))
The consumer returns a <kafka.consumer.group.KafkaConsumer at 0x13bf55b0>.
How do I edit the above code to give me a DStream?
I am fairly new, so kindly point out any silly mistakes I have made.
EDIT: Below is my producer code:
import json
import csv
from json import dumps
from kafka import KafkaProducer
from time import sleep
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

with open('test_data.csv') as file:
    reader = csv.DictReader(file, delimiter=';')
    for row in reader:
        producer.send('test_topic', row)  # value_serializer turns the dict into JSON bytes
        sleep(2)
        print('Message sent ', row)
Upvotes: 1
Views: 2521
Reputation: 379
You need the org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 package to run it. Passing it to spark-submit with --packages makes it download the related jars automatically.
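For example, a minimal sketch (the app name is just illustrative) of attaching that package when building the session from Python; equivalently, you can pass it on the command line with --packages:
from pyspark.sql import SparkSession

# Ask Spark to resolve and download the Kafka connector and its dependencies
spark = SparkSession.builder \
    .appName("kafka-structured-streaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0") \
    .getOrCreate()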
Upvotes: 1
Reputation: 1553
It has been a while since I last did any Spark, but let me help you!
First, since you are using Spark 3.0.0, you can use Spark Structured Streaming; the API is much easier to use as it is based on dataframes. As you can see in the docs linked here, there is an integration guide for Kafka with PySpark in Structured Streaming mode.
It would be as simple as this query:
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test_topic") \
    .load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Then you can work with this dataframe using ML pipelines to apply the ML techniques and models you need. As you can see in this Databricks notebook, they have some examples of Structured Streaming with ML. It is written in Scala, but it is a good source of inspiration; you can combine it with the PySpark ML docs to translate it into Python.
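For instance, here is a minimal sketch (not taken from the notebook) of applying an already-fitted PipelineModel to the stream; the two-field schema and the my_pipeline_model path are assumptions for illustration:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.ml import PipelineModel

# Hypothetical schema matching the JSON rows produced from the CSV
schema = StructType([
    StructField("feature_a", StringType()),
    StructField("feature_b", StringType())
])

# Parse the Kafka value column into typed columns
parsed = df.selectExpr("CAST(value AS STRING) AS json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")

# Apply a pipeline that was trained and saved beforehand (models cannot be fitted on a stream)
model = PipelineModel.load("my_pipeline_model")
predictions = model.transform(parsed)

predictions.writeStream \
    .format("console") \
    .start()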
EDIT: The actual STEPS to follow to make it work between PySpark and Kafka
So first I set up my local Kafka:
wget https://archive.apache.org/dist/kafka/0.10.2.2/kafka_2.12-0.10.2.2.tgz
tar -xzf kafka_2.12-0.10.2.2.tgz
I open 4 shells to run the zookeeper / server / create_topic / write_topic scripts:
cd kafka_2.12-0.10.2.2
bin/zookeeper-server-start.sh config/zookeeper.properties
cd kafka_2.12-0.10.2.2
bin/kafka-server-start.sh config/server.properties
cd kafka_2.12-0.10.2.2
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181
cd kafka_2.12-0.10.2.2
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Now that we have set up Kafka, we will set up PySpark by downloading the specific jars:
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.12/3.0.0/spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.0/spark-sql-kafka-0-10_2.12-3.0.0.jar
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.8.0/commons-pool2-2.8.0.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.2/kafka-clients-0.10.2.2.jar
Don't forget to specify the folder path for each jar if you're not in the folder containing the jars when you execute the pyspark command.
PYSPARK_PYTHON=python3 $SPARK_HOME/bin/pyspark --jars spark-sql-kafka-0-10_2.12-3.0.0.jar,spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar,kafka-clients-0.10.2.2.jar,commons-pool2-2.8.0.jar
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

query = df \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()
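Note that if you run this as a standalone script with spark-submit instead of the interactive pyspark shell, you also need to block on the query so the driver does not exit immediately:
query.awaitTermination()  # keep the streaming query running until it is stopped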
Cheers
Upvotes: 3
Reputation: 1
You need to use the KafkaUtils.createDirectStream method.
Here is a code sample from the official Spark documentation:
from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
Upvotes: 0