I don't have much experience with Cassandra, so please excuse me if my approach is wrong.
I am trying to bulk load data into Cassandra with MapReduce, using what is basically the word count example.
Reference: http://henning.kropponline.de/2012/11/15/using-cassandra-hadoopbulkoutputformat/
I took the simple Hadoop WordCount mapper example and slightly modified the driver code and the reducer as described in the post above.
The job runs and I have successfully generated the output file. My question now is how to perform the actual load into Cassandra. Is there anything wrong with my approach?
Please advise.
This is part of the driver code:
Job job = new Job();
job.setJobName(getClass().getName());
job.setJarByClass(CassaWordCountJob.class);
Configuration conf = job.getConfiguration();
conf.set("cassandra.output.keyspace", "test");
conf.set("cassandra.output.columnfamily", "words");
conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
conf.set("cassandra.output.thrift.port","9160"); // default
conf.set("cassandra.output.thrift.address", "localhost");
conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");
job.setMapperClass(CassaWordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(CassaWordCountReducer.class);
FileOutputFormat.setOutputPath(job, new Path("/home/user/Desktop/test/cassandra"));
MultipleOutputs.addNamedOutput(job, "reducer", BulkOutputFormat.class, ByteBuffer.class, List.class);
return job.waitForCompletion(true) ? 0 : 1;
The mapper is the same as the normal word count mapper: it just tokenizes the input and emits (word, 1).
The reducer class looks like this:
public class CassaWordCountReducer extends
        Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List<Mutation> columnsToAdd = new ArrayList<Mutation>();
        Integer wordCount = 0;
        for(IntWritable value : values) {
            wordCount += value.get();
        }
        Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
        countCol.setValue(ByteBuffer.wrap(wordCount.toString().getBytes()));
        countCol.setTimestamp(new Date().getTime());
        ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
        wordCosc.setColumn(countCol);
        Mutation countMut = new Mutation();
        countMut.column_or_supercolumn = wordCosc;
        columnsToAdd.add(countMut);
        context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
    }
}
Reputation: 41458
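To do bulk loads into Cassandra, I would advise looking at this article from DataStax. Basically, you need to do two things for bulk loading: convert your output data into SSTables, and then stream those SSTables into the Cassandra cluster.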
In your case, when using the BulkOutputFormat, it should do all that as it's using the sstableloader behind the scenes. I've never used it with MultipleOutputs, but it should work fine.
I think the error in your case is that you're not using MultipleOutputs correctly: you're still doing a context.write, when you should really be writing to your MultipleOutputs object. The way you're doing it right now, since you're writing to the regular Context, it will get picked up by the default output format of TextOutputFormat and not the one you defined in your MultipleOutputs. More information on how to use the MultipleOutputs in your reducer here.
Once you write to the correct output format of BulkOutputFormat like you defined, your SSTables should get created and streamed to Cassandra from each node in your cluster - you shouldn't need any extra step, the output format will take care of it for you.
Also I would advise looking at this post, where they also explain how to use BulkOutputFormat, but they're using a ConfigHelper which you might want to take a look at to more easily configure your Cassandra endpoint.
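For illustration, the conf.set(...) calls from the question's driver might be expressed with ConfigHelper roughly as follows. This is a sketch that assumes a Cassandra 1.1/1.2-era org.apache.cassandra.hadoop.ConfigHelper; the method names differ in other versions, so check them against the jar you are actually using. The class name CassandraOutputConfig is made up for this example.

```java
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.hadoop.conf.Configuration;

public class CassandraOutputConfig {

    // Applies the same output settings the question sets by hand:
    // keyspace "test", column family "words", RandomPartitioner,
    // Thrift on localhost:9160.
    public static void configure(Configuration conf) {
        ConfigHelper.setOutputColumnFamily(conf, "test", "words");
        ConfigHelper.setOutputPartitioner(conf,
                "org.apache.cassandra.dht.RandomPartitioner");
        ConfigHelper.setOutputRpcPort(conf, "9160");
        ConfigHelper.setOutputInitialAddress(conf, "localhost");
    }
}
```

In the driver this would be called as CassandraOutputConfig.configure(job.getConfiguration()) in place of the individual "cassandra.output.*" properties; the stream throttle setting from the question would still be set with conf.set(...).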