Reputation: 137
I'm using PySpark to get data from Kafka and insert it into Cassandra. I'm almost there; I just need the final step.
def Spark_Kafka_Receiver():
    # STEP 1 OK!
    dc = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "000.00.0.240:9092") \
        .option("subscribe", "MyTopic") \
        .load()
    # note: the result of selectExpr is discarded here because it isn't
    # assigned back, so the value column downstream is still binary
    dc.selectExpr("CAST(key as STRING)", "CAST(value AS STRING) as msg")

    # STEP 2 OK!
    dc.writeStream \
        .outputMode("append") \
        .foreachBatch(foreach_batch_function) \
        .start() \
        .awaitTermination()
# STEP 3 NEED HELP
def foreach_batch_function(df, epoch_id):
    Value = df.select(df.value)
    ???????
    # WRITE DATA FRAME ON CASSANDRA
    df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options(table=table_name, keyspace=keyspace) \
        .save()
So my Value is in this format:
DataFrame[value: binary]
I need to insert something that opens my Value, takes the binary inside, and creates a proper DataFrame in the correct format that matches the database, so I can then execute the last part of my code with it.
Upvotes: 1
Views: 344
Reputation: 87154
You don't need to use foreachBatch anymore. You just need to upgrade to Spark Cassandra Connector 2.5, which natively supports Spark Structured Streaming, so you can just write:
# note: a checkpoint location is required for streaming writes;
# "some_checkpoint_dir" below is a placeholder path
dc.writeStream \
    .format("org.apache.spark.sql.cassandra") \
    .outputMode('append') \
    .option("checkpointLocation", "some_checkpoint_dir") \
    .options(table=table_name, keyspace=keyspace) \
    .start() \
    .awaitTermination()
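Note that this assumes the job is started with the connector on the classpath, e.g. via spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.12:2.5.0 (the Scala suffix must match your Spark build), and that the Cassandra contact points are already configured via spark.cassandra.connection.host, as they presumably are for your existing batch writes.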
Regarding the second part of your question: if you want to convert your value into multiple columns, you need to use the from_json function, passing the schema to it. Here is an example in Scala, but the Python code should be quite similar:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType

val schemaStr = "id:int, value:string"
val schema = StructType.fromDDL(schemaStr)
val data = dc.selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", schema).as("data"))
  .select("data.*")
and then you can write that data via writeStream, as shown above.
Upvotes: 1