kongshem

Reputation: 332

Hadoop MapReduce querying on large json data

Hadoop n00b here.

I have installed Hadoop 2.6.0 on a server where I have stored twelve JSON files I want to perform MapReduce operations on. These files are large, ranging from 2 to 5 gigabytes each.

The structure of the JSON files is an array of JSON objects. Snippet of two objects below:

[{"campus":"Gløshaugen","building":"Varmeteknisk og Kjelhuset","floor":"4. etasje","timestamp":1412121618,"dayOfWeek":3,"hourOfDay":2,"latitude":63.419161638078066,"salt_timestamp":1412121602,"longitude":10.404867443910122,"id":"961","accuracy":56.083199914753536},{"campus":"Gløshaugen","building":"IT-Vest","floor":"2. etasje","timestamp":1412121612,"dayOfWeek":3,"hourOfDay":2,"latitude":63.41709424828986,"salt_timestamp":1412121602,"longitude":10.402167488838765,"id":"982","accuracy":7.315199988880896}]

I want to perform MapReduce operations based on the fields building and timestamp, at least in the beginning until I get the hang of this. For example: map/reduce the data where building equals a given parameter and timestamp is greater than X and less than Y. The relevant fields I need after the reduce phase are latitude and longitude.

I know there are different tools (Hive, HBase, Pig, Spark, etc.) you can use with Hadoop that might make this easier, but my boss wants an evaluation of the MapReduce performance of standalone Hadoop.

So far I have created the main class that triggers the map and reduce classes, and implemented what I believe is a start on the map class, but I'm stuck on the reduce class. Below is what I have so far.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Hadoop {

    public static void main(String[] args) throws Exception {

        try {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "maze");
            job.setJarByClass(Hadoop.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reducer.class); // my own Reducer class below, not the Hadoop base class
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            // Note: 50070 is the NameNode web UI port; the HDFS URI should use the
            // fs.defaultFS RPC port (typically 8020 or 9000) instead.
            Path inPath = new Path("hdfs://xxx.xxx.106.23:50070/data.json");

            FileInputFormat.addInputPath(job, inPath);
            // An output path is also required for TextOutputFormat, e.g.
            // FileOutputFormat.setOutputPath(job, new Path("hdfs://.../output"));

            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Mapper:

import org.apache.hadoop.io.Text;
import org.json.JSONException;
import org.json.JSONObject;

import java.io.IOException;

public class Map extends org.apache.hadoop.mapreduce.Mapper<Text, Text, Text, Text> {
    private Text word = new Text();

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {

        try {
            JSONObject jo = new JSONObject(value.toString());
            // latitude/longitude are numbers in the JSON, so read them as doubles
            String latitude = String.valueOf(jo.getDouble("latitude"));
            String longitude = String.valueOf(jo.getDouble("longitude"));
            long timestamp = jo.getLong("timestamp");
            String building = jo.getString("building");
            StringBuilder sb = new StringBuilder();

            sb.append(latitude);
            sb.append("/");
            sb.append(longitude);
            sb.append("/");
            sb.append(timestamp);
            sb.append("/");
            sb.append(building);
            sb.append("/");
            context.write(new Text(sb.toString()), value);

        } catch (JSONException e) {
            e.printStackTrace();
        }
    }
}

Reducer:

import org.apache.hadoop.io.Text;

import java.io.IOException;

public class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {
    private Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // This is where I'm stuck -- not implemented yet.
    }
}

UPDATE

// these need to be fields on the mapper class, not local variables inside map()
private static String BUILDING;
private static int tsFrom;
private static int tsTo;

public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    try {
        JSONArray ja = new JSONArray(key.toString());
        StringBuilder sb;
        for (int n = 0; n < ja.length(); n++) {
            JSONObject jo = ja.getJSONObject(n);
            String latitude = String.valueOf(jo.getDouble("latitude"));
            String longitude = String.valueOf(jo.getDouble("longitude"));
            int timestamp = jo.getInt("timestamp");
            String building = jo.getString("building");

            if (BUILDING.equals(building) && timestamp < tsTo && timestamp > tsFrom) {
                sb = new StringBuilder();
                sb.append(latitude);
                sb.append("/");
                sb.append(longitude);
                context.write(new Text(sb.toString()), value);
            }
        }
    } catch (JSONException e) {
        e.printStackTrace();
    }
}

// no @Override here: configure(JobConf) is not part of the new-API Mapper,
// so it is not an override (and is not called automatically either)
public void configure(JobConf jobConf) {
    System.out.println("configure");
    BUILDING = jobConf.get("BUILDING");
    tsFrom = Integer.parseInt(jobConf.get("TSFROM"));
    tsTo = Integer.parseInt(jobConf.get("TSTO"));
}

This works for a small data set. But since I am working with LARGE JSON files, I get a Java heap space exception. Since I am not familiar with Hadoop, I'm having trouble understanding how MapReduce can read the data without running into an OutOfMemoryError.
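
The heap problem comes from parsing a whole multi-gigabyte JSON array inside a single map() call. One possible workaround is to preprocess the files into newline-delimited JSON (one object per line) so that a plain TextInputFormat can feed the mapper one record at a time. A rough sketch of such a converter, assuming the Jackson streaming API is available as a dependency (class and argument names are illustrative):

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

// Offline converter: streams the top-level JSON array and writes one object per line,
// holding only a single object in memory at a time.
public class JsonArrayToLines {
    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        try (JsonParser parser = new JsonFactory().createParser(new File(args[0]));
             BufferedWriter out = new BufferedWriter(new FileWriter(args[1]))) {
            if (parser.nextToken() != JsonToken.START_ARRAY) {
                throw new IllegalStateException("expected a top-level JSON array");
            }
            while (parser.nextToken() == JsonToken.START_OBJECT) {
                JsonNode record = mapper.readTree(parser); // consumes exactly one object
                out.write(record.toString());
                out.newLine();
            }
        }
    }
}

With one JSON object per line, the mapper could go back to parsing a single JSONObject per call instead of a whole JSONArray.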

Upvotes: 1

Views: 1358

Answers (1)

Ran Locar

Reputation: 561

It sounds like you simply want a list of LONG/LAT pairs under the constraint of building = something and timestamp within some range.

This is a simple filter operation; for that you do not need a reducer. In the mapper you should check whether the current JSON record satisfies the condition, and only write it out to the context if it does. If it fails to satisfy the condition, you don't want it in the output.

The output should be LONG/LAT (no building/timestamp, unless you want them there as well)

If no reducer is present, the output of the mappers is the output of the job, which in your case is sufficient.
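
A map-only job can also be requested explicitly in the driver; a one-line sketch (the rest of the job setup stays as in the question):

// With zero reduce tasks this becomes a map-only job: the mapper output is
// written directly to the output directory as part-m-NNNNN files.
job.setNumReduceTasks(0);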

As for the code:

your driver should pass the building ID and the timestamp range to the mapper, using the job configuration. Anything you put there will be available to all your mappers.

Configuration conf = new Configuration();
conf.set("Building", "123");
conf.set("TSFROM", "12300000000");
conf.set("TSTO", "12400000000");
Job job = new Job(conf);

your mapper class needs to implement JobConfigurable.configure; in there you read the values from the configuration object into static fields

private static String BUILDING;
private static Long tsFrom;
private static Long tsTo;
public void configure(JobConf job) {
    BUILDING = job.get("Building");
    tsFrom = Long.parseLong(job.get("TSFROM"));
    tsTo = Long.parseLong(job.get("TSTO"));
}
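
Note: if the mapper extends the new org.apache.hadoop.mapreduce.Mapper (as in the question), configure(JobConf) is not invoked automatically; the equivalent there is to override setup(Context) and read the values from context.getConfiguration(). A minimal sketch:

private static String BUILDING;
private static long tsFrom;
private static long tsTo;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    // runs once per mapper, before any map() calls
    Configuration conf = context.getConfiguration();
    BUILDING = conf.get("Building");
    tsFrom = Long.parseLong(conf.get("TSFROM"));
    tsTo = Long.parseLong(conf.get("TSTO"));
}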

Now, your map function needs to check:

if (BUILDING.equals(building) && timestamp < tsTo && timestamp > tsFrom) {
   sb = new StringBuilder();
   sb.append(latitude);
   sb.append("/");
   sb.append(longitude);
   // the value must be a Writable and match the job's output value class;
   // NullWritable works if you only want the LONG/LAT key in the output
   context.write(new Text(sb.toString()), NullWritable.get());
}

This means any rows belonging to other buildings, or falling outside the timestamp range, will not appear in the result.

Upvotes: 2
