Reputation: 425
Is there a direct way to address the following error, or, more generally, a better way to use Hive to get the join that I need? Output to a stored table isn't a requirement, as I would be content with an INSERT OVERWRITE LOCAL DIRECTORY to a CSV.
I am trying to perform the following cross join. ipintegers is a 9 GB table, and geoiplite is 270 MB.
CREATE TABLE iplatlong_sample AS
SELECT ipintegers.networkinteger, geoiplite.latitude, geoiplite.longitude
FROM geoiplite
CROSS JOIN ipintegers
WHERE ipintegers.networkinteger >= geoiplite.network_start_integer AND ipintegers.networkinteger <= geoiplite.network_last_integer;
I put ipintegers on the right side of the CROSS JOIN rather than geoiplite because I have read that the rule of thumb is to keep the smaller table on the left and the larger one on the right.
Map and Reduce stages complete to 100% according to Hive, but then:
2015-08-01 04:45:36,947 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 8767.09 sec
MapReduce Total cumulative CPU time: 0 days 2 hours 26 minutes 7 seconds 90 msec
Ended Job = job_201508010407_0001
Stage-8 is selected by condition resolver.
Execution log at: /tmp/myuser/.log
2015-08-01 04:45:38 Starting to launch local task to process map join; maximum memory = 12221153280
Execution failed with exit status: 3
Obtaining error information
Task failed!
Task ID: Stage-8
Logs:
/tmp/myuser/hive.log
FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
MapReduce Jobs Launched: Job 0: Map: 38 Reduce: 1 Cumulative CPU: 8767.09 sec
HDFS Read: 9438495086 HDFS Write: 8575548486 SUCCESS
My hive config:
SET hive.mapred.local.mem=40960;
SET hive.exec.parallel=true;
SET hive.exec.compress.output=true;
SET hive.exec.compress.intermediate = true;
SET hive.optimize.skewjoin = true;
SET mapred.compress.map.output=true;
SET hive.stats.autogather=false;
I have varied SET hive.auto.convert.join between true and false, but with the same result.
Here are the errors in the output log from /tmp/myuser/hive.log
$ tail -12 -f /tmp/myuser/hive.log
2015-08-01 07:30:46,086 ERROR exec.Task (SessionState.java:printError(419)) - Execution failed with exit status: 3
2015-08-01 07:30:46,086 ERROR exec.Task (SessionState.java:printError(419)) - Obtaining error information
2015-08-01 07:30:46,087 ERROR exec.Task (SessionState.java:printError(419)) -
Task failed!
Task ID:
Stage-8
Logs:
2015-08-01 07:30:46,087 ERROR exec.Task (SessionState.java:printError(419)) - /tmp/myuser/hive.log
2015-08-01 07:30:46,087 ERROR mr.MapredLocalTask (MapredLocalTask.java:execute(268)) - Execution failed with exit status: 3
2015-08-01 07:30:46,094 ERROR ql.Driver (SessionState.java:printError(419)) - FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
I am running the Hive client on the master, a Google Cloud Platform instance of type n1-highmem-8 (8 vCPU, 52 GB), and the workers are n1-highmem-4 (4 vCPU, 26 GB), but I suspect that after MAP and REDUCE a local join (as implied) takes place on the master. Regardless, in bdutil I configured the JAVAOPTS for the worker nodes (n1-highmem-4) accordingly.
SOLUTION EDIT: The solution is to organize the range data into a range tree.
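For reference, a minimal sketch of that idea in Python (the file name, column layout, and the use of a sorted list with binary search in place of a full range tree are my assumptions; it relies on the geoiplite ranges not overlapping):

import bisect
import csv

# Load the small range table once, sorted by range start.
# File name and column order (start, end, lat, lon) are assumed for illustration.
with open("geoiplite.csv") as f:
    ranges = sorted((int(s), int(e), lat, lon) for s, e, lat, lon in csv.reader(f))
starts = [r[0] for r in ranges]

def lookup(ip_int):
    # Find the last range whose start is <= ip_int, then check that it has not ended.
    i = bisect.bisect_right(starts, ip_int) - 1
    if i >= 0 and ranges[i][1] >= ip_int:
        return ranges[i][2], ranges[i][3]   # (latitude, longitude)
    return None

Each IP integer then costs one O(log N) lookup instead of a pass over the whole range table.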
Upvotes: 2
Views: 1366
Reputation: 4222
I don't think it is possible to perform this kind of cross join by brute force: just multiply the row counts and it gets a little out of hand. You need some optimizations, which I don't think Hive is capable of yet.
But this problem can actually be solved in O(N1+N2) time provided you have your data sorted (which Hive can do for you): you just go through both lists simultaneously, at each step taking an IP integer, adding any intervals that start at or before it, removing those that have ended, emitting the matching tuples, and so on. Pseudocode:
active = []                                      # intervals currently covering x
ipintegers = iterator(ipintegers_sorted_file)
intervals = iterator(intervals_sorted_on_start_file)
for x in ipintegers:
    # drop intervals that ended before x
    active = [i for i in active if i.end >= x]
    # pull in intervals that start at or before x, skipping any that have already ended
    while intervals.current is not None and intervals.current.start <= x:
        if intervals.current.end >= x:
            active.append(intervals.current)
        intervals.next()
    for i in active:
        output_match(i, x)
Now, if you have an external script or UDF that knows how to read the smaller table, takes IP integers as input, and emits matching tuples as output, you can use Hive's SELECT TRANSFORM to stream the input to it.
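For illustration, a minimal sketch of such a streaming script (file name, delimiters, and column layout are my assumptions; Hive's SELECT TRANSFORM passes rows to the script as tab-separated lines on stdin and reads tab-separated lines back from stdout, and the sketch assumes the IP integers arrive sorted ascending, e.g. via SORT BY):

#!/usr/bin/env python
# Hypothetical transform script: join sorted IP integers from stdin against
# the small interval table, emitting (networkinteger, latitude, longitude).
import sys

# Load the small table once; assumed tab-separated columns: start, end, lat, lon.
intervals = []
with open("geoiplite.tsv") as f:
    for line in f:
        start, end, lat, lon = line.rstrip("\n").split("\t")
        intervals.append((int(start), int(end), lat, lon))
intervals.sort()

active = []      # intervals currently covering the IP integer being processed
idx = 0
for line in sys.stdin:
    x = int(line.split("\t")[0])
    active = [iv for iv in active if iv[1] >= x]             # drop ended intervals
    while idx < len(intervals) and intervals[idx][0] <= x:   # pull in newly started ones
        if intervals[idx][1] >= x:
            active.append(intervals[idx])
        idx += 1
    for _, _, lat, lon in active:
        print("%d\t%s\t%s" % (x, lat, lon))

On the Hive side this would be wired up with ADD FILE and a SELECT TRANSFORM(...) USING clause over ipintegers; the exact query depends on your schema.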
Or you can probably just run this algorithm on a local machine with two input files, because it is just O(N), and even 9 GB of data is very doable.
Upvotes: 1