crystyxn

Reputation: 1611

Hive: java.lang.OutOfMemoryError: Java heap space and Job running in-process (local Hadoop)

My setup: 4 node cluster in Google Cloud Platform (1 master, 3 workers) running NixOS Linux.

I have been using the TPC-DS toolkit to generate the data; the queries are the standard ones. On smaller datasets and simpler queries they work just fine. I take the queries from here: https://github.com/hortonworks/hive-testbench/tree/hdp3/sample-queries-tpcds

This is the first one, query1.sql:

WITH customer_total_return AS 
( 
         SELECT   sr_customer_sk AS ctr_customer_sk , 
                  sr_store_sk    AS ctr_store_sk , 
                  Sum(sr_fee)    AS ctr_total_return 
         FROM     store_returns , 
                  date_dim 
         WHERE    sr_returned_date_sk = d_date_sk 
         AND      d_year =2000 
         GROUP BY sr_customer_sk , 
                  sr_store_sk) 
SELECT   c_customer_id 
FROM     customer_total_return ctr1 , 
         store , 
         customer 
WHERE    ctr1.ctr_total_return > 
         ( 
                SELECT Avg(ctr_total_return)*1.2 
                FROM   customer_total_return ctr2 
                WHERE  ctr1.ctr_store_sk = ctr2.ctr_store_sk) 
AND      s_store_sk = ctr1.ctr_store_sk 
AND      s_state = 'NM' 
AND      ctr1.ctr_customer_sk = c_customer_sk 
ORDER BY c_customer_id limit 100;

At first I could not get this query to complete at all; it failed with java.lang.OutOfMemoryError: Java heap space.

What I did was:

  1. Increased the GCP nodes' resources (up to 7.5 GB of RAM and dual-core CPUs)
  2. Set these variables inside of the Hive CLI:
set mapreduce.map.memory.mb=2048;
set mapreduce.map.java.opts=-Xmx1024m;
set mapreduce.reduce.memory.mb=4096;
set mapreduce.reduce.java.opts=-Xmx3072m;
set mapred.child.java.opts=-Xmx1024m;

  3. Restarted Hive
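
For reference, a sketch of how these pairs are usually related, not a tuned configuration: the *.memory.mb values size the YARN container, while the -Xmx values size the JVM heap inside that container, so the heap is normally kept somewhat below the container size. The figure of roughly 80% used below is a common rule of thumb and an assumption here, not a Hive requirement. These options apply to task JVMs launched in YARN containers.

```sql
-- Container size for each map task, in MB
SET mapreduce.map.memory.mb=2048;
-- Heap inside the map container: about 80% of 2048 MB (rule-of-thumb assumption)
SET mapreduce.map.java.opts=-Xmx1638m;

-- Container size for each reduce task, in MB
SET mapreduce.reduce.memory.mb=4096;
-- Heap inside the reduce container: about 80% of 4096 MB (rule-of-thumb assumption)
SET mapreduce.reduce.java.opts=-Xmx3276m;
```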

Then this query worked (along with other similar ones) on a 1 GB dataset. I monitored it with htop: memory usage does not exceed 2 GB, while both CPU cores are at almost 100% constantly.

Now the problem is that with more complex queries on a larger dataset, the error comes back.

The query runs fine for a minute or so, but ends in a FAIL. Full console output:

hive> with customer_total_return as
    > (select sr_customer_sk as ctr_customer_sk
    > ,sr_store_sk as ctr_store_sk
    > ,sum(SR_FEE) as ctr_total_return
    > from store_returns
    > ,date_dim
    > where sr_returned_date_sk = d_date_sk
    > and d_year =2000
    > group by sr_customer_sk
    > ,sr_store_sk)
    >  select c_customer_id
    > from customer_total_return ctr1
    > ,store
    > ,customer
    > where ctr1.ctr_total_return > (select avg(ctr_total_return)*1.2
    > from customer_total_return ctr2
    > where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
    > and s_store_sk = ctr1.ctr_store_sk
    > and s_state = 'TN'
    > and ctr1.ctr_customer_sk = c_customer_sk
    > order by c_customer_id
    > limit 100;
No Stats for default@store_returns, Columns: sr_returned_date_sk, sr_fee, sr_store_sk, sr_customer_sk
No Stats for default@date_dim, Columns: d_date_sk, d_year
No Stats for default@store, Columns: s_state, s_store_sk
No Stats for default@customer, Columns: c_customer_sk, c_customer_id
Query ID = root_20190811164854_c253c67c-ef94-4351-b4d3-74ede4c5d990
Total jobs = 14
Stage-29 is selected by condition resolver.
Stage-1 is filtered out by condition resolver.
Stage-30 is selected by condition resolver.
Stage-10 is filtered out by condition resolver.
SLF4J: Found binding in [jar:file:/nix/store/jjm6636r99r0irqa03dc1za9gs2b4fx6-source/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nix/store/q9jpwzbqbg8k8322q785xfavg0p0v18i-hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
Execution completed successfully
MapredLocal task succeeded
SLF4J: Found binding in [jar:file:/nix/store/jjm6636r99r0irqa03dc1za9gs2b4fx6-source/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nix/store/q9jpwzbqbg8k8322q785xfavg0p0v18i-hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Execution completed successfully
MapredLocal task succeeded
Launching Job 3 out of 14
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
2019-08-11 16:49:19,415 Stage-20 map = 0%,  reduce = 0%
2019-08-11 16:49:22,418 Stage-20 map = 100%,  reduce = 0%
Ended Job = job_local404291246_0005
Launching Job 4 out of 14
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
2019-08-11 16:49:24,718 Stage-22 map = 0%,  reduce = 0%
2019-08-11 16:49:27,721 Stage-22 map = 100%,  reduce = 0%
Ended Job = job_local566999875_0006
Launching Job 5 out of 14
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-08-11 16:49:29,958 Stage-2 map = 0%,  reduce = 0%
2019-08-11 16:49:33,970 Stage-2 map = 100%,  reduce = 0%
2019-08-11 16:49:35,974 Stage-2 map = 100%,  reduce = 100%
Ended Job = job_local1440279093_0007
Launching Job 6 out of 14
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-08-11 16:49:37,235 Stage-11 map = 0%,  reduce = 0%
2019-08-11 16:49:40,421 Stage-11 map = 100%,  reduce = 0%
2019-08-11 16:49:42,424 Stage-11 map = 100%,  reduce = 100%
Ended Job = job_local1508103541_0008
SLF4J: Found binding in [jar:file:/nix/store/jjm6636r99r0irqa03dc1za9gs2b4fx6-source/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nix/store/q9jpwzbqbg8k8322q785xfavg0p0v18i-hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

2019-08-11 16:49:51 Dump the side-table for tag: 1 with group count: 21 into file: file:/tmp/root/3ab30b3b-380d-40f5-9f72-68788d998013/hive_2019-08-11_16-48-54_393_105456265244058313-1/-local-10019/HashTable-Stage-19/MapJoin-mapfile71--.hashtable
Execution completed successfully
MapredLocal task succeeded
Launching Job 7 out of 14
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
2019-08-11 16:49:53,956 Stage-19 map = 100%,  reduce = 0%
Ended Job = job_local2121921517_0009
Stage-26 is filtered out by condition resolver.
Stage-27 is selected by condition resolver.
Stage-4 is filtered out by condition resolver.

2019-08-11 16:50:01 Dump the side-table for tag: 0 with group count: 99162 into file: file:/tmp/root/3ab30b3b-380d-40f5-9f72-68788d998013/hive_2019-08-11_16-48-54_393_105456265244058313-1/-local-10017/HashTable-Stage-17/MapJoin-mapfile60--.hashtable
2019-08-11 16:50:02 Uploaded 1 File to: file:/tmp/root/3ab30b3b-380d-40f5-9f72-68788d998013/hive_2019-08-11_16-48-54_393_105456265244058313-1/-local-10017/HashTable-Stage-17/MapJoin-mapfile60--.hashtable (2832042 bytes)
Execution completed successfully
MapredLocal task succeeded
Launching Job 9 out of 14
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
2019-08-11 16:50:04,004 Stage-17 map = 0%,  reduce = 0%
2019-08-11 16:50:05,005 Stage-17 map = 100%,  reduce = 0%
Ended Job = job_local694362009_0010
Stage-24 is selected by condition resolver.
Stage-25 is filtered out by condition resolver.
Stage-5 is filtered out by condition resolver.

SLF4J: Found binding in [jar:file:/nix/store/q9jpwzbqbg8k8322q785xfavg0p0v18i-hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
2019-08-11 16:50:12 Starting to launch local task to process map join;  maximum memory = 239075328
Execution completed successfully
MapredLocal task succeeded
Launching Job 11 out of 14
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
2019-08-11 16:50:14,254 Stage-13 map = 100%,  reduce = 0%
Ended Job = job_local1812693452_0011
Launching Job 12 out of 14
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-08-11 16:50:15,481 Stage-6 map = 0%,  reduce = 0%
Ended Job = job_local920309638_0012 with errors
Error during job, obtaining debugging information...
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
MapReduce Jobs Launched: 
Stage-Stage-20:  HDFS Read: 8662606197 HDFS Write: 0 SUCCESS
Stage-Stage-22:  HDFS Read: 9339349675 HDFS Write: 0 SUCCESS
Stage-Stage-2:  HDFS Read: 9409277766 HDFS Write: 0 SUCCESS
Stage-Stage-11:  HDFS Read: 9409277766 HDFS Write: 0 SUCCESS
Stage-Stage-19:  HDFS Read: 4704638883 HDFS Write: 0 SUCCESS
Stage-Stage-17:  HDFS Read: 4771516428 HDFS Write: 0 SUCCESS
Stage-Stage-13:  HDFS Read: 4771516428 HDFS Write: 0 SUCCESS
Stage-Stage-6:  HDFS Read: 0 HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 0 msec

The problem in the hive.log file is still the same:

java.lang.Exception: java.lang.OutOfMemoryError: Java heap space

I also realized that my worker nodes don't actually do anything: htop showed them idle while only the master node was working. The console output says as much:

Job running in-process (local Hadoop)

How can I make Hive use HDFS, not just local Hadoop? Running hdfs dfs -df -h hdfs://<redacted>:9000/ returns:

Filesystem                   Size    Used  Available  Use%
hdfs://<redacted>:9000  88.5 G  34.3 G     35.2 G   39%

That is correct: I have 3 worker nodes, each with a 30 GB disk.

Upvotes: 3

Views: 4462

Answers (3)

sumitya

Reputation: 2691

OOM issues are related to query performance most of the time.

There are two queries here:

Part 1:

WITH customer_total_return AS 
( 
         SELECT   sr_customer_sk AS ctr_customer_sk , 
                  sr_store_sk    AS ctr_store_sk , 
                  Sum(sr_fee)    AS ctr_total_return 
         FROM     store_returns , 
                  date_dim 
         WHERE    sr_returned_date_sk = d_date_sk 
         AND      d_year =2000 
         GROUP BY sr_customer_sk , 
                  sr_store_sk)

Part 2:

SELECT   c_customer_id 
FROM     customer_total_return ctr1 , 
         store , 
         customer 
WHERE    ctr1.ctr_total_return > 
         ( 
                SELECT Avg(ctr_total_return)*1.2 
                FROM   customer_total_return ctr2 
                WHERE  ctr1.ctr_store_sk = ctr2.ctr_store_sk) 
AND      s_store_sk = ctr1.ctr_store_sk 
AND      s_state = 'NM' 
AND      ctr1.ctr_customer_sk = c_customer_sk 
ORDER BY c_customer_id limit 100;

Try enabling JMX for the Hive cluster, then compare the memory usage of both parts of the query, and of the inner query in part 2 as well.

A few Hive optimizations can be tried for the above queries:

  1. Use SORT BY instead of ORDER BY. SORT BY orders the data only within each reducer, so the output is not globally ordered.

  2. Partition the tables on the join keys so that only the relevant data is read instead of scanning the whole table.

  3. Cache the small Hive table in the distributed cache and use a map-side join to reduce shuffling. For example:

select /*+MAPJOIN(b)*/ col1,col2,col3,col4 from table_A a join table_B b on a.account_number=b.account_number

  4. If any of the tables may contain skewed data, use the following parameters:

set hive.optimize.skewjoin=true; set hive.skewjoin.key=100000; (hive.skewjoin.key is the number of rows with the same join key above which that key is treated as skewed)
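
The "No Stats for ..." lines in the question's output suggest that the optimizer has no column statistics for these tables, which can lead it to poor join decisions. A minimal sketch of gathering them and letting Hive choose map joins itself rather than through a hint; the table names come from the question, and whether this helps with the heap error there is an assumption:

```sql
-- Gather table-level and column-level statistics for the tables used by the query
ANALYZE TABLE store_returns COMPUTE STATISTICS;
ANALYZE TABLE store_returns COMPUTE STATISTICS FOR COLUMNS;
ANALYZE TABLE date_dim COMPUTE STATISTICS FOR COLUMNS;
ANALYZE TABLE store COMPUTE STATISTICS FOR COLUMNS;
ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS;

-- Let the optimizer convert joins with a small side into map joins on its own
SET hive.auto.convert.join=true;
```

With statistics in place, rerunning the query should no longer print the "No Stats" warnings for these columns.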

Upvotes: 0

Ambrish

Reputation: 3677

java.lang.OutOfMemoryError: Java heap space happens when you try to push too much data onto a single machine.

Based on the query provided, there are a few things that you can try:

  1. Make your join conditions explicit (move them out of the WHERE clause and use INNER/LEFT JOIN), e.g.:
FROM     customer_total_return ctr1 
         INNER JOIN store s
             ON ctr1.ctr_store_sk = s.s_store_sk
                AND s_state = 'NM'
         INNER JOIN customer c
             ON ctr1.ctr_customer_sk = c.c_customer_sk
  2. Check if you have skewed data for one of the following fields:
    1. store_returns -> sr_returned_date_sk
    2. store_returns -> sr_store_sk
    3. store_returns -> sr_customer_sk
    4. customer -> c_customer_sk
    5. store -> s_store_sk

It is possible that one of the keys has a high percentage of identical values, which can cause one node to be overloaded (when the data size is huge).

Basically, you are trying to eliminate possible reasons for node overloading.
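
One way to check a key for skew is to count how often each value occurs and look at the most frequent ones. A minimal sketch for sr_store_sk, using the table and column names from the question; the same pattern would apply to the other fields listed above:

```sql
-- Top 10 most frequent values of one join key
SELECT sr_store_sk,
       COUNT(*) AS row_count
FROM   store_returns
GROUP  BY sr_store_sk
ORDER  BY row_count DESC
LIMIT  10;
```

If a handful of values account for most of the rows, that key is a likely candidate for skew.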

Let me know if it helps.

Upvotes: 1

sprabhagaran

Reputation: 1

It could be a resource issue. Hive queries are internally executed as MapReduce jobs. You could check the Job History logs for the failed Hive MapReduce jobs. Sometimes executing queries from the shell is faster than from the Hive query editor.
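
The line "Job running in-process (local Hadoop)" in the question's output indicates that the MapReduce jobs are run by the local job runner inside the Hive client rather than being submitted to the cluster, which would also explain the idle worker nodes. A minimal sketch of how this could be checked and changed from the Hive CLI, assuming YARN is installed and running on the cluster and its ResourceManager address is configured in yarn-site.xml (neither is confirmed by the question):

```sql
-- A bare SET <name>; prints the current value without changing it
SET mapreduce.framework.name;
SET hive.exec.mode.local.auto;

-- Assumption: a working YARN installation is reachable from the Hive host
SET mapreduce.framework.name=yarn;
-- Keep Hive from automatically falling back to local mode for small inputs
SET hive.exec.mode.local.auto=false;
```

When jobs run in-process, the mapreduce.*.memory.mb and mapreduce.*.java.opts settings likely have no effect on them, because no separate task JVMs are started. The available heap is then that of the Hive client process itself.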

Upvotes: 0
