I have a simple MapReduce job running on my Cassandra cluster, but when it tries to save the output to a table I get InvalidRequestException(why:Column timestamp required).
I've tried manually adding a 'timestamp' column to the CF, but it doesn't make any difference.
Here's the description of my CF (as interpreted by cqlsh):
CREATE TABLE output_words (
  key text PRIMARY KEY,
  "count" int
) WITH COMPACT STORAGE AND
  bloom_filter_fp_chance=0.010000 AND
  caching='KEYS_ONLY' AND
  comment='' AND
  dclocal_read_repair_chance=0.000000 AND
  gc_grace_seconds=864000 AND
  read_repair_chance=0.100000 AND
  replicate_on_write='true' AND
  populate_io_cache_on_flush='false' AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'SnappyCompressor'};
I'm using a Maven POM with hadoop-core 1.1.2 and cassandra-thrift 1.2.4, on top of Cassandra 1.2.4.
Can anyone suggest how to get around this?
Additional info
I'm configuring my job as follows (only showing the config relevant to the output):
Job job = new Job(getConf(), "wordcount");
job.setJarByClass(TestJob.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(ReducerToCassandra.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(ByteBuffer.class);
job.setOutputValueClass(List.class);
job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), _keyspace, OUTPUT_COLUMN_FAMILY);
ConfigHelper.setOutputRpcPort(job.getConfiguration(), _port);
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), _host);
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.Murmur3Partitioner");
And my reducer class:
public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
{
    public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(StringSerializer.get().toByteBuffer(word.toString()), Collections.singletonList(getMutation(word, sum)));
    }

    private static Mutation getMutation(Text word, int sum) {
        Column c = new Column();
        c.name = StringSerializer.get().toByteBuffer("count");
        c.value = IntegerSerializer.get().toByteBuffer(sum);
        c.timestamp = System.currentTimeMillis() * 1000;
        Mutation m = new Mutation();
        m.column_or_supercolumn = new ColumnOrSuperColumn();
        m.column_or_supercolumn.column = c;
        return m;
    }
}
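The reducer uses Hector's StringSerializer and IntegerSerializer only to turn the word and the count into ByteBuffers. As a side note, here is a minimal JDK-only sketch of the same encoding, assuming the row key is text (UTF8Type) and "count" is int (Int32Type, 4 bytes big-endian) as in the table above. The class and method names (CassandraBytes, textKey, intValue) are hypothetical, and this does not by itself affect the timestamp error.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helpers: JDK-only equivalents of the serializers used in the reducer.
public class CassandraBytes {

    // Row key for a `text` (UTF8Type) key: the UTF-8 bytes of the string.
    public static ByteBuffer textKey(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    // Value for an `int` (Int32Type) column: 4 bytes, big-endian
    // (ByteBuffer's default byte order).
    public static ByteBuffer intValue(int v) {
        ByteBuffer b = ByteBuffer.allocate(4);
        b.putInt(v);
        b.flip(); // position 0, limit 4, ready to be read
        return b;
    }

    public static void main(String[] args) {
        ByteBuffer key = textKey("hello");
        ByteBuffer val = intValue(3);
        // prints "5 key bytes, 4 value bytes"
        System.out.println(key.remaining() + " key bytes, " + val.remaining() + " value bytes");
    }
}
```

In the reducer these would replace StringSerializer.get().toByteBuffer(...) and IntegerSerializer.get().toByteBuffer(sum) respectively.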
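Instead of this:
c.timestamp = System.currentTimeMillis() * 1000;
try this:
c.setTimestamp(System.currentTimeMillis() * 1000);
In the Thrift-generated Column class, timestamp is an optional primitive long, so whether it has been set is tracked in a separate "isset" bit rather than by a null check. Assigning the public field directly stores the value but leaves that bit cleared, so isSetTimestamp() still returns false and Cassandra rejects the column with "Column timestamp required". The generated setTimestamp() stores the value and also marks the field as set. Assigning name and value directly works because they are ByteBuffer objects, whose "set" state is simply a non-null check.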
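To see why the setter matters, here is a simplified, runnable model of how Thrift-generated Java classes track an optional primitive field: the value lives in a public field, but a separate bit records whether it was set, and only the setter flips that bit. ModelColumn, validate and validates are hypothetical stand-ins for illustration; they are not the real org.apache.cassandra.thrift.Column or Cassandra's server-side validation code.

```java
import java.nio.ByteBuffer;

// Simplified, hypothetical model of Thrift's isset tracking; not generated code.
public class ThriftIssetDemo {

    public static class ModelColumn {
        private static final int TIMESTAMP_ISSET_ID = 0;
        private byte issetBitfield = 0;

        public ByteBuffer name;  // object field: "set" just means non-null
        public long timestamp;   // primitive field: needs a separate isset bit

        // Like the generated setter: store the value AND mark it as set.
        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
            issetBitfield |= (1 << TIMESTAMP_ISSET_ID);
        }

        public boolean isSetTimestamp() {
            return (issetBitfield & (1 << TIMESTAMP_ISSET_ID)) != 0;
        }

        public boolean isSetName() {
            return name != null;
        }
    }

    // Stand-in for a server-side check that rejects a column without a timestamp.
    public static void validate(ModelColumn c) {
        if (!c.isSetTimestamp()) {
            throw new IllegalArgumentException("Column timestamp required");
        }
    }

    public static boolean validates(ModelColumn c) {
        try {
            validate(c);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        long ts = System.currentTimeMillis() * 1000; // microseconds

        ModelColumn direct = new ModelColumn();
        direct.timestamp = ts; // value stored, but the isset bit stays cleared
        // prints "direct assignment: isSetTimestamp = false"
        System.out.println("direct assignment: isSetTimestamp = " + direct.isSetTimestamp());

        ModelColumn viaSetter = new ModelColumn();
        viaSetter.setTimestamp(ts); // value stored and isset bit flipped
        // prints "setTimestamp(): isSetTimestamp = true"
        System.out.println("setTimestamp(): isSetTimestamp = " + viaSetter.isSetTimestamp());
    }
}
```

Both objects hold the same timestamp value; only the one populated through the setter passes validate().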