Reputation: 91959
So when we use Java for writing a map/reduce program, the map emits key/value pairs, then shuffle and sort happens, and the reducer receives the list of values per key to work on:

map(k, v) -> (k1, v1)
then shuffle and sort
reduce(k1, List<values>)

Is it possible to do the same with Python using streaming? I used this as a reference, and it seems the reducer gets its data line by line, as supplied on the command line.
Upvotes: 2
Views: 7495
Reputation: 31
As per the Hadoop streaming reference here:

When the Map/Reduce framework reads a line from the stdout of the mapper, it splits the line into a key/value pair. By default, the prefix of the line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value.
However, you can customize this default. You can specify a field separator other than the tab character (the default), and you can specify the nth (n >= 1) character rather than the first character in a line (the default) as the separator between the key and value. For example:
Sample code:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -D stream.map.output.field.separator=. \
    -D stream.num.map.output.key.fields=4 \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer
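On the Python side, the mapper just needs to print such lines to stdout. A minimal word-count-style sketch (illustrative, not from the Hadoop docs; with the default settings, everything before the first tab is the key):

#!/usr/bin/env python
# mapper.py -- minimal streaming mapper sketch (illustrative).
# Emits one "key<TAB>value" line per word; Hadoop's default rule
# treats everything before the first tab as the key.
import sys

for line in sys.stdin:
    for word in line.split():
        print(f"{word}\t1")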
Upvotes: 0
Reputation: 51
Maybe this can help you. I found it in the Apache Hadoop streaming documentation:

Customizing the Way to Split Lines into Key/Value Pairs

As noted earlier, when the Map/Reduce framework reads a line from the stdout of the mapper, it splits the line into a key/value pair. By default, the prefix of the line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value.
However, you can customize this default. You can specify a field separator other than the tab character (the default), and you can specify the nth (n >= 1) character rather than the first character in a line (the default) as the separator between the key and value. For example:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -D stream.map.output.field.separator=. \
    -D stream.num.map.output.key.fields=4
In the above example, -D stream.map.output.field.separator=. specifies "." as the field separator for the map outputs, and the prefix up to the fourth "." in a line will be the key and the rest of the line (excluding the fourth ".") will be the value. If a line has fewer than four "."s, then the whole line will be the key and the value will be an empty Text object (like the one created by new Text("")).
Similarly, you can use -D stream.reduce.output.field.separator=SEP
and -D stream.num.reduce.output.fields=NUM
to specify the nth field separator in a line of the reduce outputs as the separator between the key and the value.
Similarly, you can specify stream.map.input.field.separator
and stream.reduce.input.field.separator
as the input separator for map/reduce inputs. By default the separator is the tab character.
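To make the split rule concrete, here is a plain-Python sketch of the behavior described above (split_line is a hypothetical helper written for illustration, not a Hadoop API):

# Sketch of the framework's split rule for
# stream.map.output.field.separator=. and stream.num.map.output.key.fields=4.
def split_line(line, sep=".", num_key_fields=4):
    parts = line.split(sep)
    if len(parts) <= num_key_fields:
        # Fewer separators than requested key fields: the whole line
        # becomes the key and the value is empty (like new Text("")).
        return line, ""
    return sep.join(parts[:num_key_fields]), sep.join(parts[num_key_fields:])

print(split_line("10.1.2.3.payload"))  # ('10.1.2.3', 'payload')
print(split_line("a.b.c"))             # ('a.b.c', '')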
Upvotes: 5
Reputation: 33495
PipeReducer is the reducer implementation for Hadoop streaming. The reducer gets the key and its list of values, iterates over the values, and writes each one to the reducer script's STDIN as a separate key/value pair, not as key/values. This is the default behavior of Hadoop streaming. I don't see any option to change this, unless the Hadoop code is modified.
public void reduce(Object key, Iterator values, OutputCollector output,
                   Reporter reporter) throws IOException {
    .....
    while (values.hasNext()) {
        .....
        // The key is written again for every value, so the external
        // reducer script receives one key/value line per value.
        inWriter_.writeKey(key);
        inWriter_.writeValue(val);
        .....
    }
}
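That means a Python streaming reducer has to detect key boundaries itself while reading the sorted lines. A minimal sketch of that pattern (word-count style, tab-separated input assumed):

#!/usr/bin/env python
# reducer.py -- minimal streaming reducer sketch (illustrative).
# Input arrives one "key<TAB>value" line at a time, sorted by key,
# so key boundaries are detected manually and a total is emitted per key.
import sys

current_key = None
total = 0

for line in sys.stdin:
    key, _, value = line.rstrip("\n").partition("\t")
    if key != current_key:
        if current_key is not None:
            print(f"{current_key}\t{total}")
        current_key = key
        total = 0
    total += int(value)

if current_key is not None:
    print(f"{current_key}\t{total}")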
Upvotes: 1
Reputation: 3107
In Hadoop Streaming, the mapper writes key/value pairs to sys.stdout. Hadoop does the shuffle and sort and directs the results to the reducer on sys.stdin. How you actually handle the map and the reduce is entirely up to you, so long as you follow that model (map to stdout, reduce from stdin). This is why it can be tested independently of Hadoop via cat data | map | sort | reduce on the command line.
The input to the reducer is the same key/value pairs that were mapped, but it arrives sorted. You can iterate through the results and accumulate totals as the example demonstrates, or you can take it further and pass the input to itertools.groupby(), which gives you the equivalent of the k1, List<values> input that you are used to and works well with the reduce() builtin (functools.reduce in Python 3).
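A minimal sketch of that groupby pattern (word-count style, tab-separated input assumed):

#!/usr/bin/env python
# reducer.py -- groupby-based streaming reducer sketch (illustrative).
import sys
from itertools import groupby
from operator import itemgetter

def parse(stream):
    for line in stream:
        key, _, value = line.rstrip("\n").partition("\t")
        yield key, int(value)

# The input is already sorted by key, so groupby yields one group per
# key -- the equivalent of reduce(k1, List<values>) in the Java API.
for key, group in groupby(parse(sys.stdin), key=itemgetter(0)):
    print(f"{key}\t{sum(value for _, value in group)}")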
The point is that it's up to you to implement the reduce.
Upvotes: 1