précédant article. Les classes utilisées pour calculer la VAR sont les mêmes : OptionPricer
, Parameters
et ParametersValueGenerator
. Les classes suivantes ont été ajoutées pour les besoins de Hadoop : InputParser
(analyse (parsing) du fichier d'entrée), ScenarioKey
et DrawKey
, IntermediateResultMapper
, VarSimpleOptionDirectSearch.VarSimpleOptionDriver
a été introduite dans un but de configuration et VarSimpleOptionDirectSearch
comme lanceur.
IntermediateResultMapper
est l'implémentation de la fonction map
comme décrit dans le tout premier article. Elle est très proche de l'implémentation pour GridGain:
class IntermediateResultMapper extends MapReduceBase implements
Mapper<longwritable , Text, DrawKey, Text> {
private static Logger log = LoggerFactory.getLogger(IntermediateResultMapper.class);
private enum HadoopCounters {
SKIPPED_LINE,
COMPUTING_ERROR
}
@Override
public void map(final LongWritable key, final Text value,
OutputCollector<drawkey , Text> output,
Reporter reporter) throws IOException {
String line = value.toString();
InputStruct is = InputParser.parse(line);
if (is != null) {
Parameters params = new Parameters(
is.t, is.s0, is.k, is.r, is.sigma);
ParametersValueGenerator valueGenerator = new ParametersValueGenerator(is.historicalVolatility, 0, 0);
OptionPricer optionPricer = new OptionPricer(params);
valueGenerator.initializeReference(params);
valueGenerator.intializeRandomGenerator();
try {
int percentileSize = (int)(is.drawsNb*(1-is.varPrecision));
DrawKey dk = new DrawKey(is.id, percentileSize, 0);
for(int i = 0 ; i < is.jobDrawsNb ; i++) {
final Result result = computePriceScenario(optionPricer, valueGenerator);
dk.setValue(result.getPrice());//Optimization: set by reference to avoid object creation
output.collect(dk, new Text(result.toString()));
}
} catch (Exception e) {
reporter.incrCounter(HadoopCounters.COMPUTING_ERROR, 1);
log.error("Error computing var", e);
}
} else {
reporter.incrCounter(HadoopCounters.SKIPPED_LINE, 1);
}
}
/**
* Compute the price of the call based on a scenario
* generated by a draw
*/
private Result computePriceScenario(final OptionPricer optionPricer,
final ParametersValueGenerator valueGenerator) throws MathException {
valueGenerator.setNewGeneratedParameters(optionPricer.getParameters());
final double price = optionPricer.computeCall();
return new Result(optionPricer.getParameters(),price);
}
}
Un point notable est la signature de la méthode : public void map(final LongWritable key, final Text value, OutputCollector output, Reporter reporter)
. Le fichier d'entrée est lu comme une paire (LongWritable, Text)
. Avec un fichier standard, la clé (LongWritable
) est l'offset du début de la ligne et la valeur correspond au contenu de la ligne. Par exemple:
0 1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
45 1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
...
La seconde ligne débute au caractère 45. Cet offset fournit simplement une clé unique. Cette clé est tout simplement ignorée dans cette implémentation. La fonction map produit des paires (DrawKey, Text)
comme illustré ci-dessous.
1;10;0.513775910851316 252 84.31301373924966 120.0 0.05 0.2 0.513775910851316
...
La clé contient l'identifiant du scénario, le nombre total de tirages et le prix. Les valeurs contiennent le prix et les paramètres. Je vais expliquer un peu plus loin en quoi ce choix était important pour les optimisations à venir. Ainsi, tous mes tirages sont calculés et générés comme dans l'implémentation GridGain mais stockés directement dans un système de fichiers. Cela permet deux choses:
Je décrirai ces deux points.
La sortie de la tâche map
peut être considérée comme les job results de GridGain (cf. mon second article). J'aurais pu utiliser exactement le même algorithme avec un tri en mémoire. Cependant, je n'aurai pas tiré parti de l'implémentation map/reduce fournie par Hadoop.
L'implémentation map/reduce d'Hadoop est basée sur le tri des clés. Les tâches Map produisent des fichiers.
key1 valueA
key2 valueB
key1 valueC
...
Ces fichiers sont triés par clé. Ensuite, toutes les lignes avec la même clé sont combinées sous forme de paires (key, list of values)
.
key1 (valueA, valueC) key2 (valueB)
Ensuite, les tâches reduce sont alimentées avec de telles valeurs. La signature de la fonction reduce est reduce(DrawKey k, Iterator values, OutputCollector, Reporter reporter)
. Ainsi, le tri par clé est le coeur de l'implémentation map/reduce de Hadoop car cela permet de transformer les sorties de la phase map en entrées de la phase reduce. Je ne vais pas vous proposer immédiatement "l'implémentation" de reduce car plusieurs choix sont possibles. Avant cela, je vais discuter le choix de la clé qui est un point fondamental pour l'implémentation d'Hadoop.
Pour effectuer le calcul de la VAR, j'ai besoin d'avoir la liste de tous les prix calculés, de les trier et d'extraire le percentile : les 10% de valeurs les moins élevées. Enfin, je dois identifier la plus haute valeur du percentile (voir mon premier article pour plus de détails). Le traitement des travaux de map/reduce par Hadoop peut être réalisé simultanément sur plusieurs jeux de données. De façon à fournir un résultat unique pour chaque ensemble, Hadoop fonctionne selon les étapes suivantes:
map()
à toutes les valeursreduce()
Pour mon implémentation, je vais tirer parti du tri fourni par le mécanisme d'Hadoop. En effet, en utilisant un simple Treeset, comme dans l'implémentation avec GridGain, je suis limité par la taille de la Heap Java. Au contraire, Hadoop fournit un algorithme de tri optimisé réparti entre le disque et la mémoire. Pour vous donner un aperçu des performances d'Hadoop dans ce domaine, je ferai référence au Terabyte sort : un benchmark qui consiste à trier 1 TB de données et qu'Hadoop a gagné.
main
Ainsi, de façon à utiliser la capacité de tri de Hadoop, j'utilise la clé DrawKey
contenant scenario id;call price
. De cette façon, l'entrée de la tâche reduce contient pour chaque scénario successif les prix correspondants. A la fin du job Hadoop, je lis le fichier contenant les résultats triés à travers l'API du système de fichiers distribué. Je parcours les 1% de valeurs les plus faibles et identifie ainsi la VAR. Cela constitue ma première implémentation. Elle est définie dans Hadoop grâce au code suivant :
@SuppressWarnings("deprecation")
public class VarSimpleOptionDirectSearch {
static class VarSimpleOptionDriver extends Configured implements Tool {
private static Log log = LogFactory.getLog(VarSimpleOptionDriver.class);
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
log.error(String.format(
"Usage %s [generic options] <input /> <output>\n",
getClass().getSimpleName()));
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
// Generate the prices for each draw and sort them
JobConf generateMapConf = buildGenerateMapperConf(args);
double startTime = System.nanoTime();
JobClient.runJob(generateMapConf);
Path outputPath = new Path(args[1]);
SequenceFile.Reader[] readers = org.apache.hadoop.mapred.SequenceFileOutputFormat
.getReaders(generateMapConf, outputPath);
long id;
int percentileSize;
for (SequenceFile.Reader reader : readers) {
try {
DrawKey key = new DrawKey();
Writable value = new CustomResultWritable();
reader.next(key, value);
do {
id = key.getId().get();
percentileSize = key.getPercentileSize().get();
int cpt = 0; // Count number of draws
log.info(key.toString() + " cpt:" + cpt);
do { // For each value
if (cpt == percentileSize) {
log.info("VAR: cpt" + cpt + "\t"
+ key.toString() + "\t"
+ value.toString());
} else {
//Continue silently
}
cpt++;
} while (reader.next(key, value)
&& id == key.getId().get()
&& percentileSize == key
.getPercentileSize().get());
} while(reader.next(key, value)); // End for each (id, percentileSize)
} finally {
reader.close();
}
}
double computeTime = System.nanoTime() - startTime;
log.info("ComputeTime " + computeTime/1000000 + " (ms.)");
return 0;
}
private JobConf buildGenerateMapperConf(String[] args) {
JobConf jobConf = new JobConf();
//org.apache.hadoop.mapred.SequenceFileOutputFormat.getReaders()
//tries to read the _logs directory as part of the SequenceFile
//which leads to an error
//Putting the history in another folder bypasses the problem
jobConf.set("hadoop.job.history.user.location", "job_history");
jobConf.setJarByClass(VarSimpleOptionDirectSearch.class);
FileInputFormat.addInputPath(jobConf, new Path(args[0]));
FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
// Compute intensive task: each mapper will receive a line
jobConf.setInputFormat(NLineInputFormat.class);
jobConf.setMapperClass(IntermediateResultMapperBinary.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setNumReduceTasks(1);
// Set no limit to the number of task per JVM in order to take
// advantage of HotSpot runtime optimizations after long runs
jobConf.setNumTasksToExecutePerJvm(-1);
jobConf.setOutputKeyClass(DrawKey.class);
jobConf.setOutputValueClass(CustomResultWritable.class);
jobConf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
return jobConf;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new VarSimpleOptionDriver(), args);
System.exit(exitCode);
}
}// End VarSimpleOption class
Notez que j'ai utilisé Hadoop 0.20.2 mais avec l'ancienne API. Hadoop est lancé par la méthode main
. Une classe mère configured
est fourni pour aider à lancer le job. La méthode run()
le lance puis parcourt les résultats de façon à extraire la VAR. Je vais maintenant donner quelques explications sur la configuration.
jobConf.setMapperClass(IntermediateResultMapperBinary.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(DrawKey.class);
jobConf.setOutputValueClass(CustomResultWritable.class);
Les tâches map et reduce sont configurées de cette façon. J'utilise un reducer idempotent IdentityReducer
qui copie simplement la sortie de la map après avoir trié les valeurs et ainsi évite la phase de reduce. Les classes de la clé et de la valeur de sortie doivent également être définies. La JVM doit être informée que les valeurs reçues ne sont pas les LongWritable
et Text
par défaut. Du fait du type erasure (effacement des générics au runtime), le type de la sortie de la map ne peut pas être déterminé au runtime sans une telle configuration. jobConf.setInputFormat(NLineInputFormat.class);
configure comment découper le fichier d'entrée en différentes tâches. J'ai choisi d'utiliser la classe NLineInputFormat
de façon à lancer une tâche map par ligne. En configurant le processus hébergeant les tâches (le tasktracker) pour lancer autant de tâches que de coeurs disponibles sur le noeud, j'optimise les tâches map intenses en calcul en en exécutant simultanément une sur chaque core. Comme vous l'avez peut être noté, le format de la ligne en entrée 1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
contient à la fois :
1000
: le nombre total de tirages250
: le nombre de tirages à réaliser par cette tâcheDe cette façon, je peux définir les fichiers d'entrée de telle façon qu'un tirage soit réparti entre plusieurs tâches. Dans l'exemple suivant un tirage est divisé en 4 tâches.
1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
Chaque tâche map peut générer le nombre correspondant de tirages comme indiqué dans la ligne d'entrée. Puis le nombre total de tirages (ici 1000) est utilisé pour définir la taille du percentile à 1% (ici 0,01*1000=10). jobConf.setNumReduceTasks(1);
force tous les résultats à être triés dans un fichier unique et non dans plusieurs fichiers triés séparément. Cela est nécessaire de façon à pouvoir identifier le percentile. Comme pour notre implémentation GridGain, cela implique que pour un job la tâche de reduce n'est pas distribuée. jobConf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
configure enfin un format de sortie binaire afin de minimiser la taille.
Cela conclut les points importants de la configuration pour le job et la première implémentation. Cette-dernière est améliorable et cela sera le sujet du prochain article de cette série.