Reputation: 3267
I giving a tar.bz2 file ,.gz and tar.gz files as input after changing the properties in mapred-site.xml. None of the above seem to have worked. What I assumed to happen here is the records read as input by hadoop go out of sequence ie. one column of input is string and the other is an integer but while reading it from the compressed file because of some out of sequence data, at some point hadoop reads the string part as an integer and generates an illegal format exception. I'm just a noob. I want to know whether there is a problem in the configuration or my code.
The properties in core-site.xml are
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apac\
he.hadoop.io.compress.SnappyCodec</value>
<description>A list of the compression codec classes that can be used for compression/decompression.</description>
</property>
properties in mapred-site.xml are
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
<property>
<name>mapred.map.output.compression.codec</name>
<value>org.apache.hadoop.io.compress.BZip2Codec</value>
</property>
<property>
<name>mapred.output.compression.type</name>
<value>BLOCK</value>
</property>
This is my Code
package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.io.compress.BZip2Codec;
public class MySort{
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable Marks = new IntWritable();
private Text name = new Text();
String one,two;
int num;
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
one=tokenizer.nextToken();
name.set(one);
if(tokenizer.hasMoreTokens())
two=tokenizer.nextToken();
num=Integer.parseInt(two);
Marks.set(num);
context.write(name, Marks);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// conf.set("mapreduce.job.inputformat.class", "com.wizecommerce.utils.mapred.TextInputFormat");
// conf.set("mapreduce.job.outputformat.class", "com.wizecommerce.utils.mapred.TextOutputFormat");
// conf.setBoolean("mapreduce.map.output.compress",true);
conf.setBoolean("mapred.output.compress",true);
//conf.setBoolean("mapreduce.output.fileoutputformat.compress",false);
//conf.setBoolean("mapreduce.map.output.compress",true);
conf.set("mapred.output.compression.type", "BLOCK");
//conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
// conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
conf.setClass("mapred.map.output.compression.codec", BZip2Codec.class, CompressionCodec.class);
Job job = new Job(conf, "mysort");
job.setJarByClass(org.myorg.MySort.class);
job.setJobName("mysort");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// FileInputFormat.setCompressInput(job,true);
FileOutputFormat.setCompressOutput(job, true);
//FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString());
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
These are all commandsput together in a makefile
run: all
-sudo ./a.out
sudo chmod 777 -R Data
-sudo rm data.tar.bz2
sudo tar -cvjf data.tar.bz2 Data/data.txt
sudo javac -classpath /home/hduser/12115_Select_Query/hadoop-core-1.1.2.jar -d mysort MySort.java
sudo jar -cvf mysort.jar -C mysort/ .
-hadoop fs -rmr MySort/output
-hadoop fs -rmr MySort/input
hadoop fs -mkdir MySort/input
hadoop fs -put data.tar.bz2 MySort/input
hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/output
-sudo rm /home/hduser/Out/sort.txt
hadoop fs -copyToLocal MySort/output/part-r-00000 /home/hduser/Out/sort.txt
sudo gedit /home/hduser/Out/sort.txt
all: rdata.c
-sudo rm a.out
-gcc rdata.c -o a.out
exec: run
.PHONY: exec run
Command:
hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/output
Here is the output:
Java HotSpot(TM) 64-Bit Server VM warning: You have loaded library /usr/local/hadoop/lib/native/libhadoop.so.1.0.0 which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
14/06/25 11:20:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/25 11:20:28 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
14/06/25 11:20:29 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/06/25 11:20:29 INFO input.FileInputFormat: Total input paths to process : 1
14/06/25 11:20:29 INFO mapreduce.JobSubmitter: number of splits:1
14/06/25 11:20:29 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
14/06/25 11:20:29 INFO Configuration.deprecation: mapred.map.output.compression.codec is deprecated. Instead, use mapreduce.map.output.compress.codec
14/06/25 11:20:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1403675322820_0001
14/06/25 11:20:30 INFO impl.YarnClientImpl: Submitted application application_1403675322820_0001
14/06/25 11:20:30 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1403675322820_0001/
14/06/25 11:20:30 INFO mapreduce.Job: Running job: job_1403675322820_0001
14/06/25 11:20:52 INFO mapreduce.Job: Job job_1403675322820_0001 running in uber mode : false
14/06/25 11:20:52 INFO mapreduce.Job: map 0% reduce 0%
14/06/25 11:21:10 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_0, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at org.myorg.MySort$Map.map(MySort.java:36)
at org.myorg.MySort$Map.map(MySort.java:23)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
14/06/25 11:21:29 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_1, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at org.myorg.MySort$Map.map(MySort.java:36)
at org.myorg.MySort$Map.map(MySort.java:23)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
14/06/25 11:21:49 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_2, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at org.myorg.MySort$Map.map(MySort.java:36)
at org.myorg.MySort$Map.map(MySort.java:23)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
14/06/25 11:22:10 INFO mapreduce.Job: map 100% reduce 100%
14/06/25 11:22:10 INFO mapreduce.Job: Job job_1403675322820_0001 failed with state FAILED due to: Task failed task_1403675322820_0001_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0
14/06/25 11:22:10 INFO mapreduce.Job: Counters: 9
Job Counters
Failed map tasks=4
Launched map tasks=4
Other local map tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=69797
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=69797
Total vcore-seconds taken by all map tasks=69797
Total megabyte-seconds taken by all map tasks=71472128
I have also tried using this:
hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar -Dmapred.output.compress=true -Dmapred.compress.map.output=true -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec -Dmapred.reduce.tasks=0 -input MySort/input/data.txt -output MySort/zip1
It is successfull in creating compressed files
hadoop fs -ls MySort/zip1
Found 3 items
-rw-r--r-- 1 hduser supergroup 0 2014-06-25 10:43 MySort/zip1/_SUCCESS
-rw-r--r-- 1 hduser supergroup 42488018 2014-06-25 10:43 MySort/zip1/part-00000.bz2
-rw-r--r-- 1 hduser supergroup 42504084 2014-06-25 10:43 MySort/zip1/part-00001.bz2
and then running this:
hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/zip1
It still doesn't work . Is there something that I am missing here.
It works fine when I run it without using compressed file bz2 and directly passing it the text file Data/data.txt i.e uploading it to MySort/input in hdfs (hadoop fs -put Data/data.txt MySort/input).
Any help is Appreciated
Upvotes: 3
Views: 6516
Reputation: 3267
I did a work around for this. I used a Tool Runner.
package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ToolMapReduce extends Configured implements Tool
{
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
{
private final static IntWritable Marks = new IntWritable();
private Text name = new Text();
String one,two;
int num;
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens())
{
one=tokenizer.nextToken();
name.set(one);
if(tokenizer.hasMoreTokens())
two=tokenizer.nextToken();
num=Integer.parseInt(two);
Marks.set(num);
context.write(name, Marks);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
{
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception
{
int res = ToolRunner.run(new Configuration(), new ToolMapReduce(), args);
System.exit(res);
}
public int run(String[] args) throws Exception
{
Configuration conf = this.getConf();
//Configuration conf = new Configuration();
//conf.setOutputFormat(SequenceFileOutputFormat.class);
//SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);
//SequenceFileOutputFormat.setCompressOutput(conf, true);
//conf.set("mapred.output.compress","true");
// conf.set("mapred.output.compression","org.apache.hadoop.io.compress.SnappyCodec");
//conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.SnappyCodec");
// conf.set("mapreduce.job.inputformat.class", "com.wizecommerce.utils.mapred.TextInputFormat");
// conf.set("mapreduce.job.outputformat.class", "com.wizecommerce.utils.mapred.TextOutputFormat");
// conf.setBoolean("mapreduce.map.output.compress",true);
conf.setBoolean("mapred.output.compress",true);
//conf.setBoolean("mapreduce.output.fileoutputformat.compress",false);
//conf.setBoolean("mapreduce.map.output.compress",true);
conf.set("mapred.output.compression.type", "BLOCK");
//conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
// conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
Job job = new Job(conf, "mysort");
job.setJarByClass(org.myorg.ToolMapReduce.class);
//job.setJarByClass(org.myorg.MySort.class);
job.setJobName("mysort");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// FileInputFormat.setCompressInput(job,true);
FileOutputFormat.setCompressOutput(job, true);
//FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString());
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
//job.waitForCompletion(true);
}
}
Upvotes: 2