Anton Zubochenko
Anton Zubochenko

Reputation: 163

Insert data from pyspark dataframe to another cassandra table using pyspark

I have a cassandra table - test:

+----+---------+---------+
| id | country | counter |
+====+=========+=========+
|  A |      RU |       1 |
+----+---------+---------+
|  B |      EN |       2 |
+----+---------+---------+
|  C |      IQ |       1 |
+----+---------+---------+
|  D |      RU |       3 |
+----+---------+---------+

Also I have a table main in the same space with column "country_main" and "main_id". In column main_id I have same ids as in test table, and also I have some unique ids. country_main has empty values and the same as in test. For ex:

+---------+--------------+---------+
| main_id | country_main |      ...|
+=========+==============+=========+
|  A      |              |      ...|
+---------+--------------+---------+
|  B      |      EN      |      ...|
+---------+--------------+---------+
|  Y      |      IQ      |      ...|
+---------+--------------+---------+
|  Z      |      RU      |      ...|
+---------+--------------+---------+

How to insert data from test table to main using pyspark to fill empty values in country_main according to ids?

Upvotes: 2

Views: 1014

Answers (1)

Alex Ott
Alex Ott

Reputation: 87154

Having following schema & data:

create table test.ct1 (
  id text primary key,
  country text,
  cnt int);

insert into test.ct1(id, country, cnt) values('A', 'RU', 1);
insert into test.ct1(id, country, cnt) values('B', 'EN', 2);
insert into test.ct1(id, country, cnt) values('C', 'IQ', 1);
insert into test.ct1(id, country, cnt) values('D', 'RU', 3);


create table test.ct2 (
  main_id text primary key,
  country_main text,
  cnt int);

insert into test.ct2(main_id, cnt) values('A', 1);
insert into test.ct2(main_id, country_main, cnt) values('B', 'EN', 2);
insert into test.ct2(main_id, country_main, cnt) values('C', 'IQ', 1);
insert into test.ct2(main_id, country_main, cnt) values('D', 'RU', 3);

It should be something like this:

from pyspark.sql.functions import *

ct1 = spark.read.format("org.apache.spark.sql.cassandra")\
   .option("table", "ct1").option("keyspace", "test").load()

ct2 = spark.read.format("org.apache.spark.sql.cassandra")\
  .option("table", "ct2").option("keyspace", "test").load()\
  .where(col("country_main").isNull())

res = ct1.join(ct2, ct1.id == ct2.main_id).select(col("main_id"), 
  col("country").alias("country_main"))
res.write.format("org.apache.spark.sql.cassandra")\
   .option("table", "ct2").option("keyspace", "test")\
   .mode("append").save()

What code does:

  1. selects all rows from ct2 (corresponds to your main table) where country_main is null;
  2. performs join with ct1 (corresponds to your test table) to get value of country from it (optimization could be to select only necessary columns from both tables). Also, please note that join is done by Spark, not on Cassandra level - Cassandra-level joins will be supported only in upcoming version of Spark Cassandra Connector (3.0, but alpha versions already published);
  3. renames columns to match structure of ct2 table;
  4. write data back.

Result:

cqlsh> select * from test.ct2;

 main_id | cnt | country_main
---------+-----+--------------
       C |   1 |           IQ
       B |   2 |           EN
       A |   1 |           RU
       D |   3 |           RU

for source data:

cqlsh> select * from test.ct2;
main_id | cnt | country_main
---------+-----+--------------                                       
       C |   1 |           IQ                                  
       B |   2 |           EN                                                                                         
       A |   1 |         null                                      
       D |   3 |           RU

Upvotes: 2

Related Questions