Monday, February 10, 2014

Hadoop: extending IntWritable (or other primitive Writables) and setOutputKeyClass / setMapOutputKeyClass

Say you want to subclass IntWritable, for the sake of this example, to override its compareTo method so that keys sort in descending order instead of the usual ascending order.

You might do it with code like this:

import org.apache.hadoop.io.IntWritable;

// An IntWritable whose natural order is descending instead of ascending.
public class IntWritableOrderable extends IntWritable {

    public IntWritableOrderable() {
        super();
    }

    public IntWritableOrderable(int value) {
        super(value);
    }

    @Override
    public int compareTo(IntWritable o) {
        int thisValue = this.get();
        int thatValue = o.get();
        return (thisValue > thatValue ? -1 :   // swapped < for > to invert the order
                (thisValue == thatValue ? 0 : 1));
    }
}
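A quick way to convince yourself the comparator does what you want is a plain-Java sanity check, outside MapReduce entirely (the SortCheck class and the sample values are just for illustration):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SortCheck {
        public static void main(String[] args) {
            List<IntWritableOrderable> keys = Arrays.asList(
                    new IntWritableOrderable(3),
                    new IntWritableOrderable(1),
                    new IntWritableOrderable(2));
            Collections.sort(keys);         // uses our overridden compareTo
            System.out.println(keys);       // prints [3, 2, 1]: descending
        }
    }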

Right. But now you want your program to accept an ASC or DESC flag so the client can choose the order. You read this flag (here, the third command-line argument) when setting up your Job:

    Job job = new Job();
    // ...
    String order = args[2];
    if ("DESC".equals(order)) {
        job.getConfiguration().set("SORT.ORDER", "DESC");
    }
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

And, given that it subclasses IntWritable, in your Mapper you will emit keys of one type or the other depending on that flag, with code such as:

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String order = context.getConfiguration().get("SORT.ORDER", "ASC");
        boolean descending = "DESC".equals(order);
        // int k = value to emit as key
        // ...
        context.write(descending ?
                new IntWritableOrderable(k) : new IntWritable(k),
                value);
        // ...
    }

And you're done, right?

Wrong.

The shuffle and sort phase serializes the keys and values and then recreates them from their bytes. This is why we set these apparently redundant lines in our job:

    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
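To see why this bites, here is a self-contained sketch of the round trip, mimicking what the framework's Writable deserializer (org.apache.hadoop.io.serializer.WritableSerialization) does. The RoundTripDemo class and the value 42 are just for illustration:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.util.ReflectionUtils;

    public class RoundTripDemo {
        public static void main(String[] args) throws IOException {
            // The mapper emitted an instance of the subclass...
            IntWritable emitted = new IntWritableOrderable(42);
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            emitted.write(new DataOutputStream(bytes));

            // ...but the framework rebuilds the key from the class declared
            // in the job (setMapOutputKeyClass), roughly like this:
            Class<? extends Writable> declared = IntWritable.class;
            Writable rebuilt = ReflectionUtils.newInstance(declared, new Configuration());
            rebuilt.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));

            // Prints "class org.apache.hadoop.io.IntWritable": the subclass,
            // and with it the custom compareTo, is gone.
            System.out.println(rebuilt.getClass());
        }
    }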

We have to tell the framework to treat these keys as one class or the other, since that is the class it will instantiate (via its no-argument constructor) when deserializing. You can fix this issue by also making these calls depend on the flag:

    String order = args[2]; // say
    // Assuming ASC by default:
    if ("DESC".equals(order)) {
        job.getConfiguration().set("SORT.ORDER", "DESC");
        System.out.println("Setting DESC");

        job.setMapOutputKeyClass(IntWritableOrderable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(IntWritableOrderable.class);
        job.setOutputValueClass(Text.class);
    } else {
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
    }

And you're done: these are the classes that will be used in the shuffle and sort phase. Great!

PS. Obviously, you could enhance IntWritableOrderable to receive a flag in its constructor and compare ASC or DESC according to it, which would be more efficient. But in any case, this was just a silly example to illustrate the point.
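For completeness, one caveat about that PS: since the framework recreates keys with the no-argument constructor, a constructor flag alone would not survive the shuffle; it would have to come from somewhere else, such as the configuration. The standard technique for flipping sort order while keeping a single IntWritable key class (not what this post set out to show, but worth knowing) is a custom sort comparator registered with job.setSortComparatorClass. A minimal sketch:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    // Sorts IntWritable keys in descending order; register it with
    // job.setSortComparatorClass(DescendingIntComparator.class)
    public class DescendingIntComparator extends WritableComparator {

        public DescendingIntComparator() {
            // 'true' asks the parent to create key instances for deserialization
            super(IntWritable.class, true);
        }

        @SuppressWarnings({"rawtypes", "unchecked"})
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return b.compareTo(a); // reverse of the natural (ascending) order
        }
    }

With this, both branches of the driver declare plain IntWritable and only the comparator registration depends on the flag.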
