Brett Bonner

Reputation: 425

Hive cross join fails on local map join

Is there a direct way to address the following error or overall a better way to use Hive to get the join that I need? Output to a stored table isn't a requirement as I can be content with an INSERT OVERWRITE LOCAL DIRECTORY to a csv.

I am trying to perform the following cross join. ipint is a 9GB table, and geoiplite is 270MB.

CREATE TABLE iplatlong_sample AS
SELECT ipintegers.networkinteger, geoiplite.latitude, geoiplite.longitude
FROM geoiplite
CROSS JOIN ipintegers
WHERE ipintegers.networkinteger >= geoiplite.network_start_integer AND ipintegers.networkinteger <= geoiplite.network_last_integer;

I put ipintegers on the right side of the CROSS JOIN and geoiplite on the left because I have read that the rule is to keep the smaller table on the left and the larger one on the right.

The map and reduce stages complete to 100% according to Hive, but then:

2015-08-01 04:45:36,947 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 8767.09 sec

MapReduce Total cumulative CPU time: 0 days 2 hours 26 minutes 7 seconds 90 msec

Ended Job = job_201508010407_0001

Stage-8 is selected by condition resolver.

Execution log at: /tmp/myuser/.log

2015-08-01 04:45:38 Starting to launch local task to process map join; maximum memory = 12221153280

Execution failed with exit status: 3

Obtaining error information

Task failed!

Task ID: Stage-8

Logs:

/tmp/myuser/hive.log

FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask

MapReduce Jobs Launched: Job 0: Map: 38 Reduce: 1 Cumulative CPU: 8767.09 sec
HDFS Read: 9438495086 HDFS Write: 8575548486 SUCCESS

My hive config:

SET hive.mapred.local.mem=40960;
SET hive.exec.parallel=true;
SET hive.exec.compress.output=true;
SET hive.exec.compress.intermediate=true;
SET hive.optimize.skewjoin=true;
SET mapred.compress.map.output=true;
SET hive.stats.autogather=false;

I have varied SET hive.auto.convert.join between true and false but with the same result.

Here are the errors in the output log from /tmp/myuser/hive.log

$ tail -12 -f /tmp/myuser/hive.log

2015-08-01 07:30:46,086 ERROR exec.Task (SessionState.java:printError(419)) - Execution failed with exit status: 3
2015-08-01 07:30:46,086 ERROR exec.Task (SessionState.java:printError(419)) - Obtaining error information
2015-08-01 07:30:46,087 ERROR exec.Task (SessionState.java:printError(419)) -
Task failed!
Task ID:
  Stage-8

Logs:

2015-08-01 07:30:46,087 ERROR exec.Task (SessionState.java:printError(419)) - /tmp/myuser/hive.log
2015-08-01 07:30:46,087 ERROR mr.MapredLocalTask (MapredLocalTask.java:execute(268)) - Execution failed with exit status: 3
2015-08-01 07:30:46,094 ERROR ql.Driver (SessionState.java:printError(419)) - FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask

I am running the Hive client on the master, a Google Cloud Platform instance of type n1-highmem-8 (8 CPU, 52 GB), and the workers are n1-highmem-4 (4 CPU, 26 GB), but I suspect that after MAP and REDUCE a local join (as implied) takes place on the master. Regardless, in bdutils I configured the JAVAOPTS for the worker nodes (n1-highmem-4).

SOLUTION EDIT: The solution was to organize the range data into a range tree.
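For illustration only (not the code actually used): if the geoiplite ranges do not overlap, sorting them by network_start_integer and binary-searching with Python's bisect gives the same effect as a simple range tree. load_geoiplite_ranges is a hypothetical loader for the 270 MB table.

import bisect

# load_geoiplite_ranges() is hypothetical: it should yield
# (network_start_integer, network_last_integer, latitude, longitude) tuples.
ranges = sorted(load_geoiplite_ranges())
starts = [r[0] for r in ranges]

def lookup(ip_int):
    """Return (latitude, longitude) for ip_int, or None if no range covers it."""
    i = bisect.bisect_right(starts, ip_int) - 1   # last range starting at or before ip_int
    if i >= 0 and ranges[i][1] >= ip_int:         # and whose end still covers ip_int
        return ranges[i][2], ranges[i][3]
    return None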

Upvotes: 2

Views: 1366

Answers (1)

thule

Reputation: 4222

I don't think it is possible to perform this kind of cross join by brute force: just multiply the row counts of the two tables and the number of candidate pairs quickly gets out of hand. You need some optimizations, which I don't think Hive is capable of yet.

But this problem can actually be solved in O(N1+N2) time provided your data is sorted (which Hive can do for you): you walk through both lists simultaneously, and at each step you take an IP integer, add any intervals that start at or before it, drop those that have already ended, and emit the matching tuples. Pseudocode:

active = []                                   # intervals currently covering the scan position
ipintegers = iter(ipintegers_sorted)          # IP integers, sorted ascending
intervals = iter(intervals_sorted_on_start)   # intervals with .start/.end, sorted by start
pending = next(intervals, None)               # next interval not yet reached

for x in ipintegers:
    # pull in every interval that starts at or before x
    while pending is not None and pending.start <= x:
        active.append(pending)
        pending = next(intervals, None)
    # drop intervals that ended before x
    active = [i for i in active if i.end >= x]
    for i in active:
        output_match(i, x)

Now, if you have an external script or UDF that knows how to read the smaller table, takes IP integers as input, and spits out matching tuples as output, you can use Hive and SELECT TRANSFORM to stream the input to it.
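For illustration (not tested), a minimal such streaming script might look like this, assuming, as is typical for GeoIP data, that the ranges do not overlap, and that the small table has been exported to a local tab-separated file, hypothetically named geoiplite_sorted.tsv. Hive passes rows to the script as tab-separated text on stdin and reads tab-separated rows back from stdout.

#!/usr/bin/env python
# find_ranges.py (hypothetical name). In Hive it could be wired up roughly like:
#   ADD FILE find_ranges.py;
#   ADD FILE geoiplite_sorted.tsv;
#   SELECT TRANSFORM (networkinteger) USING 'python find_ranges.py'
#     AS (networkinteger, latitude, longitude)
#   FROM ipintegers;
import sys
import bisect

# Load the small table once per task: one "start<TAB>end<TAB>lat<TAB>lon" row per line.
ranges = []
with open('geoiplite_sorted.tsv') as f:
    for line in f:
        start, end, lat, lon = line.rstrip('\n').split('\t')
        ranges.append((int(start), int(end), lat, lon))
ranges.sort()
starts = [r[0] for r in ranges]

for line in sys.stdin:                        # one networkinteger per input row from Hive
    ip = int(line.strip().split('\t')[0])
    i = bisect.bisect_right(starts, ip) - 1   # last range starting at or before ip
    if i >= 0 and ranges[i][1] >= ip:         # only emit when the range also covers ip
        print('\t'.join([str(ip), ranges[i][2], ranges[i][3]]))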

Or you can probably just run this algorithm on a local machine with two input files, because it is just O(N), and even 9 GB of data is very doable.
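A sketch of how one might wire that up locally (my illustration; the file names and formats are assumptions): export both tables to sorted tab-separated files, then feed these two generators into the sweep loop shown above in place of ipintegers_sorted and intervals_sorted_on_start.

import csv
from collections import namedtuple

Interval = namedtuple('Interval', 'start end latitude longitude')

# Assumed local extracts (e.g. from INSERT OVERWRITE LOCAL DIRECTORY), already
# sorted: ipintegers.tsv by the integer, intervals.tsv by the start column.
def read_ipintegers(path):
    with open(path) as f:
        for row in csv.reader(f, delimiter='\t'):
            yield int(row[0])

def read_intervals(path):
    with open(path) as f:
        for row in csv.reader(f, delimiter='\t'):
            start, end, lat, lon = row
            yield Interval(int(start), int(end), lat, lon)

ipintegers_sorted = read_ipintegers('ipintegers.tsv')
intervals_sorted_on_start = read_intervals('intervals.tsv')
# ...then run the sweep loop from the pseudocode above on these two iterators.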

Upvotes: 1
