Reputation: 45
I am trying to run my first mapreduce job, which aggregates some data from xml files. My job is failing, and as I am a newbie at Hadoop, I would appreciate if someone could please take a look at what is going wrong.
I have:
posts_mapper.py:
#!/usr/bin/env python
import sys
import xml.etree.ElementTree as ET
input_string = sys.stdin.read()
class User(object):
def __init__(self, id):
self.id = id
self.post_type_1_count = 0
self.post_type_2_count = 0
self.aggregate_post_score = 0
self.aggregate_post_size = 0
self.tags_count = {}
users = {}
root = ET.fromstring(input_string)
for child in root.getchildren():
user_id = int(child.get("OwnerUserId"))
post_type = int(child.get("PostTypeId"))
score = int(child.get("Score"))
#view_count = int(child.get("ViewCount"))
post_size = len(child.get("Body"))
tags = child.get("Tags")
if user_id not in users:
users[user_id] = User(user_id)
user = users[user_id]
if post_type == 1:
user.post_type_1_count += 1
else:
user.post_type_2_count += 1
user.aggregate_post_score += score
user.aggregate_post_size += post_size
if tags != None:
tags = tags.replace("<", " ").replace(">", " ").split()
for tag in tags:
if tag not in user.tags_count:
user.tags_count[tag] = 0
user.tags_count[tag] += 1
for i in users:
user = users[i]
out = "%d %d %d %d %d " % (user.id, user.post_type_1_count, user.post_type_2_count, user.aggregate_post_score, user.aggregate_post_size)
for tag in user.tags_count:
out += "%s %d " % (tag, user.tags_count[tag])
print out
posts_reducer.py:
#!/usr/bin/env python
import sys
class User(object):
def __init__(self, id):
self.id = id
self.post_type_1_count = 0
self.post_type_2_count = 0
self.aggregate_post_score = 0
self.aggregate_post_size = 0
self.tags_count = {}
users = {}
for line in sys.stdin:
vals = line.split()
user_id = int(vals[0])
post_type_1 = int(vals[1])
post_type_2 = int(vals[2])
aggregate_post_score = int(vals[3])
aggregate_post_size = int(vals[4])
tags = {}
if len(vals) > 5:
#this means we got tags
for i in range (5, len(vals), 2):
tag = vals[i]
count = int((vals[i+1]))
tags[tag] = count
if user_id not in users:
users[user_id] = User(user_id)
user = users[user_id]
user.post_type_1_count += post_type_1
user.post_type_2_count += post_type_2
user.aggregate_post_score += aggregate_post_score
user.aggregate_post_size += aggregate_post_size
for tag in tags:
if tag not in user.tags_count:
user.tags_count[tag] = 0
user.tags_count[tag] += tags[tag]
for i in users:
user = users[i]
out = "%d %d %d %d %d " % (user.id, user.post_type_1_count, user.post_type_2_count, user.aggregate_post_score, user.aggregate_post_size)
for tag in user.tags_count:
out += "%s %d " % (tag, user.tags_count[tag])
print out
I run the command:
bin/hadoop jar hadoop-streaming-2.6.0.jar -input /stackexchange/beer/posts -output /stackexchange/beer/results -mapper posts_mapper.py -reducer posts_reducer.py -file ~/mapreduce/posts_mapper.py -file ~/mapreduce/posts_reducer.py
and get the output:
packageJobJar: [/home/hduser/mapreduce/posts_mapper.py, /home/hduser/mapreduce/posts_reducer.py, /tmp/hadoop-unjar6585010774815976682/] [] /tmp/streamjob8863638738687983603.jar tmpDir=null 15/03/20 10:18:55 INFO client.RMProxy: Connecting to ResourceManager at Master/10.1.1.22:8040 15/03/20 10:18:55 INFO client.RMProxy: Connecting to ResourceManager at Master/10.1.1.22:8040 15/03/20 10:18:57 INFO mapred.FileInputFormat: Total input paths to process : 10 15/03/20 10:18:57 INFO mapreduce.JobSubmitter: number of splits:10 15/03/20 10:18:57 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1426769192808_0004 15/03/20 10:18:58 INFO impl.YarnClientImpl: Submitted application application_1426769192808_0004 15/03/20 10:18:58 INFO mapreduce.Job: The url to track the job: http://i-644dd931:8088/proxy/application_1426769192808_0004/ 15/03/20 10:18:58 INFO mapreduce.Job: Running job: job_1426769192808_0004 15/03/20 10:19:11 INFO mapreduce.Job: Job job_1426769192808_0004 running in uber mode : false 15/03/20 10:19:11 INFO mapreduce.Job: map 0% reduce 0% 15/03/20 10:19:41 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000006_0, Status : FAILED 15/03/20 10:19:48 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000007_0, Status : FAILED 15/03/20 10:19:50 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000008_0, Status : FAILED 15/03/20 10:19:50 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000009_0, Status : FAILED 15/03/20 10:20:00 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000006_1, Status : FAILED 15/03/20 10:20:08 INFO mapreduce.Job: map 7% reduce 0% 15/03/20 10:20:10 INFO mapreduce.Job: map 20% reduce 0% 15/03/20 10:20:10 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000007_1, Status : FAILED 15/03/20 10:20:11 INFO mapreduce.Job: map 10% reduce 0% 15/03/20 10:20:17 INFO mapreduce.Job: map 20% reduce 0% 15/03/20 10:20:17 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000008_1, Status : FAILED 15/03/20 10:20:19 INFO mapreduce.Job: map 10% reduce 0% 15/03/20 10:20:19 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000009_1, Status : FAILED 15/03/20 10:20:22 INFO mapreduce.Job: map 20% reduce 0% 15/03/20 10:20:22 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000006_2, Status : FAILED 15/03/20 10:20:25 INFO mapreduce.Job: map 40% reduce 0% 15/03/20 10:20:25 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000002_0, Status : FAILED Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1 at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322) at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535) at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130) at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61) at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
15/03/20 10:20:28 INFO mapreduce.Job: map 50% reduce 0% 15/03/20 10:20:28 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000007_2, Status : FAILED 15/03/20 10:20:42 INFO mapreduce.Job: map 50% reduce 17% 15/03/20 10:20:52 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000008_2, Status : FAILED 15/03/20 10:20:54 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000009_2, Status : FAILED 15/03/20 10:20:56 INFO mapreduce.Job: map 90% reduce 0% 15/03/20 10:20:57 INFO mapreduce.Job: map 100% reduce 100% 15/03/20 10:20:58 INFO mapreduce.Job: Job job_1426769192808_0004 failed with state FAILED due to: Task failed task_1426769192808_0004_m_000006 Job failed as tasks failed. failedMaps:1 failedReduces:0
Upvotes: 0
Views: 1032
Reputation: 146
Unfortunately, hadoop does not show stderr
for your python mapper/reducer so this output does not give any clue.
I would recommend you the following 2 throubleshooting steps:
cat {your_input_files} | ./posts_mapper.py | sort | ./posts_reducer.py
yarn logs -applicationId application_1426769192808_0004
or
hdfs dfs -cat /var/log/hadoop-yarn/apps/{user}/logs/
Upvotes: 0