Ankush Singh
Ankush Singh

Reputation: 560

Read from HDFS and write to HBASE

The Mapper is reading file from two places 1) Articles visited by user(sorting by country) 2) Statistics of country (country wise)

The output of both Mapper is Text, Text

I am running program of Amazon Cluster

My aim is read data from two different set and combine the result and store it in hbase.

HDFS to HDFS is working. The code is getting stuck at reducing 67% and gives error as

17/02/24 10:45:31 INFO mapreduce.Job:  map 0% reduce 0%
17/02/24 10:45:37 INFO mapreduce.Job:  map 100% reduce 0%
17/02/24 10:45:49 INFO mapreduce.Job:  map 100% reduce 67%
17/02/24 10:46:00 INFO mapreduce.Job: Task Id : attempt_1487926412544_0016_r_000000_0, Status : FAILED
Error: java.lang.IllegalArgumentException: Row length is 0
        at org.apache.hadoop.hbase.client.Mutation.checkRow(Mutation.java:565)
        at org.apache.hadoop.hbase.client.Put.<init>(Put.java:110)
        at org.apache.hadoop.hbase.client.Put.<init>(Put.java:68)
        at org.apache.hadoop.hbase.client.Put.<init>(Put.java:58)
        at com.happiestminds.hadoop.CounterReducer.reduce(CounterReducer.java:45)
        at com.happiestminds.hadoop.CounterReducer.reduce(CounterReducer.java:1)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:635)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

Driver class is

package com.happiestminds.hadoop;



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class Main extends Configured implements Tool {

    /**
     * @param args
     * @throws Exception
     */
    public static String outputTable = "mapreduceoutput";

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Main(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {


        Configuration config = HBaseConfiguration.create();

        try{
            HBaseAdmin.checkHBaseAvailable(config);
        }
        catch(MasterNotRunningException e){
            System.out.println("Master not running");
            System.exit(1);
        }

        Job job = Job.getInstance(config, "Hbase Test");

        job.setJarByClass(Main.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);



        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ArticleMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, StatisticsMapper.class);

        TableMapReduceUtil.addDependencyJars(job);
        TableMapReduceUtil.initTableReducerJob(outputTable, CounterReducer.class, job);

        //job.setReducerClass(CounterReducer.class);

        job.setNumReduceTasks(1);


        return job.waitForCompletion(true) ? 0 : 1;
    }

}

Reducer class is

package com.happiestminds.hadoop;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class CounterReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {

    public static final byte[] CF = "counter".getBytes();
    public static final byte[] COUNT = "combined".getBytes();


    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException, InterruptedException {

        String vals = values.toString();
        int counter = 0;

        StringBuilder sbr = new StringBuilder();
        System.out.println(key.toString());
        for (Text val : values) {
            String stat = val.toString();
            if (stat.equals("***")) {
                counter++;
            } else {
                sbr.append(stat + ",");
            }

        }
        sbr.append("Article count : " + counter);


        Put put = new Put(Bytes.toBytes(key.toString()));
        put.addColumn(CF, COUNT, Bytes.toBytes(sbr.toString()));
        if (counter != 0) {
            context.write(null, put);
        }

    }



}

Dependencies

<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>



        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.2.2</version>
        </dependency>


        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.2</version>
        </dependency>



    </dependencies>

Upvotes: 0

Views: 1275

Answers (4)

Hari Singh
Hari Singh

Reputation: 165

According to the exception thrown by the program it is clear that key length is 0 so before putting into hbase you can check if key length is 0 or not then only you can put into the hbase.

More clarity why key length's 0 is not supported by hbase

Becuase HBase data model does not allow 0-length row key, it should be at least 1 byte. 0-byte row key is reserved for internal usage (to designate empty start key and end keys).

Upvotes: 1

Ashu Pachauri
Ashu Pachauri

Reputation: 1403

The error you get is quite self-explanatory. Row keys in HBase can't be empty (though values can be).

@Override
protected void reduce(Text key, Iterable<Text> values,
        Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
        throws IOException, InterruptedException {
    if (key == null || key.getLength() == 0) {
      // Log a warning about the empty key.
      return;
    }
    // Rest of your reducer follows.
}

Upvotes: 0

Alex
Alex

Reputation: 8937

A good practice is to validate your values before submitting them somewhere. In your particular case you can validate your key and sbr or wrap them into try-catch section with proper notification policy. You should output them into some log if they are not correct and update you unit tests with new test-cases:

 try
 {
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.addColumn(CF, COUNT, Bytes.toBytes(sbr.toString()));
    if (counter != 0) {
        context.write(null, put);
    }
 }
 catch (IllegalArgumentException ex)
 {
      System.err.println("Error processing record - Key: "+ key.toString() +", values: " +sbr.ToString());
 }

Upvotes: 1

Deepan Ram
Deepan Ram

Reputation: 850

Can you try to check whether you are inserting any null values or not ?

HBase data model does not allow zero length row key, it should be at least 1 byte.

Please check in your reducer code before executing the put command , whether some of the values are populated to null or not.

Upvotes: 0

Related Questions