Reputation: 1
I'm writing an analytics script in PySpark, and I can't get streaming from Kafka into Cassandra to work. Writing a single batch of data is fine, but when the data is streaming it doesn't work.
I've been reading about a way to do this with foreachBatch, but I'm new to PySpark and can't get it working from the documentation alone. Can someone help me use foreachBatch in my script? I'm feeding data from a Kafka topic into Cassandra.
import os, json, time
from pyspark.sql import functions as F
from pyspark.sql import types
from pyspark.sql import SparkSession
def parse_value(value):
    value_object = json.loads(value)
    return [value_object["topicData"]["serialNumber"] + ":" + str(value_object["msg"]["params"]["device_id"]) + ":" + str(value_object["msg"]["timestamp"]),
            value_object["msg"]["params"]["service_name"],
            str(value_object["msg"]["timestamp"]),
            value_object["msg"]["params"]["property_value"]]

parse_value_udf = F.udf(lambda x: parse_value(x), types.ArrayType(types.StringType()))

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topicsForEvents123") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

df = df.withColumn('_id', parse_value_udf(df.value)[0]) \
    .withColumn('property_name', parse_value_udf(df.value)[1]) \
    .withColumn('time', parse_value_udf(df.value)[2]) \
    .withColumn('value', parse_value_udf(df.value)[3])
df = df.select('_id', 'property_name', 'time', 'value')

query = df \
    .writeStream \
    .outputMode("append") \
    .format("org.apache.spark.sql.cassandra") \
    .option("property_change", "strat_history_keyspace_cassandra_raw1") \
    .start()

query.awaitTermination()
I just want someone to show me an example of foreachBatch in PySpark applied to this script, if that's the right approach, because I can't work out from the documentation how to fit it into my piece of code.
The documentation says:
def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    pass

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
And I can't understand how df and epoch_id are passed in: where does the id come from, and which df exactly am I supposed to pass to the function as a parameter?
Upvotes: 0
Views: 2474
Reputation: 1348
foreachBatch is an output sink that lets you process each streaming micro-batch as a non-streaming DataFrame.
If you want to try a minimal working example, you can just print the DataFrame to the console:
def foreach_batch_function(df, epoch_id):
    df.show()

df.writeStream \
    .outputMode("append") \
    .foreachBatch(foreach_batch_function) \
    .start() \
    .awaitTermination()
There is nothing else to do: Spark reads from the input stream (Kafka) continuously and sends each micro-batch to the function you defined (foreach_batch_function) automatically.
The function receives the DataFrame and the batch id; you don't have to call it manually.
At this point you can treat the DataFrame as a batch DataFrame and perform all the transformations and actions you want.
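For instance, inside the function you can log the batch id and run ordinary batch operations. This is just a sketch; the filter on the value column is only an illustration (that column comes from the select in your script):
def foreach_batch_function(df, epoch_id):
    # epoch_id is simply the sequential id of the micro-batch, handy for logging
    print("processing micro-batch", epoch_id)
    # df here is a plain batch DataFrame, so any batch operation works;
    # filtering on 'value' is only an example
    df.filter(df.value.isNotNull()).show()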
If you want to output the micro-batch DataFrame to Cassandra, you need to edit the foreach_batch_function like this:
def foreach_batch_function(df, epoch_id):
    df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options(table="TABLE_NAME", keyspace="KEYSPACE_NAME") \
        .save()
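Wired into your script, the end of it would look roughly like the sketch below. I'm assuming property_change is your Cassandra table and strat_history_keyspace_cassandra_raw1 is your keyspace, based on the option in your original writeStream; swap them if it's the other way around:
def foreach_batch_function(df, epoch_id):
    # write the current micro-batch to Cassandra as a plain batch DataFrame
    df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options(table="property_change", keyspace="strat_history_keyspace_cassandra_raw1") \
        .save()

query = df \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(foreach_batch_function) \
    .start()

query.awaitTermination()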
Upvotes: 2