shantha ramadurga

Reputation: 85

How to use the Spark Cassandra Connector API in Scala

My previous post: Reparing Prepared stmt warning.

I was not able to solve it. Following a few suggestions, I tried using the Spark Cassandra Connector to solve my problem, but I am completely confused about how to use it in my application. I tried to write the code below, but I am not sure exactly how to use the APIs.

val conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", "1.1.1.1")
        .set("spark.cassandra.auth.username", "auser")            
        .set("spark.cassandra.auth.password", "apass")
        .set("spark.cassandra.connection.port","9042")

      val sc=new SparkContext(conf)  

      val c = CassandraConnector(sc.getConf)
c.withSessionDo ( session => session.prepareStatement(session,insertQuery)

    val boundStatement = new BoundStatement(insertStatement)

    batch.add(boundStatement.bind(data.service_id, data.asset_id, data.summ_typ, data.summ_dt, data.trp_summ_id, data.asset_serial_no, data.avg_sp, data.c_dist, data.c_epa, data.c_gal, data.c_mil, data.device_id, data.device_serial_no, data.dist, data.en_dt, data.en_lat, data.en_long, data.epa, data.gal, data.h_dist, data.h_epa, data.h_gal, data.h_mil, data.id_tm, data.max_sp, data.mil, data.rec_crt_dt, data.st_lat, data.st_long, data.tr_dis, data.tr_dt, data.tr_dur, data.st_addr, data.en_addr))

)

   def prepareStatement(session: Session, query: String): PreparedStatement = {
    val cluster = session.clustername
    get(cluster, query.toString) match {
      case Some(stmt) => stmt
      case None =>
        synchronized {
          get(cluster, query.toString) match {
            case Some(stmt) => stmt
            case None =>
              val stmt = session.prepare(query)
              put(cluster, query.toString, stmt)
          }
        }
    }
  }


OR

   val table1 = spark.read
                 .format("org.apache.spark.sql.cassandra")
                 .option( "spark.cassandra.auth.username","apoch_user")
                 .option("spark.cassandra.auth.password","Apoch#123")
                 .options(Map(
                      "table" -> "trip_summary_data",
                       "keyspace" -> "aphoc" ,
                      "cluster" -> "Cluster1"
                       ) ).load()


     def insert( data: TripHistoryData) {

    table1.createOrReplaceTempView("inputTable1");

val df1= spark.sql("select * from inputTable1 where service_id = ? and asset_id = ? and summ_typ = ? and summ_dt >= ? and summ_dt <= ?");
val df2=spark.sql("insert into inputTable1 values (data.service_id, data.asset_id, data.summ_typ, data.summ_dt, data.trp_summ_id, data.asset_serial_no, data.avg_sp, data.c_dist, data.c_epa, data.c_gal, data.c_mil, data.device_id, data.device_serial_no, data.dist, data.en_dt, data.en_lat, data.en_long, data.epa, data.gal, data.h_dist, data.h_epa, data.h_gal, data.h_mil, data.id_tm, data.max_sp, data.mil, data.rec_crt_dt, data.st_lat, data.st_long, data.tr_dis, data.tr_dt, data.tr_dur, data.st_addr, data.en_addr))


  }

Upvotes: 0

Views: 366

Answers (1)

Alex Ott

Reputation: 87154

Focus on how you process the data in your Spark application rather than on how the data is read or written. (That matters too, of course, but only once you hit performance problems.)

If you're using Spark, you need to think in Spark terms, because you process data as RDDs or DataFrames. In that case, use constructs like these (with DataFrames):

import org.apache.spark.sql.cassandra._ // provides cassandraFormat

val df = spark
  .read
  .cassandraFormat("words", "test")
  .load()
val newDf = df.filter(...) // some operation on the source data
newDf.write
  .cassandraFormat("words_copy", "test")
  .save()

Avoid calling session.prepare/session.execute, cluster.connect, etc. directly: the Spark connector prepares statements and applies other optimizations under the hood.
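Applied to the table from the question, this could look roughly like the sketch below. It is only an illustration under assumptions: the TripSummary case class is a hypothetical, shortened stand-in for TripHistoryData, the column types and the literal filter values are guesses, and the connection settings are copied from the question. The filter stands in for the parameterized select, and the append write stands in for the bound insert statement.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._ // adds cassandraFormat to readers and writers
import org.apache.spark.sql.functions.col

// Hypothetical, shortened stand-in for the question's TripHistoryData.
// Column types are assumptions; a real row must carry every primary key column.
case class TripSummary(
  service_id: String,
  asset_id: String,
  summ_typ: String,
  summ_dt: Timestamp,
  trp_summ_id: String
)

object TripSummaryJob {
  def main(args: Array[String]): Unit = {
    // Connection settings taken from the question.
    val spark = SparkSession.builder()
      .appName("trip-summary")
      .config("spark.cassandra.connection.host", "1.1.1.1")
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.auth.username", "auser")
      .config("spark.cassandra.auth.password", "apass")
      .getOrCreate()
    import spark.implicits._

    // Read the table as a DataFrame; no session or prepared statement is needed.
    val trips = spark.read
      .cassandraFormat("trip_summary_data", "aphoc")
      .load()

    // Equivalent of the "where service_id = ? and ..." query, with
    // illustrative values in place of the placeholders.
    val from = Timestamp.valueOf("2020-01-01 00:00:00")
    val to = Timestamp.valueOf("2020-01-31 23:59:59")
    val selected = trips.filter(
      col("service_id") === "svc-1" &&
        col("asset_id") === "asset-1" &&
        col("summ_typ") === "daily" &&
        col("summ_dt").between(from, to)
    )
    selected.show()

    // Equivalent of the insert: build a Dataset from the records and append it.
    val newRows = Seq(
      TripSummary("svc-1", "asset-1", "daily", from, "trip-0001")
    ).toDS()

    newRows.write
      .cassandraFormat("trip_summary_data", "aphoc")
      .mode("append") // add rows to the existing table
      .save()

    spark.stop()
  }
}
```

In a real job the rows to insert would normally come from another DataFrame or an upstream source rather than a hand-built Seq; writing them in one batch is generally preferable to calling an insert function once per record.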

Upvotes: 1
