Reputation: 163
I have a Cassandra table, test:
+----+---------+---------+
| id | country | counter |
+====+=========+=========+
| A  | RU      | 1       |
+----+---------+---------+
| B  | EN      | 2       |
+----+---------+---------+
| C  | IQ      | 1       |
+----+---------+---------+
| D  | RU      | 3       |
+----+---------+---------+
I also have a table main in the same keyspace with columns "country_main" and "main_id". The main_id column contains the same ids as the test table, plus some ids that only exist in main. country_main has some empty values and some values that match test. For example:
+---------+--------------+-----+
| main_id | country_main | ... |
+=========+==============+=====+
| A       |              | ... |
+---------+--------------+-----+
| B       | EN           | ... |
+---------+--------------+-----+
| Y       | IQ           | ... |
+---------+--------------+-----+
| Z       | RU           | ... |
+---------+--------------+-----+
How can I insert data from the test table into main using pyspark, so that the empty values in country_main are filled in according to the matching ids?
Upvotes: 2
Views: 1014
Reputation: 87154
Having the following schema & data:
create table test.ct1 (
    id text primary key,
    country text,
    cnt int);
insert into test.ct1(id, country, cnt) values('A', 'RU', 1);
insert into test.ct1(id, country, cnt) values('B', 'EN', 2);
insert into test.ct1(id, country, cnt) values('C', 'IQ', 1);
insert into test.ct1(id, country, cnt) values('D', 'RU', 3);
create table test.ct2 (
    main_id text primary key,
    country_main text,
    cnt int);
insert into test.ct2(main_id, cnt) values('A', 1);
insert into test.ct2(main_id, country_main, cnt) values('B', 'EN', 2);
insert into test.ct2(main_id, country_main, cnt) values('C', 'IQ', 1);
insert into test.ct2(main_id, country_main, cnt) values('D', 'RU', 3);
It should be something like this:
from pyspark.sql.functions import *

ct1 = spark.read.format("org.apache.spark.sql.cassandra")\
    .option("table", "ct1").option("keyspace", "test").load()
ct2 = spark.read.format("org.apache.spark.sql.cassandra")\
    .option("table", "ct2").option("keyspace", "test").load()\
    .where(col("country_main").isNull())
res = ct1.join(ct2, ct1.id == ct2.main_id)\
    .select(col("main_id"), col("country").alias("country_main"))
res.write.format("org.apache.spark.sql.cassandra")\
    .option("table", "ct2").option("keyspace", "test")\
    .mode("append").save()
What the code does:

- reads ct2 (corresponds to your main table), keeping only the rows where country_main is null;
- joins it with ct1 (corresponds to your test table) to get the value of country from it (an optimization would be to select only the necessary columns from both tables - see the sketch after the sample output below). Also, please note that the join is done by Spark, not at the Cassandra level - Cassandra-level joins will be supported only in the upcoming version of the Spark Cassandra Connector (3.0, but alpha versions are already published);
- writes the result back into the ct2 table.

Result:
cqlsh> select * from test.ct2;
 main_id | cnt | country_main
---------+-----+--------------
       C |   1 |           IQ
       B |   2 |           EN
       A |   1 |           RU
       D |   3 |           RU
for the source data:
cqlsh> select * from test.ct2;
 main_id | cnt | country_main
---------+-----+--------------
       C |   1 |           IQ
       B |   2 |           EN
       A |   1 |         null
       D |   3 |           RU
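Regarding the column-pruning optimization mentioned in the explanation above: the idea is just to keep only the columns needed for the join and for the write before joining, so less data is moved around. A sketch of that variant, reusing the ct1 and ct2 dataframes loaded earlier (the _slim names are illustrative):
from pyspark.sql.functions import col

# keep only what the join and the write actually need
ct1_slim = ct1.select(col("id"), col("country"))
ct2_slim = ct2.select(col("main_id"))  # ct2 was already filtered to country_main is null

res = ct1_slim.join(ct2_slim, ct1_slim.id == ct2_slim.main_id)\
    .select(col("main_id"), col("country").alias("country_main"))
res.write.format("org.apache.spark.sql.cassandra")\
    .option("table", "ct2").option("keyspace", "test")\
    .mode("append").save()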
Upvotes: 2