Using Secondary Sort to Enhance Adobe Data Feed Processing in Hadoop

In my last post, I described the basics of processing Adobe Analytics Click Stream Data Feeds using Hadoop. While the solution outlined there will scale remarkably well, there is a more memory-efficient way to do it. Having this flexibility is nice if you have lots of CPU cores available but not as much RAM.

Once again, the full project (including the code from the last post) is available on GitHub.

Secondary Sorting in Hadoop

By default, Hadoop sorts the data by the key identified in the mapper. This is done simply to group common data together so that it can be processed by the same reducer.

When processing Adobe Analytics Click Stream Data Feeds, you’ll typically give Hadoop the visitor ID as the key so that all of the hits for a visitor are available to a reducer. In a lot of cases though, it may be useful to also sort the hit data for the visitor in chronological order. For example, you may want to calculate the following types of metrics:

  • First touch/last touch campaign attribution
  • Time on site, time between first hit and cart add, time in site section, etc.
  • Most recently viewed page or site section

In my last example, I showed how you can provide a Comparator class to the sort() method of an ArrayList to do this sort in the reducer. You can actually write some custom code and have Hadoop perform the sorting for you. This is nice because it distributes the load across the cluster and doesn’t require you to load a visitor’s entire history into memory.

Custom Classes to Build

There are a few classes that we’ll have to build to pull this off:

  • Writable Key – Used to communicate the visitor id, visit_num, and visit_page_num information
  • Partitioner – Used to identify which Hadoop node processes which visitors – a good partitioner will make your cluster more efficient
  • Grouper – Compares visitor ids in order to group visitor data together
  • Comparator – Compares individual hits for sorting purposes

Writable Key

The writable key is a custom class that enables Hadoop to efficiently move the visitor id, visit_num, and visit_page_num information from the mapper, to the shuffle/sort code, to the reducers. Writable classes in Hadoop allow serialization/deserialization of this information in a way that Hadoop can understand.

You might be wondering, “Why not just create a custom key using a Text object that contains something like ‘visitor_id:visit_num:visit_page_num’, and then parse out the parts that we need?”

Well, it turns out, the comparators (Grouper and Comparator) get run a lot. You really want these to be as efficient as possible. There is a lot of extra work that a comparator has to do if it has to parse a string, extract the parts it needs, and then calculate the comparison. Let’s just do this once in the mapper when we read in the data originally.

To build a custom writable class, you’ll need to implement either Writable or, in our case, WritableComparable. We’re using WritableComparable because we’re building our own custom comparators.

I’m going to call our writable class CompositeDataFeedKey. It will have four properties:

  • Text visId – stores the combined post_visid_high and post_visid_low values
  • IntWritable visitNum – stores the visit_num value for the hit
  • IntWritable visitPageNum – stores the visit_page_num value for the hit
  • DoubleWritable hitOrder – combines visit_num and visit_page_num to make comparisons easier

To implement this class, you have to provide the following methods (a skeleton of the full class is sketched just after this list):

  • public void write(DataOutput o) throws IOException
  • public void readFields(DataInput i) throws IOException
  • public int compareTo(CompositeDataFeedKey o)
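
Before walking through each of those, here’s a rough sketch of what the class declaration and fields might look like. Treat it as an outline rather than the exact class from the repository (the getters and field initialization are my assumptions); the key point is that the fields are instantiated up front, since Hadoop creates key objects via reflection and then populates them through readFields().

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class CompositeDataFeedKey implements WritableComparable<CompositeDataFeedKey> {
 // Instantiate the members up front so readFields() has objects to read into
 private Text visId = new Text();
 private IntWritable visitNum = new IntWritable();
 private IntWritable visitPageNum = new IntWritable();
 private DoubleWritable hitOrder = new DoubleWritable();

 public Text getVisId() { return this.visId; }
 public DoubleWritable getHitOrder() { return this.hitOrder; }

 // write(), readFields(), compareTo(), and the set() helpers are covered below
}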

Let’s first go through the code for the write() method:

@Override
public void write(DataOutput dataOutput) throws IOException {
 this.visId.write(dataOutput);
 this.visitNum.write(dataOutput);
 this.visitPageNum.write(dataOutput);
 this.hitOrder.write(dataOutput);
}

This is fairly basic. We get a dataOutput object that each field’s data can be written to. Notice how I’m using classes that are built into Hadoop (Text, IntWritable, DoubleWritable) to make this easier. I can simply call each class’s write() method.

Now let’s look at the code for the readFields() method:

@Override
public void readFields(DataInput dataInput) throws IOException {
 this.visId.readFields(dataInput);
 this.visitNum.readFields(dataInput);
 this.visitPageNum.readFields(dataInput);
 this.hitOrder.readFields(dataInput);
}

Also, fairly basic. We get a dataInput object that we can use to read the data into the object. Again, I can use the readFields() methods from the native Hadoop types to make this a bit easier.

Finally, let’s look at compareTo():

@Override
public int compareTo(CompositeDataFeedKey o) {
 final int result = this.getVisId().compareTo(o.getVisId());
 if(result == 0) {
  if(this.getHitOrder().get() < o.getHitOrder().get()) return -1;
  else if(this.getHitOrder().get() > o.getHitOrder().get()) return 1;
  else return 0;
 } else return result;
}

Hopefully, you can now see why hitOrder is important. In my constructors and set() methods, I calculate hitOrder so that given a visit_num of 2 and a visit_page_num of 16, hitOrder will be 2.16. This is a number that I can easily compare using the standard comparison constructs in Java.
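
To make this concrete, here’s a sketch of how a set() method might calculate hitOrder. The three-string signature is my assumption; the class in the repository may split this up differently.

public void set(String visId, String visitNum, String visitPageNum) {
 this.visId.set(visId);
 this.visitNum.set(Integer.parseInt(visitNum));
 this.visitPageNum.set(Integer.parseInt(visitPageNum));
 // Treat "visit_num.visit_page_num" as a decimal number,
 // e.g. visit_num 2 and visit_page_num 16 become 2.16
 this.hitOrder.set(Double.parseDouble(visitNum + "." + visitPageNum));
}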

Partitioner

The partitioner is very simple to implement. For that reason, I’ll just include the entire class here:

public class DataFeedPartitioner extends Partitioner<CompositeDataFeedKey, Text> {
 private static final Logger LOGGER = LoggerFactory.getLogger(DataFeedPartitioner.class);
 @Override
 public int getPartition(CompositeDataFeedKey key, Text value, int partitions) {
  return (key.getVisId().hashCode() & Integer.MAX_VALUE) % partitions;
 }
}

Fairly straightforward, right? Well, there are three things I want to call out:

  1. This method returns an integer between 0 and partitions - 1. (The ‘partitions’ parameter is usually the number of reducers.)
  2. It does this by calculating a hash code for the visitor ID (built in Java method) and using a modulus operation to calculate the partition.
  3. We are using a bitwise AND with Integer.MAX_VALUE to clear the sign bit of the hash code. This is because hashCode() can return negative values. The masking ensures that the value is always non-negative and still evenly distributed (see the small example below).
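
If the masking trick is unfamiliar, here’s a tiny standalone example (with a made-up visitor ID and a hypothetical reducer count) that shows the effect:

public class PartitionDemo {
 public static void main(String[] args) {
  int hash = "12345678901234567890_98765432109876543210".hashCode(); // hashCode() may be negative
  int masked = hash & Integer.MAX_VALUE; // clears the sign bit, so the result is never negative
  System.out.println(masked % 8);        // with 8 reducers, this always prints a value from 0 to 7
 }
}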

The built-in hashCode() method isn’t the absolute best way to perfectly distribute visitor data across the cluster, but it’s fast and good enough. If you find that one of your nodes is getting a lot more visitors than the other nodes, this is where you’ll need to make an improvement.

Grouper

This is the code that groups together hits that share a common visitor ID. It’s very short and sweet and just uses the built-in compareTo() method to do the comparison. Note that we are casting the WritableComparable objects to CompositeDataFeedKey. If this casting doesn’t happen, we can’t call the getVisId() method.

@Override
public int compare(WritableComparable a, WritableComparable b) {
 CompositeDataFeedKey key1 = (CompositeDataFeedKey)a;
 CompositeDataFeedKey key2 = (CompositeDataFeedKey)b;
 return key1.getVisId().compareTo(key2.getVisId());
}
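
For context, that compare() method lives inside a comparator class that Hadoop instantiates. Here’s a minimal sketch of what the wrapper might look like, assuming it extends Hadoop’s WritableComparator (the DataFeedComparator below can follow the same pattern):

import org.apache.hadoop.io.WritableComparator;

public class DataFeedGrouper extends WritableComparator {
 protected DataFeedGrouper() {
  // Passing 'true' tells WritableComparator to deserialize the keys for us, so
  // compare() receives CompositeDataFeedKey instances instead of raw bytes
  super(CompositeDataFeedKey.class, true);
 }

 // ...the compare() method shown above goes here...
}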

Comparator

Finally, we have our comparator. This not only compares the visitor IDs in the hits, but also the visit_num and visit_page_num (via the hitOrder variable we created).

@Override
public int compare(WritableComparable a, WritableComparable b) {
 CompositeDataFeedKey key1 = (CompositeDataFeedKey)a;
 CompositeDataFeedKey key2 = (CompositeDataFeedKey)b;

 final int result = key1.getVisId().compareTo(key2.getVisId());
 if(result == 0) {
  if(key1.getHitOrder().get() < key2.getHitOrder().get()) return -1;
  else if(key1.getHitOrder().get() > key2.getHitOrder().get()) return 1;
  else return 0;
 } else return result;
}


Bringing it All Together

In our Job class, we need to let Hadoop know that these new classes exist. For that reason, we need to add the following to the run() method:

job.setPartitionerClass(DataFeedPartitioner.class);
job.setGroupingComparatorClass(DataFeedGrouper.class);
job.setSortComparatorClass(DataFeedComparator.class);

We also need to correctly update the data type for the key that the mapper will return.

job.setMapOutputKeyClass(CompositeDataFeedKey.class);
job.setMapOutputValueClass(Text.class);
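
Taken together, the relevant portion of the run() method might look roughly like this. The job name, the ScalableReducer class name, the output types, and the path handling are my assumptions based on the structure of the previous post; the comparator and map-output lines above are the ones that matter for secondary sort.

@Override
public int run(String[] args) throws Exception {
 Job job = Job.getInstance(getConf(), "ScalableDataFeedJob");
 job.setJarByClass(ScalableDataFeedJob.class);

 job.setMapperClass(ScalableMapper.class);
 job.setReducerClass(ScalableReducer.class); // hypothetical reducer class name

 // Register the secondary sort classes
 job.setPartitionerClass(DataFeedPartitioner.class);
 job.setGroupingComparatorClass(DataFeedGrouper.class);
 job.setSortComparatorClass(DataFeedComparator.class);

 // The mapper now emits the composite key
 job.setMapOutputKeyClass(CompositeDataFeedKey.class);
 job.setMapOutputValueClass(Text.class);

 // The reducer emits the visitor ID and a revenue total
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(DoubleWritable.class);

 FileInputFormat.addInputPath(job, new Path(args[0]));
 FileOutputFormat.setOutputPath(job, new Path(args[1]));

 return job.waitForCompletion(true) ? 0 : 1;
}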

Don’t forget to update the mapper itself:

public class ScalableMapper extends Mapper<Object, Text, CompositeDataFeedKey, Text> {
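
Inside that class, the map() method might look something like the sketch below. The reusable outKey field and the set() helper are my assumptions (see the earlier sketch), and DataFeedTools.getValue() with this.columnHeaders mirrors the lookup used in the reducer further down.

// A sketch of the map() method inside ScalableMapper
private final CompositeDataFeedKey outKey = new CompositeDataFeedKey(); // hypothetical reusable key

@Override
protected void map(Object offset, Text line, Context context) throws IOException, InterruptedException {
 String[] columns = line.toString().split("\\t", -1);

 // Pull out the pieces that make up the composite key
 String visIdHigh = DataFeedTools.getValue("post_visid_high", columns, this.columnHeaders);
 String visIdLow = DataFeedTools.getValue("post_visid_low", columns, this.columnHeaders);
 String visitNum = DataFeedTools.getValue("visit_num", columns, this.columnHeaders);
 String visitPageNum = DataFeedTools.getValue("visit_page_num", columns, this.columnHeaders);

 // set() also calculates hitOrder from visit_num and visit_page_num
 this.outKey.set(visIdHigh + visIdLow, visitNum, visitPageNum);
 context.write(this.outKey, line);
}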

Now, in our reducer, instead of reading all of a visitor’s data into memory, sorting it, and then operating on it, we can just iterate over the Iterable<Text> values object provided and expect the data to already be in order.

@Override
protected void reduce(CompositeDataFeedKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 // The hit data stored in 'values' is already sorted. Yay!
 String[] columns;
 String eventList;
 String productList;
 Double revenue = 0.0;

 for(Text hit : values) {
  columns = hit.toString().split("\\t", -1);
  eventList = DataFeedTools.getValue("post_event_list", columns, this.columnHeaders);
  eventList = String.format(",%s,", eventList);
  // Was there a purchase?
  if(eventList.contains(",1,")) {
   productList = DataFeedTools.getValue("post_product_list", columns, this.columnHeaders);
   revenue += DataFeedTools.calculateRevenue(productList);
  }
 }

 // Let's just grab the visId part of the key before we return it. No need to return the
 // composite key that was built by the mapper.
 this.visId.set(key.getVisId());
 this.result.set(revenue);
 context.write(this.visId, this.result);
}

Note how I extract the visitor ID from the key and return that instead of the entire CompositeDataFeedKey.

Summary

By implementing a custom WritableComparable, Partitioner, Grouper, and Comparator class, you can reduce the amount of memory that your reducer uses when processing data feed data and distribute the load from sorting the data across your cluster.

In the repository linked to at the beginning of this post, I’ve included both the old method and this new method so you can benchmark the differences and tinker a bit. In my own testing, I was able to process 50MM hits in 4:27 using the ScalableDataFeedJob and 5:35 using the StandardDataFeedJob.

Please feel free to reach out to me with any questions you have or with any improvements you’ve made to this. I’d love to hear from you!

References

https://vangjee.wordpress.com/2012/03/20/secondary-sorting-aka-sorting-values-in-hadoops-mapreduce-programming-paradigm/

This is a concise description of a simple implementation of Secondary Sorting in Hadoop.

Jared Stevens

Jared Stevens is a software engineer at Adobe and works on the Analytics reporting APIs. He has also worked as a Software Engineering consultant at Adobe for 7 years and has assisted many of Adobe's top tier customers with custom integrations and data processing requests. When he's not knee deep in data, he enjoys backpacking, video games, and learning about new things.