rafvasq
rafvasq

Reputation: 1522

Reindexing and filling missing dates in PySpark

Is there a way to fill in the missing column dates and row values in PySpark? Currently, I convert the dataframe to Pandas and reindex there.

sdf.show()

+---+----------+----------+----------+
| id|2018-01-01|2018-01-03|2018-01-05|
+---+----------+----------+----------+
| 1 |       0.0|       1.0|       0.0|
| 2 |       4.0|       2.0|       0.0|
| 3 |       0.0|       1.0|       1.0|
| 7 |       0.0|       2.0|       9.0|
| 8 |       8.0|       0.0|       0.0|
| 9 |       0.0|       0.0|       3.0|
+---+----------+----------+----------+

idx = pd.date_range('01-01-2018', '01-07-2018').date    
df = sdf.toPandas()
df = df.set_index('id')
df = df.reindex(idx, axis=1, fill_value=0)

I haven't been able to find something similar in PySpark.

Desired output:

+---+----------+----------+----------+----------+----------+
| id|2018-01-01|2018-01-02|2018-01-03|2018-01-04|2018-01-05|
+---+----------+----------+----------+----------+----------+
| 1 |       0.0|       0.0|       1.0|       0.0|       0.0|
| 2 |       4.0|       0.0|       2.0|       0.0|       0.0|
| 3 |       0.0|       0.0|       1.0|       0.0|       1.0|
| 7 |       0.0|       0.0|       2.0|       0.0|       9.0|
| 8 |       8.0|       0.0|       0.0|       0.0|       0.0|
| 9 |       0.0|       0.0|       0.0|       0.0|       3.0|
+---+----------+----------+----------+----------+----------+

Upvotes: 1

Views: 1411

Answers (2)

anky
anky

Reputation: 75080

You can use lit() to the values in idx which re not already present in the dataframe.

Note I have converted the column as string just for testing:

ids = [str(i) for i in idx] #may not be required
to_add = [col for col in ids if col not in df.columns]
out = df.select(df.columns+ [lit(0).alias(name) for name in to_add])
out.show()

+---+----------+----------+----------+----------+----------+----------+----------+
| id|2018-01-01|2018-01-03|2018-01-05|2018-01-02|2018-01-04|2018-01-06|2018-01-07|
+---+----------+----------+----------+----------+----------+----------+----------+
|  1|       0.0|       1.0|       0.0|         0|         0|         0|         0|
|  2|       4.0|       2.0|       0.0|         0|         0|         0|         0|
|  3|       0.0|       1.0|       1.0|         0|         0|         0|         0|
|  7|       0.0|       2.0|       9.0|         0|         0|         0|         0|
|  8|       8.0|       0.0|       0.0|         0|         0|         0|         0|
|  9|       0.0|       0.0|       3.0|         0|         0|         0|         0|
+---+----------+----------+----------+----------+----------+----------+----------+

Upvotes: 1

aamirmalik124
aamirmalik124

Reputation: 125

Try this

rdd_df = df.rdd.zipWithIndex()
df_final = rdd_df.toDF(sampleRatio=0.2)
df_final = df_final.withColumn('name_id', df_final['_1'].getItem("column name"))

Upvotes: 0

Related Questions