Optimus Prime

Hive + Tez: join query stuck on the last 2 mappers for a long time

I am joining a VIEWS table with a temporary table (SCU_TMP), with the following parameters intentionally enabled:

set hive.auto.convert.join=true;
set hive.execution.engine=tez;

The query is:

CREATE TABLE STG_CONVERSION AS    
SELECT CONV.CONVERSION_ID,
       CONV.USER_ID,
       TP.TIME,
       CONV.TIME AS ACTIVITY_TIME,
       TP.MULTI_DIM_ID,
       CONV.CONV_TYPE_ID,
       TP.SV1
FROM VIEWS TP
JOIN  SCU_TMP CONV ON TP.USER_ID = CONV.USER_ID
WHERE TP.TIME <= CONV.TIME;
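To make the join semantics concrete, here is a minimal in-memory sketch of what the query computes, written as the broadcast hash join that hive.auto.convert.join tries to produce when one side is small enough. It assumes SCU_TMP is the side that gets hashed and uses made-up tuple layouts for the rows; it is an illustration, not Hive's implementation.

```python
from collections import defaultdict


def broadcast_hash_join(views, scu_tmp):
    """Join VIEWS (TP) to SCU_TMP (CONV) on USER_ID, keeping TP.TIME <= CONV.TIME.

    Assumed row layouts (hypothetical, for illustration only):
      views:   (user_id, time, multi_dim_id, sv1)
      scu_tmp: (conversion_id, user_id, time, conv_type_id)
    Output rows follow the column order of STG_CONVERSION.
    """
    # Build phase: hash the small side (assumed to be SCU_TMP) by USER_ID.
    by_user = defaultdict(list)
    for conv in scu_tmp:
        by_user[conv[1]].append(conv)

    # Probe phase: stream the large side. Every SCU_TMP row of the same user
    # is a candidate pair, so the work per VIEWS row grows with that count.
    out = []
    for user_id, tp_time, multi_dim_id, sv1 in views:
        for conversion_id, _uid, conv_time, conv_type_id in by_user.get(user_id, ()):
            if tp_time <= conv_time:  # WHERE TP.TIME <= CONV.TIME
                out.append((conversion_id, user_id, tp_time, conv_time,
                            multi_dim_id, conv_type_id, sv1))
    return out


views = [("a", 1, 10, "x"), ("a", 5, 11, "y"), ("b", 2, 12, "z")]
scu_tmp = [(100, "a", 3, 7), (101, "a", 6, 7), (102, "b", 1, 8)]
rows = broadcast_hash_join(views, scu_tmp)
print(rows)
```

Each VIEWS row is compared against every SCU_TMP row of the same user, so a user with 20k SCU_TMP rows multiplies the work for each of that user's VIEWS rows by roughly 20k.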

Normally, both tables can have any number of records.
However, in the SCU_TMP table only 10-50 records are expected per USER_ID.

In some cases, though, a couple of user IDs have around 10k-20k records each in SCU_TMP, which creates a cross-product effect.
In those cases the query runs practically forever, with just one mapper left to complete.
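One way to confirm which user IDs cause the blow-up is to estimate the per-user size of the join before the TIME filter: the number of SCU_TMP rows times the number of VIEWS rows for the same USER_ID. Counting each side first avoids running the expensive join itself. The SQL is intended to be portable to Hive; the Python harness and its toy data (a hypothetical 'hot' user) exist only so the sketch can run standalone on SQLite.

```python
import sqlite3

# Per-user join size before the TIME filter, largest first.
PAIR_COUNT_SQL = """
SELECT c.USER_ID AS uid, c.n * v.n AS pairs
FROM (SELECT USER_ID, COUNT(*) AS n FROM SCU_TMP GROUP BY USER_ID) c
JOIN (SELECT USER_ID, COUNT(*) AS n FROM VIEWS GROUP BY USER_ID) v
  ON c.USER_ID = v.USER_ID
ORDER BY pairs DESC, uid
LIMIT 20
"""


def pair_counts(conn):
    return conn.execute(PAIR_COUNT_SQL).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE VIEWS (USER_ID TEXT)")
conn.execute("CREATE TABLE SCU_TMP (USER_ID TEXT)")
# Toy data: 'hot' has 1000 SCU_TMP rows and 50 VIEWS rows;
# u0..u2 have 20 SCU_TMP rows and 5 VIEWS rows each.
conn.executemany("INSERT INTO SCU_TMP VALUES (?)",
                 [("hot",)] * 1000 + [(f"u{i}",) for i in range(3) for _ in range(20)])
conn.executemany("INSERT INTO VIEWS VALUES (?)",
                 [("hot",)] * 50 + [(f"u{i}",) for i in range(3) for _ in range(5)])
print(pair_counts(conn))
```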

Is there any way to optimise this so that it runs gracefully?

Answers (1)

Optimus Prime

I was able to solve it with the following query:

set hive.exec.reducers.bytes.per.reducer=10000;
CREATE TABLE STG_CONVERSION AS
SELECT CONV.CONVERSION_ID,
       CONV.USER_ID,
       TP.TIME,
       CONV.TIME AS ACTIVITY_TIME,
       TP.MULTI_DIM_ID,
       CONV.CONV_TYPE_ID,
       TP.SV1
FROM (SELECT USER_ID, TIME, MULTI_DIM_ID, SV1 FROM VIEWS SORT BY TIME) TP
JOIN SCU_TMP CONV ON TP.USER_ID = CONV.USER_ID
WHERE TP.TIME <= CONV.TIME;

The problem arises because, when a single user ID dominates the table, the join for that user is processed by a single mapper, which gets stuck.
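A toy model of why a single task gets stuck: if VIEWS rows are assigned to tasks by USER_ID (or a hot user's rows simply sit together in one input split), all of that user's candidate pairs pile up in one task. Assigning rows on a key that ignores USER_ID spreads the same work out. The task count, the data and both key functions are assumptions for illustration; the TIME-based key is only a stand-in for a shuffle that does not follow USER_ID, not a model of Hive's actual partitioner.

```python
import zlib


def user_key(row):
    # Every row of one USER_ID gets the same key, so they all land in one task.
    user_id, _time = row
    return zlib.crc32(user_id.encode("utf-8"))


def time_key(row):
    # Hypothetical stand-in for redistributing on TIME: consecutive times spread out.
    _user_id, time = row
    return time


def work_per_task(views_rows, scu_counts, n_tasks, key_fn):
    """Candidate join pairs each task must check (before the TIME filter)."""
    work = [0] * n_tasks
    for row in views_rows:
        user_id = row[0]
        work[key_fn(row) % n_tasks] += scu_counts.get(user_id, 0)
    return work


# Toy data: one hot user with 1000 SCU_TMP rows, ten normal users with 20 each.
scu_counts = {"hot": 1000, **{f"u{i}": 20 for i in range(10)}}
views_rows = [("hot", t) for t in range(1000)]
views_rows += [(f"u{i}", 1000 + 10 * i + j) for i in range(10) for j in range(10)]

by_user = work_per_task(views_rows, scu_counts, 8, user_key)
by_time = work_per_task(views_rows, scu_counts, 8, time_key)
print("busiest task, keyed by USER_ID:", max(by_user))
print("busiest task, keyed by TIME:   ", max(by_time))
```

The total work is identical in both cases; only its placement changes. Keyed by USER_ID, one task carries at least the hot user's 1,000,000 pairs, while the TIME-keyed placement caps the busiest task at 125,260.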

I made two modifications:
1) Replaced the VIEWS table name with a subquery, which adds a sort step (SORT BY TIME) before the join.
2) Reduced the hive.exec.reducers.bytes.per.reducer parameter to 10 KB (10000 bytes).

The SORT BY TIME in step (1) adds a shuffle phase that distributes the data evenly, whereas it was previously skewed by USER_ID.
Reducing the bytes-per-reducer parameter spreads that data across all available reducers.
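The effect of lowering hive.exec.reducers.bytes.per.reducer can be sketched with a simplified version of Hive's reducer estimate: roughly one reducer per bytes.per.reducer of input, at least one, capped by hive.exec.reducers.max. The 2 GB input size is hypothetical; 256,000,000 bytes and 1009 are used as the documented defaults of those two parameters in Hive 0.14 and later. On Tez, hive.tez.auto.reducer.parallelism can also adjust the count at runtime, which this sketch ignores.

```python
def estimate_reducers(input_bytes, bytes_per_reducer, max_reducers=1009):
    """Simplified reducer-count estimate (an approximation, not Hive's exact code).

    One reducer per bytes_per_reducer of input (rounded up),
    at least 1 and at most max_reducers.
    """
    reducers = (input_bytes + bytes_per_reducer - 1) // bytes_per_reducer
    return max(1, min(max_reducers, reducers))


input_bytes = 2_000_000_000  # hypothetical size of the data being shuffled
print(estimate_reducers(input_bytes, 256_000_000))  # default-sized reducers
print(estimate_reducers(input_bytes, 10_000))       # the 10 KB setting from the answer
```

With the 256 MB value this hypothetical input gets 8 reducers; with 10 KB the estimate hits the cap, which is consistent with the data being spread over all available reducers.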

With these two changes, the run time dropped from 10-12 hours to 45 minutes.
