Atanu chatterjee

Reputation: 477

Writing Spark Structured Streaming data into Cassandra

I want to write Structured Streaming data into Cassandra using the PySpark API.

My data flow is as follows:

Nifi -> Kafka -> Spark Structure Streaming -> Cassandra

I have tried the following:

query = df.writeStream\
  .format("org.apache.spark.sql.cassandra")\
  .option("keyspace", "demo")\
  .option("table", "test")\
  .start()

But I get the following error message: "org.apache.spark.sql.cassandra" does not support streaming write.

I have also tried another approach [source: DSE 6.0 Administrator Guide]:

query = df.writeStream\
   .cassandraFormat("test", "demo")\
   .start()

But I got this exception: AttributeError: 'DataStreamWriter' object has no attribute 'cassandraFormat'

Can anyone give me an idea of how to proceed further?

Thanks in advance.

Upvotes: 2

Views: 6072

Answers (3)

Harichandan Pulagam

Reputation: 368

This answer is for writing data to regular Cassandra, not to DSE (which already supports Structured Streaming for storing data).

For Spark 2.4.0 and higher, you can use the foreachBatch method, which allows you to use the Cassandra batch data writer provided by the Spark Cassandra Connector to write the output of every micro-batch of the streaming query to Cassandra:

import org.apache.spark.sql.cassandra._

df.writeStream
  .foreachBatch { (batchDF, _) =>
    // Each micro-batch is a static DataFrame, so the batch writer applies.
    batchDF
      .write
      .cassandraFormat("tableName", "keyspace")
      .mode("append")
      .save()
  }
  .start()
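
Since the question uses the PySpark API, here is a rough PySpark equivalent of the same foreachBatch approach (a sketch assuming Spark 2.4+ and the Spark Cassandra Connector on the classpath; the keyspace, table, and checkpoint path are placeholders):

def write_to_cassandra(batch_df, batch_id):
    # Each micro-batch is a static DataFrame, so the batch writer works here.
    batch_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .option("keyspace", "keyspace") \
        .option("table", "tableName") \
        .mode("append") \
        .save()

df.writeStream \
    .foreachBatch(write_to_cassandra) \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start()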

For Spark versions earlier than 2.4.0, you need to implement a foreach sink.

import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.Statement
import org.apache.spark.SparkConf
import org.apache.spark.sql.{ForeachWriter, Row}

class CassandraSink(sparkConf: SparkConf) extends ForeachWriter[Row] {
    // No per-partition setup needed; accept every partition/epoch.
    def open(partitionId: Long, version: Long): Boolean = true

    // Build and execute one INSERT per input row.
    def process(row: Row): Unit = {
      def buildStatement: Statement =
        QueryBuilder.insertInto("keyspace", "tableName")
          .value("key", row.getAs[String]("value"))
      CassandraConnector(sparkConf).withSessionDo { session =>
        session.execute(buildStatement)
      }
    }

    // The connector caches sessions, so there is nothing to release here.
    def close(errorOrNull: Throwable): Unit = {}
}

And then you can use the foreach sink as follows:

df.writeStream
 .foreach(new CassandraSink(spark.sparkContext.getConf))
 .start
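
For completeness, a rough PySpark counterpart using the DataStax Python driver (note that PySpark only gained foreach in Spark 2.4, so this does not help on older PySpark versions; the contact point, keyspace, table, and column names below are assumptions):

from cassandra.cluster import Cluster

class CassandraSink:
    def open(self, partition_id, epoch_id):
        # One session per partition; the driver pools connections internally.
        self.cluster = Cluster(["127.0.0.1"])  # assumed contact point
        self.session = self.cluster.connect()
        return True

    def process(self, row):
        # One INSERT per row; keyspace, table, and columns are placeholders.
        self.session.execute(
            "INSERT INTO keyspace.tableName (key) VALUES (%s)",
            (row["value"],))

    def close(self, error):
        self.cluster.shutdown()

df.writeStream \
    .foreach(CassandraSink()) \
    .start()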

Upvotes: 2

Atanu chatterjee

Reputation: 477

After upgrading to DSE 6.0 (the latest version), I am able to write Structured Streaming data into Cassandra. [Spark 2.2 & Cassandra 3.11]

Reference Code:

# A checkpoint location is required for the Cassandra streaming sink.
query = fileStreamDf.writeStream\
  .option("checkpointLocation", '/tmp/check_point/')\
  .format("org.apache.spark.sql.cassandra")\
  .option("keyspace", "analytics")\
  .option("table", "test")\
  .start()

DSE documentation URL: https://docs.datastax.com/en/dse/6.0/dse-dev/datastax_enterprise/spark/structuredStreaming.html
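
For context, fileStreamDf above has to come from a streaming source; with the Kafka flow from the question, it might be built like this (a sketch only; the broker address, topic name, and message schema are assumptions):

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

# Assumed payload schema; adjust to the actual messages produced by Nifi.
schema = StructType([
    StructField("key", StringType()),
    StructField("value", StringType())])

fileStreamDf = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "demo_topic") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")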

Upvotes: 6

user9702965

Reputation:

Not much you can do here other than:

  • Following (and voting for) the corresponding JIRA ticket.
  • Implementing the required functionality and opening a PR yourself.

Alternatively, you can just use a foreach sink and write directly.

Upvotes: 1
