Quang Anh Trần

Reputation: 11

What would be a better approach than using a for loop in Spark?

I know that using a for loop in Spark is not a good practice as it runs sequentially, not in parallel. However, as I'm new to working with Spark, I haven't found a better solution yet.

I have two DataFrames. The first, df_fact, contains information about the processing dates of files:

+------------+--------+-----------------+-------------------+-----------------------+-------------------+---+----+
|unit_code   |code    |final_status     |receive_date       |proceed_date           |expected_date      |m  |y   |
+------------+--------+-----------------+-------------------+-----------------------+-------------------+---+----+
|PER.15.82.13|1.008304|1                |2022-12-04 09:00:00|2022-12-05 09:00:00    |2022-12-05 09:00:00|12 |2022|
|PER.66.42.45|1.088304|NULL             |2022-05-03 10:00:01|2099-12-31 00:00:00    |2022-10-05 00:00:00|12 |2099|
|PER.84.04.66|1.008305|5                |2022-01-26 11:06:00|2022-01-31 11:06:00    |2022-01-30 11:06:00|1  |2022|
|PER.30.23.22|1.008404|9                |2022-03-03 17:05:32|2022-05-03 10:55:48    |2022-05-02 17:05:32|5  |2022|
|PER.38.38.87|1.018304|10               |2022-02-04 10:02:16|2022-02-24 10:02:16    |2022-03-08 10:02:16|2  |2022|
|PER.65.76.59|1.008394|7                |2022-09-04 16:07:21|2022-11-05 16:07:21    |2022-10-10 16:07:21|11 |2022|
...

and the second, df_period, where each row represents a period:

+--------------+-------------------+-------------------+---+----+
|period_code   |apply_from         |apply_to           |M  |Y   |
+--------------+-------------------+-------------------+---+----+
|202201M       |2022-01-01 00:00:00|2022-01-31 23:59:59|1  |2022|
|202202M       |2022-02-01 00:00:00|2022-02-28 23:59:59|2  |2022|
|202203M       |2022-03-01 00:00:00|2022-03-31 23:59:59|3  |2022|
|202204M       |2022-04-01 00:00:00|2022-04-30 23:59:59|4  |2022|
...

The problem: for each period, I need to know how many files are still being processed on time, how many were completed on time, how many are overdue, and so on. Because of the business logic, a file can belong to multiple periods, so the same file may count as on time in one period and overdue in another. Therefore, I believe that simply joining the two tables may not be a feasible solution.

For example, this row falls into several periods, such as '202203M' and '202204M', where it would be classified as on time and overdue, respectively (a sketch of the membership test follows the table):

+------------+--------+-----------------+-------------------+-----------------------+-------------------+---+----+
|unit_code   |code    |final_status     |receive_date       |proceed_date           |expected_date      |m  |y   |
+------------+--------+-----------------+-------------------+-----------------------+-------------------+---+----+
|PER.30.23.22|1.008404|9                |2022-03-03 17:05:32|2022-05-03 10:55:48    |2022-05-02 17:05:32|5  |2022|
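
To make the multi-period membership concrete, the test my loop applies for each period can be written as a single Spark Column expression. This is just a sketch: p_m and p_y stand for df_period's M and Y renamed, since Spark's default case-insensitive column resolution would otherwise confuse them with df_fact's m and y after a join.

import org.apache.spark.sql.Column;
import static org.apache.spark.sql.functions.col;

// A file belongs to a period if it was proceeded in that month/year, or was
// received by the period end (apply_to) and still open at that moment.
Column belongsToPeriod =
        col("m").equalTo(col("p_m")).and(col("y").equalTo(col("p_y")))
                .or(col("receive_date").leq(col("apply_to"))
                        .and(col("apply_to").lt(col("proceed_date"))));

For the row above, every period whose apply_to falls between receive_date (2022-03-03) and proceed_date (2022-05-03) matches the second branch, which is why the same file shows up in both '202203M' and '202204M'.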

Here is my approach: each row of df_period is used, in turn, as a condition to filter df_fact; the filtered DataFrame is then used for the calculations that produce a result DataFrame. The process repeats for the next row of df_period, and each new result is unioned with the previous ones.

It would be easier to understand by examining the code:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.sql.Timestamp;
import static org.apache.spark.sql.functions.*;

Dataset<Row> result = null;
// One sequential iteration per period: collect df_period to the driver and
// re-scan df_fact for every row of it.
for (Row row : df_period.collectAsList()) {
    Timestamp N = row.getAs("apply_to"); // end of the period
    int M = row.getAs("M");
    int Y = row.getAs("Y");

    // Keep files proceeded in this month/year, or still open at the period end N.
    Dataset<Row> df_month = df_fact.filter(col("m").equalTo(M).and(col("y").equalTo(Y))
            .or(col("receive_date").leq(lit(N)).and(lit(N).lt(col("proceed_date")))));

    // Still in processing at N and not yet past the expected date.
    df_month = df_month.withColumn("processing_ontime",
            when(col("receive_date").leq(lit(N)).and(lit(N).leq(col("expected_date")))
                    .and(lit(N).lt(col("proceed_date"))), 1).otherwise(0));

    // Still in processing at N but already past the expected date.
    df_month = df_month.withColumn("processing_overdue",
            when(lit(N).gt(col("expected_date")).and(lit(N).lt(col("proceed_date"))), 1).otherwise(0));

    // Completed in this month/year, on or before the expected date.
    df_month = df_month.withColumn("done_ontime",
            when(col("proceed_date").leq(col("expected_date"))
                    .and(col("m").equalTo(M)).and(col("y").equalTo(Y)), 1).otherwise(0));

    // Completed in this month/year, after the expected date.
    df_month = df_month.withColumn("done_overdue",
            when(col("proceed_date").gt(col("expected_date"))
                    .and(col("m").equalTo(M)).and(col("y").equalTo(Y)), 1).otherwise(0));

    // Total files counted toward this period.
    df_month = df_month.withColumn("tonghoso",
            when(col("receive_date").leq(lit(N)).and(lit(N).lt(col("proceed_date")))
                    .or(col("m").equalTo(M).and(col("y").equalTo(Y))), 1).otherwise(0));

    // Files with final_status 9 or 10 whose processing window covers N.
    df_month = df_month.withColumn("tonghoso_ddk",
            when(col("final_status").isin(9, 10)
                    .and(col("receive_date").leq(lit(N)))
                    .and(lit(N).leq(col("proceed_date"))), 1).otherwise(0));

    // Processing duration in seconds for the files flagged above, else null.
    df_month = df_month.withColumn("tg_xlhs",
            when(col("tonghoso_ddk").equalTo(1),
                    unix_timestamp(col("proceed_date")).minus(unix_timestamp(col("receive_date")))));

    Dataset<Row> result1 = df_month.groupBy("unit_code", "code")
            .agg(
                    sum("processing_ontime").alias("tshs_processing_ontime"),
                    sum("processing_overdue").alias("tshs_processing_overdue"),
                    sum("done_ontime").alias("tshs_done_ontime"),
                    sum("done_overdue").alias("tshs_done_overdue"),
                    sum("tonghoso").alias("tshs"),
                    sum("tonghoso_ddk").alias("tshs_ddk"),
                    sum("tg_xlhs").alias("tg_xlhs")
            );

    // Tag the aggregate with the period and append it to the running result.
    result1 = result1.withColumn("period_code", lit(row.getAs("period_code")));
    if (result == null) {
        result = result1;
    } else {
        result = result.union(result1);
    }
}

What would be a better approach than this?
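
In case it helps frame an answer, the direction I have been wondering about (but have not verified) is to replace the loop with a single non-equi join on the membership condition sketched above, so that every (file, period) pair becomes one row, and then to compute all the flags and aggregates in one pass. This is only a rough sketch under the same column-name assumptions, and I am not sure it reproduces the loop's output exactly:

// Untested sketch. Same static imports as the loop version; belongsToPeriod
// is the membership condition defined earlier.

// Rename df_period's M/Y so they don't collide with df_fact's m/y under
// Spark's default case-insensitive column resolution.
Dataset<Row> periods = df_period
        .withColumnRenamed("M", "p_m")
        .withColumnRenamed("Y", "p_y");

// One row per (file, period) pair, replacing the per-period filter.
// col("apply_to") now plays the role of the loop's constant N.
Dataset<Row> joined = df_fact.join(periods, belongsToPeriod);

Dataset<Row> result = joined
        .withColumn("processing_ontime",
                when(col("receive_date").leq(col("apply_to"))
                        .and(col("apply_to").leq(col("expected_date")))
                        .and(col("apply_to").lt(col("proceed_date"))), 1).otherwise(0))
        .withColumn("processing_overdue",
                when(col("apply_to").gt(col("expected_date"))
                        .and(col("apply_to").lt(col("proceed_date"))), 1).otherwise(0))
        .withColumn("done_ontime",
                when(col("proceed_date").leq(col("expected_date"))
                        .and(col("m").equalTo(col("p_m"))).and(col("y").equalTo(col("p_y"))), 1).otherwise(0))
        .withColumn("done_overdue",
                when(col("proceed_date").gt(col("expected_date"))
                        .and(col("m").equalTo(col("p_m"))).and(col("y").equalTo(col("p_y"))), 1).otherwise(0))
        .withColumn("tonghoso",
                when(col("receive_date").leq(col("apply_to")).and(col("apply_to").lt(col("proceed_date")))
                        .or(col("m").equalTo(col("p_m")).and(col("y").equalTo(col("p_y")))), 1).otherwise(0))
        .withColumn("tonghoso_ddk",
                when(col("final_status").isin(9, 10)
                        .and(col("receive_date").leq(col("apply_to")))
                        .and(col("apply_to").leq(col("proceed_date"))), 1).otherwise(0))
        .withColumn("tg_xlhs",
                when(col("tonghoso_ddk").equalTo(1),
                        unix_timestamp(col("proceed_date")).minus(unix_timestamp(col("receive_date")))))
        .groupBy("period_code", "unit_code", "code") // period_code comes from df_period
        .agg(
                sum("processing_ontime").alias("tshs_processing_ontime"),
                sum("processing_overdue").alias("tshs_processing_overdue"),
                sum("done_ontime").alias("tshs_done_ontime"),
                sum("done_overdue").alias("tshs_done_overdue"),
                sum("tonghoso").alias("tshs"),
                sum("tonghoso_ddk").alias("tshs_ddk"),
                sum("tg_xlhs").alias("tg_xlhs")
        );

My understanding is that this would run as one Spark job with a single shuffle instead of one job per period with an ever-growing union lineage, but I would appreciate confirmation that the semantics actually match the loop.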

Upvotes: 0

Views: 84

Answers (0)
