Reputation: 491
This is a follow-up to a question I asked yesterday.
I have table1, which tracks the implementation status of a product (the feature column). Unfortunately, this table does not contain rows where a feature has not been implemented, and I would like to add them. There is also table2, which contains every id -- table2.id is the join key with table1.xml_id.
In short, I want to:

- For table2.ids not in table1, assign a no_contact status for each feature they don't already have a status for in table1.
- For table1.xml_ids already present in table1 but without a status for a particular feature, assign a no_contact status for that feature.

This way, I will have a row for every combination of table2.id and table1.feature, each with the correct status.
A full list of features can be found in table3, or via distinct(table1.feature). I had originally tried to solve this with raw HQL like so:
select
distinct(f.feature),
p.id as xml_id,
"no_contact" as status
from
table3 f
cross join
table2 p
where
id in(
select
p.id
from
table2 p
left join(
select
xml_id
from
table1
where
yyyy_mm_dd = '2020-05-30'
) s on s.xml_id = p.id
where
s.xml_id is null
)
group by
1,2,3
union all
select
feature,
xml_id,
status
from
table1
where
yyyy_mm_dd = '2020-05-30'
The above almost works. The problem is that the no_contact count is the same for every feature, rather than a correct count per feature. I figured it might be easier to solve in Python using pandas, so I brought the data in via PySpark.
Below is the data from table1 for the ids which already have a status:
has_status = spark.sql("""
select
yyyy_mm_dd,
xml_id,
feature,
status
from
table1
where
yyyy_mm_dd = '2020-05-30'
""")
has_status = has_status.toPandas()
has_status
And here is the data for all table2.ids:
all_ids = spark.sql("""
select
id
from
table2
""")
all_ids = all_ids.toPandas()
all_ids
Would it be possible to achieve this goal in Python? If a table2.id doesn't have a row for a feature in has_status, I would like to add the id to has_status with a status of no_contact. Additionally, it would be great if I could add a no_contact row for ids already in table1 but missing a status for a particular feature.
Example data / schema:
DROP TABLE IF EXISTS table1;
CREATE TABLE table1 (
`yyyy_mm_dd` DATE,
`xml_id` INTEGER,
`feature` VARCHAR(31),
`status` VARCHAR(31)
);
INSERT INTO table1
(yyyy_mm_dd, xml_id, feature, status)
VALUES
('2020-07-10', '2', 'basic', 'implemented'),
('2020-07-10', '2', 'geo', 'implemented'),
('2020-07-10', '2', 'mobile', 'first_contact'),
('2020-07-10', '1', 'geo', 'first_contact'),
('2020-07-10', '1', 'mobile', 'implemented'),
('2020-07-10', '3', 'basic', 'first_contact'),
('2020-07-10', '3', 'geo', 'implemented')
;
DROP TABLE IF EXISTS table2;
CREATE TABLE table2 (
`id` INTEGER,
`name` VARCHAR(3),
`active` INTEGER
);
INSERT INTO table2
(`id`, `name`, `active`)
VALUES
('1', 'xyz', '1'),
('2', 'dfg', '1'),
('3', 'lki', '1'),
('4', 'nbg', '0'),
('5', 'qyt', '0'),
('6', 'bfh', '1');
DROP TABLE IF EXISTS table3;
CREATE TABLE table3 (
`feature` VARCHAR(20),
`metric` VARCHAR(20),
`app` VARCHAR(20)
);
INSERT INTO table3
(`feature`, `metric`, `app`)
VALUES
('basic', 'basic_read', 'promotions'),
('basic', 'basic_update', 'promotions'),
('basic', 'basic_write', 'promotions'),
('geo', 'geo_update', 'admin'),
('geo', 'geo_write', 'admin'),
('mobile', 'mobile_executed', 'admin');
Based on the above sample data, the expected output would be a df which looks like this:
| yyyy_mm_dd | xml_id | feature | status |
|------------|--------|---------|---------------|
| 2020-07-10 | 2 | basic | implemented |
| 2020-07-10 | 2 | geo | implemented |
| 2020-07-10 | 2 | mobile | first_contact |
| 2020-07-10 | 1 | geo | first_contact |
| 2020-07-10 | 1 | mobile | implemented |
| 2020-07-10 | 3 | basic | first_contact |
| 2020-07-10 | 3 | geo | implemented |
| 2020-07-10 | 4 | mobile | no_contact |
| 2020-07-10 | 4 | geo | no_contact |
| 2020-07-10 | 4 | basic | no_contact |
| 2020-07-10 | 5 | mobile | no_contact |
| 2020-07-10 | 5 | geo | no_contact |
| 2020-07-10 | 5 | basic | no_contact |
| 2020-07-10 | 1 | basic | no_contact |
| 2020-07-10 | 3 | mobile | no_contact |
Upvotes: 1
Views: 60
Reputation: 13581
Here is one way to do it using PySpark.
import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.partitionBy()
df2.selectExpr("id as xml_id") \
.crossJoin(df3.select('feature').distinct()) \
.join(df1, ['xml_id', 'feature'], 'left') \
.withColumn('yyyy_mm_dd', f.max('yyyy_mm_dd').over(w)) \
.withColumn('status', f.expr("coalesce(status, 'no_contact')")) \
.orderBy('xml_id', 'feature') \
.show(20, False)
+------+-------+----------+-------------+
|xml_id|feature|yyyy_mm_dd|status       |
+------+-------+----------+-------------+
|1     |basic  |2020-07-10|no_contact   |
|1     |geo    |2020-07-10|first_contact|
|1     |mobile |2020-07-10|implemented  |
|2     |basic  |2020-07-10|implemented  |
|2     |geo    |2020-07-10|implemented  |
|2     |mobile |2020-07-10|first_contact|
|3     |basic  |2020-07-10|first_contact|
|3     |geo    |2020-07-10|implemented  |
|3     |mobile |2020-07-10|no_contact   |
|4     |basic  |2020-07-10|no_contact   |
|4     |geo    |2020-07-10|no_contact   |
|4     |mobile |2020-07-10|no_contact   |
|5     |basic  |2020-07-10|no_contact   |
|5     |geo    |2020-07-10|no_contact   |
|5     |mobile |2020-07-10|no_contact   |
|6     |basic  |2020-07-10|no_contact   |
|6     |geo    |2020-07-10|no_contact   |
|6     |mobile |2020-07-10|no_contact   |
+------+-------+----------+-------------+
The dataframes are
df1 = spark.createDataFrame([ ('2020-07-10', '2', 'basic', 'implemented'),
('2020-07-10', '2', 'geo', 'implemented'),
('2020-07-10', '2', 'mobile', 'first_contact'),
('2020-07-10', '1', 'geo', 'first_contact'),
('2020-07-10', '1', 'mobile', 'implemented'),
('2020-07-10', '3', 'basic', 'first_contact'),
('2020-07-10', '3', 'geo', 'implemented')], ['yyyy_mm_dd', 'xml_id', 'feature', 'status'])
df1.show(10, False)
+----------+------+-------+-------------+
|yyyy_mm_dd|xml_id|feature|status |
+----------+------+-------+-------------+
|2020-07-10|2 |basic |implemented |
|2020-07-10|2 |geo |implemented |
|2020-07-10|2 |mobile |first_contact|
|2020-07-10|1 |geo |first_contact|
|2020-07-10|1 |mobile |implemented |
|2020-07-10|3 |basic |first_contact|
|2020-07-10|3 |geo |implemented |
+----------+------+-------+-------------+
df2 = spark.createDataFrame([ ('1', 'xyz', '1'),
('2', 'dfg', '1'),
('3', 'lki', '1'),
('4', 'nbg', '0'),
('5', 'qyt', '0'),
('6', 'bfh', '1')], ['id', 'name', 'active'])
df2.show(10, False)
+---+----+------+
|id |name|active|
+---+----+------+
|1 |xyz |1 |
|2 |dfg |1 |
|3 |lki |1 |
|4 |nbg |0 |
|5 |qyt |0 |
|6 |bfh |1 |
+---+----+------+
df3 = spark.createDataFrame([ ('basic', 'basic_read', 'promotions'),
('basic', 'basic_update', 'promotions'),
('basic', 'basic_write', 'promotions'),
('geo', 'geo_update', 'admin'),
('geo', 'geo_write', 'admin'),
('mobile', 'mobile_executed', 'admin')], ['feature', 'metric', 'app'])
df3.show(10, False)
+-------+---------------+----------+
|feature|metric |app |
+-------+---------------+----------+
|basic |basic_read |promotions|
|basic |basic_update |promotions|
|basic |basic_write |promotions|
|geo |geo_update |admin |
|geo |geo_write |admin |
|mobile |mobile_executed|admin |
+-------+---------------+----------+
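Since the question also asks about pandas: once the data is collected locally (e.g. via toPandas()), the same build-the-full-grid-and-fill pattern can be sketched as below. This is a minimal sketch using small hypothetical frames in place of the real tables, and it assumes pandas >= 1.2 for merge(how="cross"):

```python
import pandas as pd

# Hypothetical local copies of the tables (stand-ins for .toPandas() results).
has_status = pd.DataFrame(
    [("2020-07-10", 2, "basic", "implemented"),
     ("2020-07-10", 2, "geo", "implemented"),
     ("2020-07-10", 1, "geo", "first_contact")],
    columns=["yyyy_mm_dd", "xml_id", "feature", "status"],
)
all_ids = pd.DataFrame({"id": [1, 2, 3]})
features = pd.DataFrame({"feature": ["basic", "geo", "mobile"]})

# 1. Cross join every id with every feature to get the full grid.
grid = all_ids.rename(columns={"id": "xml_id"}).merge(features, how="cross")

# 2. Left-join the known statuses, then fill the gaps with "no_contact".
out = grid.merge(has_status, on=["xml_id", "feature"], how="left")
out["status"] = out["status"].fillna("no_contact")
out["yyyy_mm_dd"] = out["yyyy_mm_dd"].fillna(has_status["yyyy_mm_dd"].max())
```

This mirrors the PySpark answer above (crossJoin, left join, coalesce) but stays in pandas, so it is only appropriate when the tables fit comfortably in driver memory.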
Upvotes: 1