Marko Milosavljevic

Reputation: 1

Is there a way to set up Structured Streaming with PySpark from Kafka to Cassandra?

I'm writing an analytics script in PySpark and I can't set up streaming from Kafka to Cassandra. Writing a single batch of data works fine, but as soon as the data is streaming it doesn't work.

I've read that foreachBatch might be the way to do it, but I'm new to PySpark and I can't get it written correctly, because the documentation for it is poor.

Can someone help me with foreachBatch in my script?

I'm feeding data from a Kafka topic into Cassandra.

import os, json, time
from pyspark.sql import functions as F
from pyspark.sql import types
from pyspark.sql import SparkSession


def parse_value(value):
    value_object = json.loads(value)
    return [value_object["topicData"]["serialNumber"] + ":" + str(value_object["msg"]["params"]["device_id"]) + ":" + str(value_object["msg"]["timestamp"]),
            value_object["msg"]["params"]["service_name"],
            str(value_object["msg"]["timestamp"]),
            value_object["msg"]["params"]["property_value"]]



parse_value_udf = F.udf(lambda x: parse_value(x), types.ArrayType(types.StringType()))

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "topicsForEvents123") \
  .load() \
  .selectExpr("CAST(value AS STRING)")


df = df.withColumn('_id', parse_value_udf(df.value)[0]) \
    .withColumn('property_name', parse_value_udf(df.value)[1]) \
    .withColumn('time', parse_value_udf(df.value)[2]) \
    .withColumn('value', parse_value_udf(df.value)[3])

df = df.select('_id','property_name','time','value')
query = df \
    .writeStream \
    .outputMode("append") \
    .format("org.apache.spark.sql.cassandra") \
    .option("property_change","strat_history_keyspace_cassandra_raw1")\
    .start()

query.awaitTermination()

I just want someone to show me an example of foreachBatch in PySpark applied to this script, if that's the right approach, because I can't understand from the documentation how to fit it into my code.

DOCUMENTATION SAYS:

def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    pass

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()  

And I can't understand how to pass df and epoch_id: where does the id come from, and which DataFrame exactly should be passed to the function as a parameter?

Upvotes: 0

Views: 2474

Answers (1)

vinsce

Reputation: 1348

foreachBatch is an output sink that lets you process each streaming micro-batch as a non-streaming DataFrame.

If you want to try a minimal working example, you can just print the DataFrame to the console:

def foreach_batch_function(df, epoch_id):
    df.show()

df.writeStream \
    .outputMode("append") \
    .foreachBatch(foreach_batch_function) \
    .start() \
    .awaitTermination()

There is nothing else to do.

Spark reads from the input stream (Kafka) continuously and automatically sends each micro-batch to the function you defined (foreach_batch_function). The function receives the DataFrame and the batch id; you don't have to call it manually. At that point you can treat the DataFrame as a batch DataFrame and perform all the transformations and actions you want.
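For example, inside the function you can run ordinary batch operations on the micro-batch, such as filtering and counting it before writing it anywhere (a minimal sketch; '_id' is one of the columns built in the question's script):

def foreach_batch_function(df, epoch_id):
    # df is a plain (non-streaming) DataFrame holding just this micro-batch
    non_empty = df.filter(df["_id"].isNotNull())
    print("batch", epoch_id, "has", non_empty.count(), "rows")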

If you want to write the micro-batch DataFrame to Cassandra, you need to edit foreach_batch_function like this:

def foreach_batch_function(df, epoch_id):
    df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options(table="TABLE_NAME", keyspace="KEYSPACE_NAME") \
        .save()
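Plugged into the script from the question, the end of the pipeline could look roughly like this (a sketch: write_to_cassandra is just a helper name, the table and keyspace values are placeholders to replace with your own, and df is the DataFrame from the question after the final select):

def write_to_cassandra(batch_df, epoch_id):
    # write one micro-batch to Cassandra via the Spark Cassandra connector
    batch_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options(table="TABLE_NAME", keyspace="KEYSPACE_NAME") \
        .save()

query = df \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(write_to_cassandra) \
    .start()

query.awaitTermination()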

Upvotes: 2
