stackq

Reputation: 491

Combining data frames into one

This is a follow-up to a question I asked yesterday.

I have table1, which tracks the implementation status of product features (the feature column). Unfortunately, this table does not contain rows for features that have not been implemented, and I would like to add them. There is also table2, which contains every id -- table2.id is the join key with table1.xml_id.

In short, I want to generate every combination of table2.id and feature, marking the combinations that are missing from table1 as no_contact. This way, I will have a row for every table2.id and table1.feature, each with the correct status.

A full list of features can be found in table3 or distinct(table1.feature). I had originally tried to solve this with raw HQL like so:

select
    distinct(f.feature),
    p.id as xml_id,
    "no_contact" as status
from
    table3 f
cross join
    table2 p
where
    p.id in (
        select
            p.id
        from
            table2 p
        left join (
            select
                xml_id
            from
                table1
            where
                yyyy_mm_dd = '2020-05-30'
        ) s on s.xml_id = p.id
        where
            s.xml_id is null
    )
group by
    1, 2, 3
union all
select
    feature,
    xml_id,
    status
from
    table1
where
    yyyy_mm_dd = '2020-05-30'

The above almost works. The problem is that the no_contact count is the same for every feature rather than a correct count per feature: the subquery only finds ids with no table1 rows at all on that date, and the cross join then assigns every feature to those ids, so ids that are merely missing some features are never picked up. I figured it might be easier to solve in Python using Pandas, so I brought the data in via PySpark.

Below is the query for the rows from table1 which already have a status:

has_status = spark.sql("""
    select
        yyyy_mm_dd,
        xml_id,
        feature,
        status
    from
        table1
    where
        yyyy_mm_dd = '2020-05-30'
""")
has_status = has_status.toPandas()
has_status

And here is the query for all of the table2 ids:

all_ids = spark.sql("""
    select
        id
    from
        table2
""")
all_ids = all_ids.toPandas()
all_ids

Would it be possible to achieve this goal in Python? If a table2.id doesn't have a row per feature in has_status, then I would like to add the id to has_status with a status of no_contact. This should also cover ids that already appear in table1 but are missing a row for a particular feature. Example data / schema:

DROP TABLE IF EXISTS table1;
CREATE TABLE table1 (
  `yyyy_mm_dd` DATE,
  `xml_id` INTEGER,
  `feature` VARCHAR(31),
  `status` VARCHAR(31)
);


INSERT INTO table1
  (yyyy_mm_dd, xml_id, feature, status)
VALUES
  ('2020-07-10', '2', 'basic', 'implemented'),
  ('2020-07-10', '2', 'geo', 'implemented'),
  ('2020-07-10', '2', 'mobile', 'first_contact'),
  ('2020-07-10', '1', 'geo', 'first_contact'),
  ('2020-07-10', '1', 'mobile', 'implemented'),
  ('2020-07-10', '3', 'basic', 'first_contact'),
  ('2020-07-10', '3', 'geo', 'implemented')
;

DROP TABLE IF EXISTS table2;
CREATE TABLE table2 (
  `id` INTEGER,
  `name` VARCHAR(3),
  `active` INTEGER
);

INSERT INTO table2
  (`id`, `name`, `active`)
VALUES
  ('1', 'xyz', '1'),
  ('2', 'dfg', '1'),
  ('3', 'lki', '1'),
  ('4', 'nbg', '0'),
  ('5', 'qyt', '0'),
  ('6', 'bfh', '1');
 
DROP TABLE IF EXISTS table3;
CREATE TABLE table3 (
  `feature` VARCHAR(20),
  `metric` VARCHAR(20),
  `app` VARCHAR(20)
);

INSERT INTO table3
  (`feature`, `metric`, `app`)
VALUES
  ('basic', 'basic_read', 'promotions'),
  ('basic', 'basic_update', 'promotions'),
  ('basic', 'basic_write', 'promotions'),
  ('geo', 'geo_update', 'admin'),
  ('geo', 'geo_write', 'admin'),
  ('mobile', 'mobile_executed', 'admin');

Based on the above sample data, the expected output would be a df which looks like this:

| yyyy_mm_dd | xml_id | feature | status        |
|------------|--------|---------|---------------|
| 2020-07-10 | 2      | basic   | implemented   |
| 2020-07-10 | 2      | geo     | implemented   |
| 2020-07-10 | 2      | mobile  | first_contact |
| 2020-07-10 | 1      | geo     | first_contact |
| 2020-07-10 | 1      | mobile  | implemented   |
| 2020-07-10 | 3      | basic   | first_contact |
| 2020-07-10 | 3      | geo     | implemented   |
| 2020-07-10 | 4      | mobile  | no_contact    |
| 2020-07-10 | 4      | geo     | no_contact    |
| 2020-07-10 | 4      | basic   | no_contact    |
| 2020-07-10 | 5      | mobile  | no_contact    |
| 2020-07-10 | 5      | geo     | no_contact    |
| 2020-07-10 | 5      | basic   | no_contact    |
| 2020-07-10 | 1      | basic   | no_contact    |
| 2020-07-10 | 3      | mobile  | no_contact    |
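
For reference, this is roughly the shape I had in mind in pandas. It is an untested sketch: it assumes has_status and all_ids are the pandas DataFrames built above, and the hardcoded feature list stands in for table3 / distinct(table1.feature).

import pandas as pd

# Build the full id x feature grid via a cross join (the key trick keeps
# this compatible with older pandas versions), then left-merge the known
# statuses and fill the gaps with no_contact.
features = pd.DataFrame({'feature': ['basic', 'geo', 'mobile']})

grid = (all_ids.rename(columns={'id': 'xml_id'})
               .assign(key=1)
               .merge(features.assign(key=1), on='key')
               .drop(columns='key'))

result = grid.merge(has_status, on=['xml_id', 'feature'], how='left')
result['status'] = result['status'].fillna('no_contact')
# Fill the date on the generated rows (single report date assumed).
result['yyyy_mm_dd'] = result['yyyy_mm_dd'].fillna('2020-07-10')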

Upvotes: 1

Views: 60

Answers (1)

Lamanus

Reputation: 13581

Here is a way to do it using PySpark.

import pyspark.sql.functions as f
from pyspark.sql import Window

# A single global window, used below only to copy the one date onto the
# generated no_contact rows.
w = Window.partitionBy()

# Build the full id x feature grid with a cross join, attach the known
# statuses with a left join, and fill the gaps with no_contact.
df2.selectExpr("id as xml_id") \
   .crossJoin(df3.select('feature').distinct()) \
   .join(df1, ['xml_id', 'feature'], 'left') \
   .withColumn('yyyy_mm_dd', f.max('yyyy_mm_dd').over(w)) \
   .withColumn('status', f.expr("coalesce(status, 'no_contact')")) \
   .orderBy('xml_id', 'feature') \
   .show(20, False)

+------+-------+----------+-------------+
|xml_id|feature|yyyy_mm_dd|status       |
+------+-------+----------+-------------+
|1     |basic  |2020-07-10|no_contact   |
|1     |geo    |2020-07-10|first_contact|
|1     |mobile |2020-07-10|implemented  |
|2     |basic  |2020-07-10|implemented  |
|2     |geo    |2020-07-10|implemented  |
|2     |mobile |2020-07-10|first_contact|
|3     |basic  |2020-07-10|first_contact|
|3     |geo    |2020-07-10|implemented  |
|3     |mobile |2020-07-10|no_contact   |
|4     |basic  |2020-07-10|no_contact   |
|4     |geo    |2020-07-10|no_contact   |
|4     |mobile |2020-07-10|no_contact   |
|5     |basic  |2020-07-10|no_contact   |
|5     |geo    |2020-07-10|no_contact   |
|5     |mobile |2020-07-10|no_contact   |
|6     |basic  |2020-07-10|no_contact   |
|6     |geo    |2020-07-10|no_contact   |
|6     |mobile |2020-07-10|no_contact   |
+------+-------+----------+-------------+
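
Note that Window.partitionBy() with no columns moves all rows into a single partition just to copy the one date around, which is fine here but can bottleneck on large data. Since the source is already filtered to a single yyyy_mm_dd, a literal avoids the global window. A sketch under that assumption (same imports as above), which also reorders the columns to match the expected output:

result = df2.selectExpr("id as xml_id") \
    .crossJoin(df3.select('feature').distinct()) \
    .join(df1, ['xml_id', 'feature'], 'left') \
    .withColumn('yyyy_mm_dd', f.lit('2020-07-10')) \
    .withColumn('status', f.coalesce(f.col('status'), f.lit('no_contact'))) \
    .select('yyyy_mm_dd', 'xml_id', 'feature', 'status') \
    .orderBy('xml_id', 'feature')
result.show(20, False)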

The input DataFrames are:

df1 = spark.createDataFrame([
    ('2020-07-10', '2', 'basic', 'implemented'),
    ('2020-07-10', '2', 'geo', 'implemented'),
    ('2020-07-10', '2', 'mobile', 'first_contact'),
    ('2020-07-10', '1', 'geo', 'first_contact'),
    ('2020-07-10', '1', 'mobile', 'implemented'),
    ('2020-07-10', '3', 'basic', 'first_contact'),
    ('2020-07-10', '3', 'geo', 'implemented'),
], ['yyyy_mm_dd', 'xml_id', 'feature', 'status'])
df1.show(10, False)

+----------+------+-------+-------------+
|yyyy_mm_dd|xml_id|feature|status       |
+----------+------+-------+-------------+
|2020-07-10|2     |basic  |implemented  |
|2020-07-10|2     |geo    |implemented  |
|2020-07-10|2     |mobile |first_contact|
|2020-07-10|1     |geo    |first_contact|
|2020-07-10|1     |mobile |implemented  |
|2020-07-10|3     |basic  |first_contact|
|2020-07-10|3     |geo    |implemented  |
+----------+------+-------+-------------+

df2 = spark.createDataFrame([
    ('1', 'xyz', '1'),
    ('2', 'dfg', '1'),
    ('3', 'lki', '1'),
    ('4', 'nbg', '0'),
    ('5', 'qyt', '0'),
    ('6', 'bfh', '1'),
], ['id', 'name', 'active'])
df2.show(10, False)

+---+----+------+
|id |name|active|
+---+----+------+
|1  |xyz |1     |
|2  |dfg |1     |
|3  |lki |1     |
|4  |nbg |0     |
|5  |qyt |0     |
|6  |bfh |1     |
+---+----+------+

df3 = spark.createDataFrame([
    ('basic', 'basic_read', 'promotions'),
    ('basic', 'basic_update', 'promotions'),
    ('basic', 'basic_write', 'promotions'),
    ('geo', 'geo_update', 'admin'),
    ('geo', 'geo_write', 'admin'),
    ('mobile', 'mobile_executed', 'admin'),
], ['feature', 'metric', 'app'])
df3.show(10, False)

+-------+---------------+----------+
|feature|metric         |app       |
+-------+---------------+----------+
|basic  |basic_read     |promotions|
|basic  |basic_update   |promotions|
|basic  |basic_write    |promotions|
|geo    |geo_update     |admin     |
|geo    |geo_write      |admin     |
|mobile |mobile_executed|admin     |
+-------+---------------+----------+
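
For completeness, the same idea expressed in Spark SQL rather than the DataFrame API -- a sketch assuming the three DataFrames are registered as temp views:

df1.createOrReplaceTempView('table1')
df2.createOrReplaceTempView('table2')
df3.createOrReplaceTempView('table3')

# Cross join the full id x feature grid, left join the known statuses,
# and coalesce the missing ones to no_contact.
spark.sql("""
    select
        max(t.yyyy_mm_dd) over () as yyyy_mm_dd,
        p.id as xml_id,
        f.feature,
        coalesce(t.status, 'no_contact') as status
    from table2 p
    cross join (select distinct feature from table3) f
    left join table1 t
        on t.xml_id = p.id and t.feature = f.feature
    order by xml_id, feature
""").show(20, False)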

Upvotes: 1
