SimAzz

Reputation: 137

How to deal with pySpark structured streaming coming from Kafka to Cassandra

I'm using pyspark to get data from Kafka and insert it into Cassandra. I'm almost there; I just need the final step.

def Spark_Kafka_Receiver():

# STEP 1 OK!

    dc = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "000.00.0.240:9092") \
        .option("subscribe", "MyTopic") \
        .load()
    dc = dc.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as msg")

# STEP 2 OK!

    dc.writeStream \
        .outputMode("append") \
        .foreachBatch(foreach_batch_function) \
        .start() \
        .awaitTermination()

# STEP 3 NEED HELP

def foreach_batch_function(df, epoch_id):
    Value = df.select(df.value)

    ???????

    # WRITE THE DATAFRAME TO CASSANDRA
    df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options(table=table_name, keyspace=keyspace) \
        .save()

So I have my Value, which is in this format:

DataFrame[value: binary]

I would need to insert something that opens my Value, takes the binary inside, and creates a proper DataFrame whose format matches the database, so that the last part of my code can run with it.

Upvotes: 1

Views: 344

Answers (1)

Alex Ott

Reputation: 87154

You don't need to use foreachBatch anymore. You just need to upgrade to Spark Cassandra Connector 2.5, which natively supports Spark Structured Streaming, so you can simply write:

dc.writeStream \
        .format("org.apache.spark.sql.cassandra") \
        .outputMode("append") \
        .options(table=table_name, keyspace=keyspace) \
        .start() \
        .awaitTermination()
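Note that a streaming query also needs a checkpoint location, so you may have to add something like .option("checkpointLocation", "some/path") before .start(), and the connector itself (e.g. com.datastax.spark:spark-cassandra-connector_2.12:2.5.0) has to be on the classpath.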

Regarding the second part of your question: if you want to convert your value into multiple columns, you need to use the from_json function, passing the schema to it. Here is an example in Scala, but the Python code should be quite similar:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType
import spark.implicits._

val schemaStr = "id:int, value:string"
val schema = StructType.fromDDL(schemaStr)
val data = dc.selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", schema).as("data"))
  .select("data.*")

and then you can write that data via writeStream.
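For reference, a rough PySpark equivalent of the Scala snippet above might look like this (the schema string here is only an example; use the columns of your Cassandra table):

from pyspark.sql.functions import from_json, col

# example schema as a DDL string; replace with your table's actual columns
schema = "id INT, value STRING"

data = dc.selectExpr("CAST(value AS STRING) AS value") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")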

Upvotes: 1
