
Last Spark task taking forever to complete

I am running a Spark job and for the most part it runs fast, but it gets stuck on the last task of one of the stages. I can see a lot more shuffle read and many more rows being processed for that task, and I have tried a bunch of repartitioning strategies to ensure an even distribution, but I still can't get through it. Could you please help? I am attaching screenshots as well.

The join I am doing looks up some private data that lives in a Delta Lake table (all of this runs on Databricks).

Table 1, which holds all the desired event logs/rows, is sizeInBytes=218.2 TiB, but it is filtered on a partition key (date) to just the last 4 days. I assume it is still huge, as there are a lot of events.

Table 2, the look-up table for the personal fields that are hashed in the table above, is sizeInBytes=1793.9 GiB. It has just 4 columns: key, hash, timestamp and type. It is a simple look-up table.
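For context, this is roughly how the two inputs are read (a simplified sketch only; the paths and the `date` column name are placeholders, not my real values):

    import org.apache.spark.sql.functions.{col, current_date, date_sub}

    // Table 1: the event log Delta table, partition-pruned to the last 4 days
    val allIncrementalEvents = spark.read.format("delta")
      .load("<events-table-path>")  // placeholder path
      .filter(col("date") >= date_sub(current_date(), 4))

    // Table 2: the reverse look-up table (key, hash, timestamp, type)
    val reverseLookupTableDf = spark.read.format("delta")
      .load("<lookup-table-path>")  // placeholder path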

[Spark UI screenshots: task metrics for the stuck stage, showing one task with far larger shuffle read size and record count than the rest]

Essentially, there are 4 hashed fields that I need to reverse-look-up, and that requires 4 separate joins with this look-up table. This is quite expensive, but at this point there is no way around it. The join happens on that hashed key, which I also tried to use in the repartitioning scheme for the DataFrames: I thought this would bring the same hashed keys into the same partitions so they could be picked up by the same executor. That was the hypothesis, but I still see one task running for a very long time, doing an exorbitant amount of shuffle read and going through far more rows than the others.
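To make that concrete, this is the shape of what I tried for one of the keys (simplified; `unionedEvents` stands for the unioned events DataFrame shown further down, and the `adId`/`hashed` column names are the ones from my joins below):

    import org.apache.spark.sql.functions.col

    // Repartition both sides on the join key so rows with the same hashed key
    // should land in the same partitions before the join.
    val eventsByKey = unionedEvents.repartition(partitions, col("adId"))
    val lookupByKey = reverseLookupTableDf.repartition(partitions, col("hashed"))

    val joined = eventsByKey.join(
      lookupByKey,
      eventsByKey("adId") === lookupByKey("hashed"),
      "leftouter"
    )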

What could I be doing wrong? Is repartitioning not a good approach here? I read somewhere that I could try iterative broadcasting: breaking the smaller table (the look-up table here) into smaller chunks (I think under 8 GB each), broadcasting each chunk, and merging all the results later.
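This is roughly what I understood the iterative-broadcast idea to be (a sketch for one of the four keys only; `numChunks` is a placeholder and would have to be large enough that each chunk of the ~1.8 TiB look-up table stays under the broadcast size limit):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{broadcast, col, hash, lit, pmod}

    val numChunks = 8  // placeholder; must make each look-up chunk broadcastable

    // Tag both sides with a chunk id derived from the join key, so every event row
    // only has to be checked against the single look-up chunk that could match it.
    val lookupChunked = reverseLookupTableDf
      .withColumn("chunk", pmod(hash(col("hashed")), lit(numChunks)))
    val eventsChunked = unionedEvents
      .withColumn("chunk", pmod(hash(col("adId")), lit(numChunks)))

    // Broadcast one look-up chunk at a time, join the matching slice of events
    // against it, then union the per-chunk results back together.
    val joined: DataFrame = (0 until numChunks).map { i =>
      val lookupChunk = broadcast(lookupChunked.filter(col("chunk") === i).drop("chunk"))
      eventsChunked
        .filter(col("chunk") === i)
        .join(lookupChunk, col("adId") === col("hashed"), "leftouter")
        .drop("chunk")
    }.reduce(_ union _)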

Any help would be appreciated as I am getting stuck in the same place with these strategies.

Thank you!

I union a few event types to create the first DataFrame, then join it with the look-up table.

    allIncrementalEvents.as("e")
      .filter(col("e.type") === "authentication")
      .filter(lower(col("e.payload.type")).isin(eventConf.eventTypes:_*))
      .filter(lower(col("e.payload.os.name")).isin(eventConf.osNames:_*))
      .filter(lower(col("e.payload.device.manufacturer")).isin(eventConf.manufacturers:_*))
      .repartition(partitions)

UNION

    allIncrementalEvents.as("e")
      .filter(col("e.type") === "session")
      .filter(lower(col("e.payload.type")).isin(eventConf.eventTypes:_*))
      .filter(lower(col("e.payload.os.name")).isin(eventConf.osNames:_*))
      .filter(lower(col("e.payload.device.manufacturer")).isin(eventConf.manufacturers:_*))
      .repartition(partitions)   

UNION

    allIncrementalEvents.as("e")
      .filter(col("e.type") === "other")
      .filter(lower(col("e.payload.type")).isin(eventConf.eventTypes:_*))
      .filter(lower(col("e.payload.os.name")).isin(eventConf.osNames:_*))
      .filter(lower(col("e.payload.device.manufacturer")).isin(eventConf.manufacturers:_*))
      .repartition(partitions)

Join

 
    extractAuthEvents
      .union(extractSubEvents)
      .union(extractOpenEvents)
      .union(extractSessionEvents)
      .as("ae")  // alias the unioned events so the join conditions can reference ae.*
      .join(reverseLookupTableDf.as("adId"),
        col("ae.adId") === col("adId.hashed"),
        "leftouter"
      )
      .join(reverseLookupTableDf.as("ip"),
        col("ae.ip") === col("ip.hashed"),
        "leftouter"
      )
      .join(reverseLookupTableDf.as("ua"),
        col("ae.ua") === col("ua.hashed"),
        "leftouter"
      )
      .join(reverseLookupTableDf.as("uid"),
        col("ae.uuid") === col("uid.hashed"),
        "leftouter"
      )

Upvotes: 0

Views: 256

Answers (0)
