I know that using a driver-side for loop in Spark is bad practice, since the iterations run sequentially rather than in parallel. However, as I'm new to Spark, I haven't found a better solution yet.
I have two DataFrames. The first, df_fact, contains information about the processing dates of files:
+------------+--------+-----------------+-------------------+-----------------------+-------------------+---+----+
|unit_code |code |final_status |receive_date |proceed_date |expected_date |m |y |
+------------+--------+-----------------+-------------------+-----------------------+-------------------+---+----+
|PER.15.82.13|1.008304|1 |2022-12-04 09:00:00|2022-12-05 09:00:00 |2022-12-05 09:00:00|12 |2022|
|PER.66.42.45|1.088304|NULL |2022-05-03 10:00:01|2099-12-31 00:00:00 |2022-10-05 00:00:00|12 |2099|
|PER.84.04.66|1.008305|5 |2022-01-26 11:06:00|2022-01-31 11:06:00 |2022-01-30 11:06:00|1 |2022|
|PER.30.23.22|1.008404|9 |2022-03-03 17:05:32|2022-05-03 10:55:48 |2022-05-02 17:05:32|5 |2022|
|PER.38.38.87|1.018304|10 |2022-02-04 10:02:16|2022-02-24 10:02:16 |2022-03-08 10:02:16|2 |2022|
|PER.65.76.59|1.008394|7 |2022-09-04 16:07:21|2022-11-05 16:07:21 |2022-10-10 16:07:21|11 |2022|
...
and df_period, where each row represents a period:
+--------------+-------------------+-------------------+---+----+
|period_code |apply_from |apply_to |M |Y |
+--------------+-------------------+-------------------+---+----+
|202201M |2022-01-01 00:00:00|2022-01-31 23:59:59|1 |2022|
|202202M |2022-02-01 00:00:00|2022-02-28 23:59:59|2 |2022|
|202203M |2022-03-01 00:00:00|2022-03-31 23:59:59|3 |2022|
|202204M |2022-04-01 00:00:00|2022-04-30 23:59:59|4 |2022|
...
The problem is that I'm trying to obtain, for a given period, how many files are being processed on time, how many were completed on time, how many are overdue, and so on. Because of the business logic, a file can belong to multiple periods, which means it can count as on time in one period but overdue in another. Therefore, I believe a simple join between the two tables is not feasible.
For example, the record below belongs to both the '202203M' and '202204M' periods, and should be classified as on time in the former but overdue in the latter:
+------------+--------+-----------------+-------------------+-----------------------+-------------------+---+----+
|unit_code |code |final_status |receive_date |proceed_date |expected_date |m |y |
+------------+--------+-----------------+-------------------+-----------------------+-------------------+---+----+
|PER.30.23.22|1.008404|9 |2022-03-03 17:05:32|2022-05-03 10:55:48 |2022-05-02 17:05:32|5 |2022|
+------------+--------+-----------------+-------------------+-----------------------+-------------------+---+----+
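To make the per-period logic concrete, the condition I evaluate at a period's end N can be written in plain Java. This is only an illustration (the `PeriodStatus` class and `classify` method are names I made up for this sketch), mirroring the `when()` conditions in my actual code:

```java
import java.sql.Timestamp;

// Hypothetical helper (illustration only): classify one file at a period's
// end N. A file is "open" at N if it was received by N but not yet processed.
public class PeriodStatus {
    public static String classify(Timestamp N, Timestamp receive,
                                  Timestamp expected, Timestamp proceed) {
        // receive <= N < proceed
        boolean open = !receive.after(N) && N.before(proceed);
        if (open) {
            // still processing at N: on time while N has not passed expected_date
            return N.after(expected) ? "processing_overdue" : "processing_ontime";
        }
        return "not_open_at_N";
    }

    public static void main(String[] args) {
        // The PER.30.23.22 row above, evaluated at the end of two periods
        Timestamp receive  = Timestamp.valueOf("2022-03-03 17:05:32");
        Timestamp expected = Timestamp.valueOf("2022-05-02 17:05:32");
        Timestamp proceed  = Timestamp.valueOf("2022-05-03 10:55:48");
        System.out.println(classify(Timestamp.valueOf("2022-03-31 23:59:59"),
                                    receive, expected, proceed)); // processing_ontime
        System.out.println(classify(Timestamp.valueOf("2022-05-31 23:59:59"),
                                    receive, expected, proceed)); // not_open_at_N
    }
}
```

The same record produces a different status depending on which period's end N it is evaluated against, which is why a single per-row label is not enough.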
Here is my approach: for each row of df_period, I use its values to filter df_fact. The filtered DataFrame is then used to compute a result DataFrame for that period, and the per-period results are unioned together across iterations.
It would be easier to understand by examining the code:
Dataset<Row> result = null;
// Driver-side loop: one full pass over df_fact per period
for (Row row : df_period.collectAsList()) {
    Timestamp N = row.getAs("apply_to");
    int M = row.getAs("M");
    int Y = row.getAs("Y");

    // Keep files settled in this month, or still open at the period's end N
    Dataset<Row> df_month = df_fact.filter(
            col("m").equalTo(M).and(col("y").equalTo(Y))
            .or(col("receive_date").leq(lit(N)).and(lit(N).lt(col("proceed_date")))));

    df_month = df_month
        .withColumn("processing_ontime",
            when(col("receive_date").leq(lit(N))
                .and(lit(N).leq(col("expected_date")))
                .and(lit(N).lt(col("proceed_date"))), 1).otherwise(0))
        .withColumn("processing_overdue",
            when(lit(N).gt(col("expected_date"))
                .and(lit(N).lt(col("proceed_date"))), 1).otherwise(0))
        .withColumn("done_ontime",
            when(col("proceed_date").leq(col("expected_date"))
                .and(col("m").equalTo(M)).and(col("y").equalTo(Y)), 1).otherwise(0))
        .withColumn("done_overdue",
            when(col("proceed_date").gt(col("expected_date"))
                .and(col("m").equalTo(M)).and(col("y").equalTo(Y)), 1).otherwise(0))
        .withColumn("tonghoso",
            when(col("receive_date").leq(lit(N)).and(lit(N).lt(col("proceed_date")))
                .or(col("m").equalTo(M).and(col("y").equalTo(Y))), 1).otherwise(0))
        .withColumn("tonghoso_ddk",
            when(col("final_status").isin(9, 10)
                .and(col("receive_date").leq(lit(N)))
                .and(lit(N).leq(col("proceed_date"))), 1).otherwise(0))
        .withColumn("tg_xlhs",
            when(col("tonghoso_ddk").equalTo(1),
                unix_timestamp(col("proceed_date"))
                    .minus(unix_timestamp(col("receive_date")))).otherwise(null));

    // Aggregate this period's counters and tag them with the period code
    Dataset<Row> result1 = df_month.groupBy("unit_code", "code")
        .agg(
            sum("processing_ontime").alias("tshs_processing_ontime"),
            sum("processing_overdue").alias("tshs_processing_overdue"),
            sum("done_ontime").alias("tshs_done_ontime"),
            sum("done_overdue").alias("tshs_done_overdue"),
            sum("tonghoso").alias("tshs"),
            sum("tonghoso_ddk").alias("tshs_ddk"),
            sum("tg_xlhs").alias("tg_xlhs"))
        .withColumn("period_code", lit(row.getAs("period_code")));

    result = (result == null) ? result1 : result.union(result1);
}
What would be a better approach than this?