FreezingGod

Reputation: 31

Broken python pipeline in Hadoop streaming

I have a large-scale log processing problem that I have to run on a Hadoop cluster. The task is to feed each line of the log into an executable "cmd" and check the result to decide whether to keep that line of the log or not.

Since the "cmd" program loads a very large dictionary at startup, I cannot afford to spawn the program once per log line. I want to keep it running and feed the input to it as needed. My current solution uses Python's subprocess module; here is the code:

import sys
from subprocess import Popen, PIPE

def main():
    pp = Popen('./bqc/bqc/bqc_tool ./bqc/bqc/bqc_dict/ ./bqc/bqc/word_dict/ flag', shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)

    for line in sys.stdin:
        lstr = line.strip().split('\t')
        if len(lstr) != 7:
            continue
        pp.stdin.write('%s\n' % lstr[5])
        pp.stdin.flush()
        out = pp.stdout.readline()
        lout = out.strip().split('\t')
        if len(lout) == 3 and lout[1] == '401':
            print line.strip()

if __name__ == '__main__':
    main()

The above code works fine when tested on my local machine. It is used as the mapper when submitting the job to Hadoop. There is no reducer, and the following is the configuration:

hadoop streaming \
-input /path_to_input \
-output /path_to_output \
-mapper  "python/python2.7/bin/python27.sh ./mapper.py" \
-cacheArchive /path_to_python/python272.tar.gz#python \
-cacheArchive /path_to_cmd/bqc.tar.gz#bqc \
-file ./mapper.py \
-jobconf mapred.job.name="JobName" \
-jobconf mapred.job.priority=HIGH

The files in bqc.tar.gz look like this:

bqc/
bqc/bqc_tool
bqc/bqc_dict/
bqc/word_dict/

As I understand it, the line "-cacheArchive /path_to_cmd/bqc.tar.gz#bqc \" should extract the tar file into a folder called bqc in the task's working directory.
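One way to confirm that from inside the mapper is to check for the unpacked binary before launching it, and to log the working-directory layout to stderr (which shows up in the task logs). This is a hypothetical helper, not part of the original mapper:

```python
import os
import sys

def check_tool(path):
    # Return True if the cached archive was unpacked where we expect
    # and the binary is executable.
    if os.path.isfile(path) and os.access(path, os.X_OK):
        return True
    # Listing the working directory makes archive-layout mistakes
    # easy to spot in the Hadoop task logs.
    sys.stderr.write('missing %s; cwd contains: %s\n' % (path, os.listdir('.')))
    return False
```

Calling `check_tool('./bqc/bqc/bqc_tool')` at the top of `main()` and exiting non-zero on failure turns a silent broken pipe into a readable log line.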

But it fails when submitted to the Hadoop cluster, with the following error message:

    Traceback (most recent call last):
      File "./mapper.py", line 19, in
        main()
      File "./mapper.py", line 11, in main
        pp.stdin.write('%s\n' % lstr[5])
    IOError: [Errno 32] Broken pipe
    java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:335)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:590)
        at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:152)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:18)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:388)
        at org.apache.hadoop.mapred.Child.main(Child.java:194)
    java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:335)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:590)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:163)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:18)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:388)
        at org.apache.hadoop.mapred.Child.main(Child.java:194)

Does anyone have an idea? Any help would be appreciated!

Thanks!

Zachary

Upvotes: 1

Views: 3616

Answers (1)

FreezingGod

Reputation: 31

Mystery solved! It was the memory limit imposed by Hadoop that prevented the command from loading successfully. The command needs about 2 GB of memory, while Hadoop is configured to allow only about 800 MB per node.
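If raising the cap per job is allowed by the cluster, it can typically be done with extra `-jobconf` options on the streaming command line. Property names vary between Hadoop versions; the ones below are the legacy pre-YARN names, matching the `mapred.*` style already used above, and the values are illustrative:

```shell
hadoop streaming \
  ... \
  -jobconf mapred.child.ulimit=3000000 \
  -jobconf mapred.job.map.memory.mb=3072
```

`mapred.child.ulimit` is the virtual-memory limit (in KB) applied to child processes of the task, including streaming subprocesses; `mapred.job.map.memory.mb` is the memory each map task requests from the scheduler.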

Upvotes: 2
