In the first part of this series, I explained why the Hadoop framework can be useful for computing the VAR and analyzing intermediate values. In the second part, I described a first implementation. One drawback of that implementation is that it does not take advantage of the reduce pattern: I did the reduction by hand. I will now make full use of Hadoop's reduce feature.
If I simply code a reduce class with the signature public void reduce(DrawKey key, Iterator values, OutputCollector output, Reporter reporter), this method will be called once per key scenario id;price value, so once per draw, because each draw generates a different price. The successive calls to the reduce() method are totally independent (no shared variable), so a given call cannot know whether 1% of the values have been processed. The percentile, and thus the VAR, cannot be extracted that way. However, Hadoop provides a way to sort keys one way and group them another way, and this feature will help us solve the problem. The example given uses keys containing two integers and shows how to group by the first integer and sort by the second one. I propose to use it to extract the percentile. I have created a new program, for which I won't give you all the code but just a synopsis. This VarSimpleOptionReduce program defines two jobs: a map-only job based on IntermediateResultMapperBinary, and a reduce job based on the IntermediateResultSortReducer that I will describe hereafter. Hadoop will sort the data and this reducer will extract the VAR. The corresponding configuration is the following:
/**
 * Configuration of the map-only job, adapted for a binary compressed result
 * @param args the program arguments
 * @return the job configuration
 */
private JobConf buildGenerateMapperConfBinaryResult(String[] args) {
    JobConf jobConf = new JobConf();
    //identical...
    jobConf.setMapperClass(IntermediateResultMapperBinary.class);
    jobConf.setNumReduceTasks(0);
    //...
    return jobConf;
}

/**
 * Configuration of the reduce job, adapted for a binary compressed result
 * @param args the program arguments
 * @return the job configuration
 */
private JobConf buildGenerateReducerConfBinaryResult(String[] args) {
    JobConf jobConf = new JobConf();
    //...
    jobConf.setNumMapTasks(0);
    jobConf.setReducerClass(IntermediateResultSortReducer.class);
    jobConf.setPartitionerClass(FirstPartitionner.class);
    //Inputs of the reduce tasks are sorted by DrawKey
    jobConf.setOutputKeyComparatorClass(DrawKey.Comparator.class);
    //All keys that are equal according to ScenarioKey.Comparator are sent to the same reduce() call
    jobConf.setOutputValueGroupingComparator(ScenarioKey.Comparator.class);
    jobConf.setNumReduceTasks(1); //Use 2 if you have two scenarios
    //...
    return jobConf;
}
Map output is spread among partitions. The partitioner class runs on the output of the map task, before sorting. Each partition is then processed by a different reduce task, which calls the reduce() function once per group of keys. All keys for the same scenario should be sorted together and not in two different reduce tasks. Thus the optimized configuration is to have one partition and one reduce task per scenario.
jobConf.setPartitionerClass(FirstPartitionner.class);
So the FirstPartitionner should only take into account the scenario key (scenario id and percentile size).
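import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

@SuppressWarnings("deprecation")
public class FirstPartitionner implements Partitioner<DrawKey, CustomResultWritable> {
    @Override
    public int getPartition(DrawKey key, CustomResultWritable value, int numPartitions) {
        // Only the scenario part of the key should drive the partition. The cast does not change
        // which hashCode() runs, so this assumes that hashCode() ignores the price.
        // Masking the sign bit keeps the result non-negative (Math.abs(Integer.MIN_VALUE) is negative).
        return ((((ScenarioKey) key).hashCode() * 127) & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void configure(JobConf job) {}
}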
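To make the partitioning rule concrete, here is a minimal sketch that runs without Hadoop. The `Key` class, its `scenarioHash()` method and the `partitionFor` helper are hypothetical stand-ins for DrawKey and the partitioner, not the actual classes of the program. It only illustrates the property we need: two keys of the same scenario land in the same partition whatever their price, and the index is never negative.

```java
import java.util.Objects;

class PartitionSketch {

    /** Hypothetical stand-in for DrawKey: scenario id, percentile size, price. */
    static final class Key {
        final int scenarioId;
        final int percentileSize;
        final double price;

        Key(int scenarioId, int percentileSize, double price) {
            this.scenarioId = scenarioId;
            this.percentileSize = percentileSize;
            this.price = price;
        }

        /** Hash of the scenario part only; the price is deliberately left out. */
        int scenarioHash() {
            return Objects.hash(scenarioId, percentileSize);
        }
    }

    /** Same scenario gives the same partition, whatever the price. */
    static int partitionFor(Key key, int numPartitions) {
        // Masking the sign bit avoids a negative index, even after an int overflow.
        return ((key.scenarioHash() * 127) & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        Key low = new Key(1, 10, 0.513775910851316);
        Key high = new Key(1, 10, 14.414516512987014);
        // Both draws belong to scenario 1;10, so they share a partition.
        System.out.println(partitionFor(low, 2) == partitionFor(high, 2)); // prints true
    }
}
```

The mask `& Integer.MAX_VALUE` is used here instead of `Math.abs` because `Math.abs(Integer.MIN_VALUE)` is still negative.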
DrawKey.Comparator.class and ScenarioKey.Comparator.class compare instances of DrawKey and ScenarioKey respectively. I will give you more details about that in the following paragraph. In this way, the reduce() method will take this kind of input (the syntax with parentheses is only illustrative):
1;10;0.513775910851316 ( "252 84.31301373924966 120.0 0.05 0.2 0.513775910851316", "1;10;0.513775910851316 252 103.39569385168355 120.0 0.05 0.2 4.181165705822988", "1;10;0.513775910851316 252 123.11293496630553 120.0 0.05 0.2 14.414516512987014")
So the reduce() method will be called once for the scenario key 1;10 with 3 different price values in ascending order. One drawback of the reducer signature is that the reducer will be called with a DrawKey: 1;10;0.513775910851316. The corresponding call price value, 0.513775910851316, is the smallest one and should be ignored from a business perspective. The real price values computed after each draw (0.513775910851316, 4.181165705822988, 14.414516512987014) are provided in the values. That's why the call price value is duplicated both in the key (for sorting) and in the value (for collecting in the reduce() function).
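The sort-one-way, group-another-way behaviour can be reproduced without Hadoop. The sketch below is only an illustration under assumptions: the `Key` class, `SORT_ORDER` and `sortAndGroup` are hypothetical names that play the roles of DrawKey, DrawKey.Comparator and the grouping done with ScenarioKey.Comparator. Each entry of the resulting map corresponds to what a single reduce() call would receive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SecondarySortSketch {

    /** Hypothetical stand-in for DrawKey: scenario id, percentile size, price. */
    static final class Key {
        final int scenarioId;
        final int percentileSize;
        final double price;

        Key(int scenarioId, int percentileSize, double price) {
            this.scenarioId = scenarioId;
            this.percentileSize = percentileSize;
            this.price = price;
        }

        /** The grouping part of the key, for example "1;10". */
        String scenario() {
            return scenarioId + ";" + percentileSize;
        }
    }

    /** Sort order on the full key: scenario first, then ascending price. */
    static final Comparator<Key> SORT_ORDER =
            Comparator.<Key>comparingInt(k -> k.scenarioId)
                    .thenComparingInt(k -> k.percentileSize)
                    .thenComparingDouble(k -> k.price);

    /** Sorts on the full key, then groups on the scenario part only. */
    static Map<String, List<Double>> sortAndGroup(List<Key> keys) {
        List<Key> sorted = new ArrayList<>(keys);
        sorted.sort(SORT_ORDER);
        Map<String, List<Double>> groups = new LinkedHashMap<>();
        for (Key k : sorted) {
            groups.computeIfAbsent(k.scenario(), s -> new ArrayList<>()).add(k.price);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Key> draws = Arrays.asList(
                new Key(1, 10, 14.414516512987014),
                new Key(1, 10, 0.513775910851316),
                new Key(1, 10, 4.181165705822988));
        // One entry per scenario; the prices of each entry are in ascending order.
        sortAndGroup(draws).forEach((scenario, prices) ->
                System.out.println(scenario + " -> " + prices));
    }
}
```

With the three draws above, the program prints a single line for scenario 1;10, with the three prices in ascending order.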
I used a third and last refinement for my key: putting the percentile size in it. It allows each reduce() call to get that value in an easy and unique way. The final shape of my key is then scenario id;percentile size;call price. We can now see the reducer implementation:
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

@SuppressWarnings("deprecation")
public final class IntermediateResultSortReducer extends MapReduceBase implements Reducer<DrawKey, CustomResultWritable, DrawKey, CustomResultWritable> {
    /**
     * Prerequisite: keys are grouped by ScenarioKey and sorted by DrawKey,
     * so for each ScenarioKey the values arrive sorted by price.
     */
    @Override
    public void reduce(DrawKey key, Iterator<CustomResultWritable> values,
            OutputCollector<DrawKey, CustomResultWritable> output, Reporter reporter)
            throws IOException {
        //Collect only the s
        // Hadoop may reuse the value instance between calls to next(), so remember the previous
        // price rather than the previous object. Assumes getPrice() returns a numeric primitive.
        double previousPrice = Double.NEGATIVE_INFINITY;
        int countDown = key.getPercentileSize().get();
        while (countDown >= 0 && values.hasNext()) {
            if (countDown == 0) {
                // The countdown is over: this value is the one we are looking for
                output.collect(key, values.next());
                reporter.setStatus("Reduce ended");
                return; //Exit
            } else {
                CustomResultWritable currentResult = values.next();
                System.out.println(currentResult);
                if (currentResult.getPrice() < previousPrice) {
                    reporter.setStatus("Data are not sorted");
                    // %s for both prices: %d would throw on a floating-point value
                    System.err.println(String.format("Previous price: %s, current price: %s", previousPrice, currentResult.getPrice()));
                }
                previousPrice = currentResult.getPrice();
            }
            countDown--;
            if (!values.hasNext()) {
                String err = String.format("Reducer: Key %s has only %d values with an expected percentile of %d", key.toString(), key.getPercentileSize().get() - countDown, key.getPercentileSize().get());
                System.err.println(err);
                reporter.setStatus(err);
            }
        }
    }
}
The reduce() function reads the percentile size (let's name it p), skips the p smallest values and collects the next one. The output file contains the VAR, 0.191, and the corresponding parameters.
1;10000;4.1013116251405945E-10 252 78.12793687367298 120.0 0.05 0.2 0.19109501386036332
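The extraction rule applied by the countdown loop of the reducer can be sketched on a plain array. This is a simplified illustration, not the reducer itself: `valueAfterSkipping` is a hypothetical helper, the sort is done locally instead of by Hadoop, and returning NaN when there are too few values is an arbitrary convention chosen for the sketch.

```java
import java.util.Arrays;

class PercentileSketch {

    /**
     * Mirrors the countdown loop: skip the p smallest prices and return the next one.
     * Returns NaN when there are not enough values (convention of this sketch only).
     */
    static double valueAfterSkipping(double[] prices, int p) {
        double[] sorted = prices.clone();
        Arrays.sort(sorted); // in the real job, Hadoop delivers the values already sorted
        return (p >= 0 && p < sorted.length) ? sorted[p] : Double.NaN;
    }

    public static void main(String[] args) {
        double[] prices = {14.4, 0.5, 4.2, 9.9, 1.3};
        // Sorted: 0.5, 1.3, 4.2, 9.9, 14.4. Skipping one value leaves 1.3 as the next one.
        System.out.println(valueAfterSkipping(prices, 1)); // prints 1.3
    }
}
```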
So, like in the first implementation, I benefit from the sort feature of Hadoop. Furthermore, I use the SecondarySortPartitionner, which allows me to get all the values for one scenario in the same reduce task. That way, Hadoop can parallelize the reduce phase for each scenario by instantiating several reduce tasks.
That concludes the second implementation for VAR calculation on Hadoop and this part of the series. I used other optimizations in my code that I will explain in the next part. In the last articles of this series, I will give you some performance figures and conclude on why it is worth using Hadoop for VAR calculation.