Reputation: 16392
How do you use the MultipleOutputs class in a reducer to write multiple outputs, each of which can have its own unique configuration? There is some documentation in the MultipleOutputs javadoc, but it seems limited to Text outputs. It turns out that MultipleOutputs can handle the output path, key class and value class of each output, but attempts to use output formats that require the use of other configuration properties fail.
(This question has come up several times but my attempts to answer it have been thwarted because the asker actually had a different problem. Since this question has taken more than a few days of investigation for me to answer, I'm answering my own question here as suggested by this Meta Stack Overflow question.)
Upvotes: 2
Views: 3349
Reputation: 8985
I have implemented MultipleOutputs support for Cassandra (see this JIRA ticket, and it is currently scheduled for release in 1.2. If you need it now, you can apply the patch in the ticket. Also check out this presentation on the topic which gives examples on its usage.
Upvotes: 0
Reputation: 16392
I've crawled through the MultipleOutputs implementation and have found that it doesn't support any OutputFormatType that has properties other than outputDir, key class and value class. I tried to write my own MultipleOutputs class, but that failed because it needs to call a private method somewhere in the Hadoop classes.
I'm left with only one workaround that seems to work in all cases and all combinations of output formats and configurations: Write subclasses of the OutputFormat classes that I want to use (these turn out to be reusable). These classes understand that other OutputFormats are in use concurrently and know how to store away their properties. The design exploits the fact that an OutputFormat can be configured with the context just before being asked for its RecordWriter.
I've got this to work with Cassandra's ColumnFamilyOutputFormat:
package com.myorg.hadoop.platform;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
public abstract class ConcurrentColumnFamilyOutputFormat
extends ColumnFamilyOutputFormat
implements Configurable {
private static String[] propertyName = {
"cassandra.output.keyspace" ,
"cassandra.output.keyspace.username" ,
"cassandra.output.keyspace.passwd" ,
"cassandra.output.columnfamily" ,
"cassandra.output.predicate",
"cassandra.output.thrift.port" ,
"cassandra.output.thrift.address" ,
"cassandra.output.partitioner.class"
};
private Configuration configuration;
public ConcurrentColumnFamilyOutputFormat() {
super();
}
public Configuration getConf() {
return configuration;
}
public void setConf(Configuration conf) {
configuration = conf;
String prefix = "multiple.outputs." + getMultiOutputName() + ".";
for (int i = 0; i < propertyName.length; i++) {
String property = prefix + propertyName[i];
String value = conf.get(property);
if (value != null) {
conf.set(propertyName[i], value);
}
}
}
public void configure(Configuration conf) {
String prefix = "multiple.outputs." + getMultiOutputName() + ".";
for (int i = 0; i < propertyName.length; i++) {
String property = prefix + propertyName[i];
String value = conf.get(propertyName[i]);
if (value != null) {
conf.set(property, value);
}
}
}
public abstract String getMultiOutputName();
}
For each Cassandra (in this case) output you want for your reducer, you'd have a class:
package com.myorg.multioutput.ReadCrawled;
import com.myorg.hadoop.platform.ConcurrentColumnFamilyOutputFormat;
public class StrongOutputFormat extends ConcurrentColumnFamilyOutputFormat {
public StrongOutputFormat() {
super();
}
@Override
public String getMultiOutputName() {
return "Strong";
}
}
and you'd configure it in your mapper/reducer configuration class:
// This is how you'd normally configure the ColumnFamilyOutputFormat
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "Partner", "Strong");
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
// This is how you tell the MultipleOutput-aware OutputFormat that
// it's time to save off the configuration so no other OutputFormat
// steps all over it.
new StrongOutputFormat().configure(job.getConfiguration());
// This is where we add the MultipleOutput-aware ColumnFamilyOutputFormat
// to out set of outputs
MultipleOutputs.addNamedOutput(job, "Strong", StrongOutputFormat.class, ByteBuffer.class, List.class);
Just to give another example, the MultipleOutput subclass for FileOutputFormat uses these properties:
private static String[] propertyName = {
"mapred.output.compression.type" ,
"mapred.output.compression.codec" ,
"mapred.output.compress" ,
"mapred.output.dir"
};
and would be implement just like ConcurrentColumnFamilyOutputFormat
above except that it would use the above properties.
Upvotes: 2