vikash kumar

Reputation: 99

Update a set-type column in a Cassandra row

I have an RDD[PersonType] with the fields [pid, cid, firstname, lastname, age, source, sourceType, message], for example [1000, 100, Vikash, Singh, 33, source, sourceType, message].

I also have a Cassandra row with the columns [pid, cid, firstname, lastname, age, dept, mrids], where mrids is a set. Suppose the value in Cassandra is [1000, 100, vikash, singh, 33, bank, {sourceold.sourceTypeold.messageold}].

I want to update the Cassandra column mrids so that it holds both the old and the new value. The updated row in Cassandra should be [1000, 100, vikash, singh, 33, bank, {sourceold.sourceTypeold.messageold, source.sourceType.message}].

Please tell me how to update the mrids column.

val rdd: RDD[PersonType] = rdd1
val rdd2 = sc.cassandraTable(keyspace,tablename)
              .select("p_id","c_id", "mrids")

What code should I write next to achieve this?

Upvotes: 0

Views: 449

Answers (1)

Abhishek Anand

Reputation: 1992

This should get you started.

It shows how to join two RDDs on a key and add a value from one RDD to a set held in the other.

// Existing rows: (key, data, set of current values)
val temp = List((1, 4, Set(1)),
                (2, 5, Set(2)),
                (3, 6, Set(3)))
// Incoming rows: (key, data, new value to add to the set)
val temp2 = List((1, 11, 11),
                 (2, 11, 22),
                 (3, 11, 33))

val temp_rdd = sc.parallelize(temp)
val temp2_rdd = sc.parallelize(temp2)

// Key both RDDs by the first field, join them, then add the new value to the existing set
val test = temp_rdd.map { case (key, data, set) => (key, (data, set)) }
  .join(temp2_rdd.map { case (key, data, set_new_value) => (key, (data, set_new_value)) })
  .map { case (key, ((data1, set), (data2, set_new_value))) => (key, set + set_new_value) }

test.collect().foreach(println)
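
To connect this to the data in the question, here is a minimal sketch of the same key-based merge using plain Scala collections instead of RDDs, so it runs without Spark. The object name MridsMerge and the use of (pid, cid) as the key are assumptions made for illustration. Unlike an inner join, this version also keeps keys that have no existing set.

```scala
object MridsMerge {
  // Stand-in for the Cassandra rows: (pid, cid) -> existing mrids set.
  val existing: Map[(Int, Int), Set[String]] =
    Map((1000, 100) -> Set("sourceold.sourceTypeold.messageold"))

  // Stand-in for the incoming rows: (pid, cid, source, sourceType, message).
  val incoming: List[(Int, Int, String, String, String)] =
    List((1000, 100, "source", "sourceType", "message"))

  // For each incoming row, add "source.sourceType.message" to the set stored under its key.
  val merged: Map[(Int, Int), Set[String]] =
    incoming.foldLeft(existing) { case (acc, (pid, cid, src, srcType, msg)) =>
      val key = (pid, cid)
      acc.updated(key, acc.getOrElse(key, Set.empty[String]) + s"$src.$srcType.$msg")
    }
}
```

With RDDs, the same step would be the join followed by the final map shown above.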

At the end, you can use rdd.saveToCassandra to save the resulting RDD.
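
As a rough sketch of the save step, the Spark Cassandra Connector documents a collection "append" behavior on SomeColumns that adds elements to an existing collection column instead of overwriting it, which would avoid reading the old set at all. This is a different technique from the join above. The PersonType case class, the MridsAppend object, and the assumption that p_id and c_id form the table's primary key are all hypothetical; check them against your schema and connector version.

```scala
import com.datastax.spark.connector._
import org.apache.spark.rdd.RDD

// Hypothetical case class mirroring the fields listed in the question.
case class PersonType(pid: Int, cid: Int, firstname: String, lastname: String,
                      age: Int, source: String, sourceType: String, message: String)

object MridsAppend {
  // Builds the "source.sourceType.message" entry described in the question.
  def mridEntry(p: PersonType): String = s"${p.source}.${p.sourceType}.${p.message}"

  // Assumption: p_id and c_id together identify the row to update.
  def appendMrids(people: RDD[PersonType], keyspace: String, table: String): Unit =
    people
      .map(p => (p.pid, p.cid, Set(mridEntry(p))))
      // "mrids".append asks the connector to add to the existing set, not replace it.
      .saveToCassandra(keyspace, table, SomeColumns("p_id", "c_id", "mrids".append))
}
```

If you go with the join approach instead, pass the joined RDD to saveToCassandra with a plain "mrids" column, since the merged set already contains the old values.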

Upvotes: 0
