I am running a Spark job, and for the most part it goes fast, but it gets stuck on one task in the last stage. I can see far more shuffle read and far more rows being processed for that task, and I have tried a bunch of repartitioning strategies to ensure an even distribution, but I still can't get past it. Could you please help?
The join is a reverse lookup of some private data that lives in a Delta Lake table (all of this is running on Databricks).
Table 1, with all the desired event logs/rows: sizeInBytes=218.2 TiB, but filtered on a partition key (date) down to just the last 4 days. Still huge, I assume, as there are a lot of events.
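For context, the read of Table 1 looks roughly like this (the table path and the date column name are placeholders, not my actual code):

    import org.apache.spark.sql.functions._

    // Read the Delta table and prune on the date partition key to the
    // last 4 days (path and column name are illustrative):
    val allIncrementalEvents = spark.read
      .format("delta")
      .load("/mnt/events/incremental")   // placeholder path
      .filter(col("date") >= date_sub(current_date(), 4))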
Table 2, the lookup table for the personal fields that are hashed in the table above: sizeInBytes=1793.9 GiB. It has just 4 columns: key, hash, timestamp, and type. It is a simple lookup table.
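For reference, this is the shape of the lookup table as described (column names assumed to match my join code below):

    // Illustrative schema only:
    // reverseLookupTableDf
    //  |-- key: string         (original, un-hashed value)
    //  |-- hashed: string      (hash value used as the join key)
    //  |-- timestamp: timestamp
    //  |-- type: string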
Essentially, there are 4 hashed fields that I need to reverse-look-up, and that takes 4 separate joins with this lookup table. This is quite expensive, but at this point there is no way out of it. Each join happens on the hashed key, which I tried to use in the repartitioning scheme for the DataFrames. My hypothesis was that this would bring the same hash keys into the same partition, where they could be picked up by the same executor. But I still see one task running for a long time, doing an exorbitant amount of shuffle reads and going through far more rows than the others.
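Concretely, by using the key in the repartitioning scheme I mean something like this (DataFrame and column names are illustrative):

    import org.apache.spark.sql.functions.col

    // Repartition both sides on the join key so equal hash keys land in
    // the same partition before the join (names are placeholders):
    val eventsByKey = eventsDf.repartition(partitions, col("adId"))
    val lookupByKey = reverseLookupTableDf.repartition(partitions, col("hashed"))

    val joined = eventsByKey.join(
      lookupByKey,
      eventsByKey("adId") === lookupByKey("hashed"),
      "leftouter"
    )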
What could I be doing wrong? Is repartitioning not a good approach here? I read somewhere that I could try iterative broadcasting: breaking the smaller table (the lookup table here) into chunks (I think under 8 GB each), broadcasting each chunk in turn, and merging all the results at the end.
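From what I read, it would look something like the sketch below (the chunk count is my assumption, and I show inner joins per chunk; keeping my left-outer semantics would need an extra pass for rows that match no chunk):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._

    // Split the lookup table into broadcastable chunks, join each chunk,
    // then union the per-chunk results. Each key lands in exactly one
    // chunk, so the inner-join results union without duplicates.
    val numChunks = 32  // assumed; size so each chunk stays well under 8 GB

    val perChunk: Seq[DataFrame] = (0 until numChunks).map { i =>
      val chunk = reverseLookupTableDf
        .filter(pmod(hash(col("hashed")), lit(numChunks)) === i)
      eventsDf.join(broadcast(chunk), col("adId") === chunk("hashed"))
    }

    val merged = perChunk.reduce(_ union _)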
Any help would be appreciated as I am getting stuck in the same place with these strategies.
Thank you!
I do a union over a few event types to create the first DataFrame, then join it with the lookup table.
val extractAuthEvents = allIncrementalEvents.as("e")
  .filter(col("e.type") === "authentication")
  .filter(lower(col("e.payload.type")).isin(eventConf.eventTypes: _*))
  .filter(lower(col("e.payload.os.name")).isin(eventConf.osNames: _*))
  .filter(lower(col("e.payload.device.manufacturer")).isin(eventConf.manufacturers: _*))
  .repartition(partitions)

val extractSessionEvents = allIncrementalEvents.as("e")
  .filter(col("e.type") === "session")
  .filter(lower(col("e.payload.type")).isin(eventConf.eventTypes: _*))
  .filter(lower(col("e.payload.os.name")).isin(eventConf.osNames: _*))
  .filter(lower(col("e.payload.device.manufacturer")).isin(eventConf.manufacturers: _*))
  .repartition(partitions)

val extractOtherEvents = allIncrementalEvents.as("e")
  .filter(col("e.type") === "other")
  .filter(lower(col("e.payload.type")).isin(eventConf.eventTypes: _*))
  .filter(lower(col("e.payload.os.name")).isin(eventConf.osNames: _*))
  .filter(lower(col("e.payload.device.manufacturer")).isin(eventConf.manufacturers: _*))
  .repartition(partitions)
Join
extractAuthEvents
  .union(extractSubEvents)
  .union(extractOpenEvents)
  .union(extractSessionEvents)
  .as("ae")
  .join(reverseLookupTableDf.as("adId"),
    col("ae.adId") === col("adId.hashed"),
    "leftouter"
  )
  .join(reverseLookupTableDf.as("ip"),
    col("ae.ip") === col("ip.hashed"),
    "leftouter"
  )
  .join(reverseLookupTableDf.as("ua"),
    col("ae.ua") === col("ua.hashed"),
    "leftouter"
  )
  .join(reverseLookupTableDf.as("uid"),
    col("ae.uuid") === col("uid.hashed"),
    "leftouter"
  )