Wassinger

Reputation: 397

How to write from a PySpark DStream to Redis?

I am using PySpark 2.3.1 to read a stream of values from Kafka as DStreams. I want to do some transforms on this data, such as taking a moving average, and save it to Redis. My Spark job code looks roughly like this:

import json

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

batch_duration = 1

# Initialize session
spark_session = SparkSession \
    .builder \
    .appName("my-app") \
    .getOrCreate()

spark_context = spark_session.sparkContext

# Create streaming context (wraps the SparkContext with a batch interval)
streaming_context = StreamingContext(spark_context, batch_duration)

# Read from Kafka
input_stream = KafkaUtils \
    .createDirectStream(streaming_context, ['price'], {"metadata.broker.list": kafka_urls})

I can then transform it with lines like:

jsons = input_stream.window(5000).map(lambda t: t[1]).map(json.loads)
prices = jsons.map(lambda d: d['price'])
total = prices.reduce(lambda x, y: x + y)
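
For the moving average itself I'm planning something like a windowed sum and count (just a sketch on my part; it assumes the window length is a multiple of batch_duration, which PySpark requires, and the 10-second length is arbitrary):

window_length = 10  # seconds; must be a multiple of batch_duration

# Pair each price with a count of 1, reduce over the window to (sum, count),
# then divide to get the average per window.
pairs = prices.map(lambda p: (p, 1.0))
sums_and_counts = pairs.window(window_length, batch_duration) \
    .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
moving_avg = sums_and_counts.map(lambda t: t[0] / t[1])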

However, total in this case is still a DStream, and the spark-redis documentation says that only DataFrames can be written from PySpark. Fortunately, a DStream produces a periodic RDD for each batch as it runs, so I need to figure out how to convert those RDDs to DataFrames.

I tried

total.foreachRDD(lambda rdd:
                 rdd.toDF().write.format("org.apache.spark.sql.redis") \
                 .option("table", "people") \
                 .option("key.column", "name") \
                 .save())

Admittedly this was copied and pasted blindly from elsewhere on the net, so the option calls almost certainly don't match my data schema. I was hoping to decipher the exceptions and figure out where to go next. Unfortunately, running this on my Spark cluster prints so many lines of Java stack traces that the original Python exception scrolls out of my console history, and I can't tell what is causing the problem.
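
(One thing that helps a little with the log noise is turning Spark's logging down to errors only, so the Python traceback isn't buried; setLogLevel is a standard SparkContext method:)

# Suppress Spark's INFO/WARN output so the Python traceback stays visible
spark_context.setLogLevel("ERROR")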

Upvotes: 1

Views: 2829

Answers (1)

fe2s

Reputation: 435

Here is a word count example that saves its result to Redis:

import sys

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def save_rdd(rdd):
    # Each micro-batch arrives as an RDD; skip empty ones so toDF()
    # doesn't fail trying to infer a schema from zero rows.
    if not rdd.isEmpty():
        df = rdd.toDF()
        df.show()
        # Each row becomes a Redis hash keyed by the "_1" column (the word)
        df.write \
            .format("org.apache.spark.sql.redis") \
            .option("table", "counts") \
            .option("key.column", "_1") \
            .save(mode='append')

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Example") \
        .getOrCreate()

    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)

    counts.foreachRDD(save_rdd)

    ssc.start()
    ssc.awaitTermination()

Submit command:

./bin/spark-submit --master spark://Oleksiis-MacBook-Pro.local:7077 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0,com.redislabs:spark-redis:2.4.0 ~/Projects/spark-redis-test/src/main/scala/com/redislabs/provider/test/spark-direct-kafka.py localhost:9092 new_topic

Please note that I included the com.redislabs:spark-redis:2.4.0 package.
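
By default spark-redis connects to localhost:6379. If your Redis instance runs elsewhere, the connection can be set via Spark config when building the session; the spark.redis.* keys below come from the spark-redis documentation, and the host value is just a placeholder:

# "my-redis-host" is a placeholder; spark.redis.auth can also be set if needed
spark = SparkSession \
    .builder \
    .appName("Example") \
    .config("spark.redis.host", "my-redis-host") \
    .config("spark.redis.port", "6379") \
    .getOrCreate()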

Write some words to new_topic:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic new_topic

>a b c a
>a b b

The output should appear in Redis as hashes where the key corresponds to the input word:

./redis-cli

127.0.0.1:6379> keys counts:*

1) "counts:a"
2) "counts:b"
3) "counts:c"

127.0.0.1:6379> hgetall counts:a
1) "_2"
2) "2"

If you'd like to save the DataFrame with meaningful column names rather than _1, _2, etc., you can rename the columns like this:

from pyspark.sql.functions import col

def save_rdd(rdd):
    if not rdd.isEmpty():
        # Rename the positional tuple columns before writing
        df = rdd.toDF().select(col("_1").alias("word"), col("_2").alias("count"))
        df.show()
        df.write \
            .format("org.apache.spark.sql.redis") \
            .option("table", "counts") \
            .option("key.column", "word") \
            .save(mode='append')

Note that we now set the key.column parameter to word.

Now the field name in Redis is "count":

127.0.0.1:6379> hgetall counts:abc
1) "count"
2) "1"

Hope it helps!

Upvotes: 1
