Dans le premier article de cette série, j'ai introduit pourquoi le framework Hadoop pouvait être utilisé pour calculer la VAR et analyser les valeurs intermédiaires. Dans le second article j'ai décrit une première implémentation. Un inconvénient de cette précédente implémentation est qu'elle ne tire pas pleinement partie du pattern reduce. Je réalise le travail manuellement. Je vais désormais utiliser pleinement la fonctionnalité reduce.
Si je code simplement une classe reduce avec la signature public void reduce(DrawKey key, Iterator values, OutputCollector output, Reporter reporter)
, cette méthode sera appelée une fois par clé scenario Id;price value
une fois par tirage, car chaque tirage génère un prix différent. Les appels successifs à la méthode reduce()
sont totalement indépendants (aucune variable partagée). Ainsi, comme il n'est pas possible de savoir si 1% des valeurs a déjà été traité, on ne peut pas savoir quand extraire le percentile et en déduire la VAR de cette façon. Cependant, Hadoop fournit une façon de trier les clés selon un certain ordre et de les grouper selon un autre critère. Cette fonctionnalité va m'aider à trouver une solution. L'exemple fournit par Hadoop utilise des clés contenant deux entiers et montre comment grouper ces clés par le premier entier et les grouper selon le second. Je vous propose d'utiliser cela pour extraire le percentile. J'ai créé un nouveau programme dont je ne vais pas vous détailler l'intégralité du code mais simplement un résumé. Le programme VarSimpleOptionReduce
définit deux jobs :
IntermediateResultSortReducer
que je vais vous décrire ci-dessous. Hadoop va tirer la donnée et le reducer var extraire la VAR.La configuration correspondante est la suivante:
/**
* Configuration adapted for a binary compressed result
* @param args
* @return
*/
private JobConf buildGenerateMapperConfBinaryResult(String[] args) {
JobConf jobConf = new JobConf();
//identical...
jobConf.setMapperClass(IntermediateResultMapperBinary.class);
jobConf.setNumReduceTasks(0);
//...
return jobConf;
}
/**
* Configuration adapted for a binary compressed result
* @param args
* @return
*/
private JobConf buildGenerateReducerConfBinaryResult(String[] args) {
JobConf jobConf = new JobConf();
//...
jobConf.setNumMapTasks(0);
jobConf.setReducerClass(IntermediateResultSortReducer.class);
jobConf.setPartitionerClass(FirstPartitionner.class);
//Inputs of the reduce tasks are sorted by DrawKey
jobConf.setOutputKeyComparatorClass(DrawKey.Comparator.class);
//All keys equals according to the ScenarioKey.Comparator will be send to the same reduce class
jobConf.setOutputValueGroupingComparator(ScenarioKey.Comparator.class);
jobConf.setNumReduceTasks(1);//Use 2 if you have two scenarios
//...
}
La sortie de la map est répartie entre différentes partitions. La classe partitionner
s'exécute sur la sortie de la tâche map avant le tri. Ensuite chaque partition est traitée par une tâche reduce différente (c'est-à-dire par un nouvel appel à une fonction reduce()
). Toutes les clés pour le même scénario doivent être triées ensemble et non dans deux tâches séparées. Ainsi, l'optimisation de configuration consiste à avoir une partition et une tâche reduce par scénario.
jobConf.setPartitionerClass(FirstPartitionner.class);
Ainsi le FirstPartitionner
doit prendre en compte uniquement la clé de scénario (scenario key : scenario id et percentile size).
@SuppressWarnings("deprecation")
public class FirstPartitionner implements Partitioner<drawkey , CustomResultWritable> {
@Override
public int getPartition(DrawKey key, CustomResultWritable value, int numPartitions) {
return Math.abs((((ScenarioKey)key).hashCode()) * 127)%numPartitions;
}
@Override
public void configure(JobConf job) {}
}
DrawKey.Comparator.class
et ScenarioKey.Comparator.class
comparent des instances de DrawKey
et ScenarioKey
. Je vous donnerai plus de détails à ce propos dans un prochain paragraphe. De cette façon, la méthode reduce()
prendra ce type d'entrée (la syntaxe avec les parenthèses est purement illustrative) :
1;10;0.513775910851316 ( "252 84.31301373924966 120.0 0.05 0.2 0.513775910851316", "1;10;0.513775910851316 252 103.39569385168355 120.0 0.05 0.2 4.181165705822988", "1;10;0.513775910851316 252 123.11293496630553 120.0 0.05 0.2 14.414516512987014")
Ainsi la méthode reduce()
sera appelée une fois pour chaque clé de scénario 1;10
avec 3 prix différents triés de façon croissante. Un effet de bord de la signature du reducer est qu'il sera appellé avec une clé de tirage DrawKey
: 1;10;0.513775910851316. La valeur du prix du call correspondant 0.513775910851316 est simplement le prix le plus faible et doit tout simplement être ignoré car il n'a pas de sens d'un point de vue métier. Les valeurs réelles des prix calculés après chaque tirage 0.513775910851316, 4.181165705822988, 14.414516512987014 sont fournies dans les valeurs. C'est pourquoi la valeur du prix du call doit être dupliquée à la fois dans la clé (pour le besoin de tri) et dans la valeur (pour collecter les valeurs dans la fonction reduce()
).
J'ai utilisé un troisième et dernier raffinement pour ma clé, en stockant également la taille du percentile à l'intérieur. Cela permet à chaque appel à la fonction reduce()
d'obtenir cette valeur d'une façon simple et centralisée. La forme finale de ma clé est donc scenario id;percentile size;call price
. Nous allons pouvoir regarder l'implémentation du reducer :
@SuppressWarnings("deprecation")
public final class IntermediateResultSortReducer extends MapReduceBase implements Reducer<drawkey , CustomResultWritable, DrawKey, CustomResultWritable> {
/**
* Prerequisite: Key are grouped by ScenarioKey and sorted by DrawKey
* So we have for each ScenarioKey, values sorted by Price
*/
@Override
public void reduce(DrawKey key, Iterator<CustomResultWritable> values,
OutputCollector<drawkey , CustomResultWritable> output, Reporter reporter)
throws IOException {
//Collect only the s
CustomResultWritable previousResult = new CustomResultWritable();
CustomResultWritable currentResult = new CustomResultWritable();
int countDown = key.getPercentileSize().get();
while(countDown >= 0 && values.hasNext()) {
if(countDown == 0) {
output.collect(key, values.next());
reporter.setStatus("Reduce ended");
return;//Exit
}
else {
currentResult = values.next();
System.out.println(currentResult);
if(currentResult.getPrice() < previousResult.getPrice()) {
reporter.setStatus("Data are not sorted");
System.err.println(String.format("Previous price: %s, current price: %d", previousResult.getPrice(), currentResult.getPrice()));
}
}
countDown--;
if(!values.hasNext()) {
String err = String.format("Reducer: Key %s has only %d values with an expected percentile of %d", key.toString(), key.getPercentileSize().get()-countDown, key.getPercentileSize().get());
System.err.println(err);
reporter.setStatus(err);
}
}
}
}
La fonction reduce()
lit la taille du percentile - que je noterai p
- et collecte les p valeurs les plus faibles. Le fichier de sortie contient la VAR 0.191 et les paramètres correspondants.
1;10000;4.1013116251405945E-10 252 78.12793687367298 120.0 0.05 0.2 0.19109501386036332
Ainsi, comme pour la première implémentation, j'ai tiré bénéfice de la fonctionnalité de tri d'Hadoop. Mais en plus :
SecondarySortPartitionner
mer permettant de récupérer toutes les valeurs d'un scénario dans la même tâche reduce. De cette façon, Hadoop peut paralléliser la phase reduce pour chaque scénario en instanciant plusieurs tâches reduceCela conclut la seconde implémentation du calcul de VAR sur Hadoop et cet article de cette série. J'ai utilisé d'autres optimisations dans mon code que je vais vous expliquer dans le prochain article. Enfin dans les derniers articles de la série, je vous donnerai quelques mesures de performances et je conclurai sur l'intérêt d'utiliser Hadoop pour le calcul de la VAR.