my previous article. The classes used to compute the VAR are the same: OptionPricer, Parameters and ParametersValueGenerator. The following classes were added for Hadoop purposes: InputParser (parsing of the input file), ScenarioKey and DrawKey, IntermediateResultMapper, VarSimpleOptionDirectSearch.VarSimpleOptionDriver (added for configuration purposes) and VarSimpleOptionDirectSearch as the launcher.
IntermediateResultMapper is an implementation of the map requirements as described in the first article and is very close to the GridGain implementation:
class IntermediateResultMapper extends MapReduceBase implements
Mapper<LongWritable, Text, DrawKey, Text> {
private static Logger log = LoggerFactory.getLogger(IntermediateResultMapper.class);
private enum HadoopCounters {
SKIPPED_LINE,
COMPUTING_ERROR
}
@Override
public void map(final LongWritable key, final Text value,
OutputCollector<DrawKey, Text> output,
Reporter reporter) throws IOException {
String line = value.toString();
InputStruct is = InputParser.parse(line);
if (is != null) {
Parameters params = new Parameters(
is.t, is.s0, is.k, is.r, is.sigma);
ParametersValueGenerator valueGenerator = new ParametersValueGenerator(is.historicalVolatility, 0, 0);
OptionPricer optionPricer = new OptionPricer(params);
valueGenerator.initializeReference(params);
valueGenerator.intializeRandomGenerator();
try {
int percentileSize = (int)(is.drawsNb*(1-is.varPrecision));
DrawKey dk = new DrawKey(is.id, percentileSize, 0);
for(int i = 0 ; i < is.jobDrawsNb ; i++) {
final Result result = computePriceScenario(optionPricer, valueGenerator);
dk.setValue(result.getPrice());//Optimization: set by reference to avoid object creation
output.collect(dk, new Text(result.toString()));
}
} catch (Exception e) {
reporter.incrCounter(HadoopCounters.COMPUTING_ERROR, 1);
log.error("Error computing var", e);
}
} else {
reporter.incrCounter(HadoopCounters.SKIPPED_LINE, 1);
}
}
/**
* Compute the price of the call based on a scenario
* generated by a draw
*/
private Result computePriceScenario(final OptionPricer optionPricer,
final ParametersValueGenerator valueGenerator) throws MathException {
valueGenerator.setNewGeneratedParameters(optionPricer.getParameters());
final double price = optionPricer.computeCall();
return new Result(optionPricer.getParameters(),price);
}
}
One key point is the signature of that method: public void map(final LongWritable key, final Text value, OutputCollector<DrawKey, Text> output, Reporter reporter). The input file is read as (LongWritable, Text) pairs. With a simple text file, the key (LongWritable) is the byte offset of the beginning of the line and the value is the content of the line. For example:
0 1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
45 1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
...
The second line begins at character 45. This byte offset easily provides a unique key, but it is ignored in my implementation. The map produces (DrawKey, Text) pairs as described below.
1;10;0.513775910851316 252 84.31301373924966 120.0 0.05 0.2 0.513775910851316
...
The key contains the scenario id, the size of the percentile (10 here, derived from the total number of draws) and the price value. The value contains the parameters and the price. I will explain a bit later why this choice is important for a further optimization. Thus, all my draws are computed and generated like in the GridGain implementation but stored directly in a file system. So it allows two things
I will describe these two points.
The output of the map task can be compared to the job results in the GridGain implementation (see my second article). I could have used exactly the same algorithm with an in-memory sort. However, that would not have taken advantage of the Hadoop map/reduce implementation.
The Hadoop map/reduce implementation is based on sorting keys. Map tasks produce files.
key1 valueA
key2 valueB
key1 valueC
...
These files are sorted by key. Then all the lines with the same key are combined into a pair of the form (key, list of values).
key1 (valueA, valueC)
key2 (valueB)
Then reduce tasks are fed with such pairs. The signature of the corresponding reduce function is reduce(DrawKey k, Iterator values, OutputCollector, Reporter reporter). So sorting by key is the heart of the Hadoop map/reduce implementation because it transforms the map phase output into the reduce phase input. I will not immediately give you "the" reduce implementation because several choices can be made. Before that, I will discuss the choice of the key, which is a fundamental point in the definition of the Hadoop implementation.
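To make the sort-and-group step more concrete, here is a minimal plain-Java sketch that does not use Hadoop at all. It only mimics the idea: the class name ShuffleSketch and the method sortAndGroup are made up for this illustration, and a TreeMap stands in for Hadoop's much more elaborate sort.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: mimics the sort/group step between map and reduce.
// ShuffleSketch and sortAndGroup are hypothetical names, not Hadoop API.
class ShuffleSketch {

    /** Groups (key, value) pairs by key; keys come out in sorted order. */
    static Map<String, List<String>> sortAndGroup(List<String[]> mapOutput) {
        // A TreeMap keeps its keys sorted, like the sort applied to map output
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] pair : mapOutput) {
            grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> mapOutput = new ArrayList<>();
        mapOutput.add(new String[] {"key1", "valueA"});
        mapOutput.add(new String[] {"key2", "valueB"});
        mapOutput.add(new String[] {"key1", "valueC"});
        // Each entry corresponds to what one call to reduce() would receive
        for (Map.Entry<String, List<String>> e : sortAndGroup(mapOutput).entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

Each entry of the returned map plays the role of one (key, list of values) pair handed to a reduce call.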
To perform the VAR calculation, I need the list of all computed prices, sorted, so that I can extract the percentile: the 10% lowest values. Finally I have to identify the highest value of that percentile (see my first article for more details). A map/reduce job can be applied by Hadoop to several sets of data simultaneously. In order to provide one result for each set, Hadoop processing follows these steps:
map() function to all the values;
reduce() function
For my implementation, I take advantage of the sort provided by the Hadoop process. Indeed, by using a simple TreeSet, as in the GridGain implementation, I am limited by the Java heap size. By contrast, Hadoop provides a finely tuned sort algorithm that combines in-memory and on-disk sorting. To give an idea of Hadoop's performance when sorting datasets, I refer to the Terabyte sort: a benchmark that consists in sorting 1 TB of data, and which Hadoop has won.
main program
So, in order to use Hadoop's sorting capability, I use a DrawKey containing the scenario id;call price value. That way, the input of the reduce task contains, for each successive scenario, the sorted prices of the job. At the end of the Hadoop job, I read the sorted results file through the distributed file system API. I iterate through the lowest 1% of the values and identify the VAR. This is my first implementation. It is configured in Hadoop through the following code:
@SuppressWarnings("deprecation")
public class VarSimpleOptionDirectSearch {
static class VarSimpleOptionDriver extends Configured implements Tool {
private static Log log = LogFactory.getLog(VarSimpleOptionDriver.class);
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
log.error(String.format(
"Usage %s [generic options] <input> <output>\n",
getClass().getSimpleName()));
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
// Generate the prices for each draw and sort them
JobConf generateMapConf = buildGenerateMapperConf(args);
double startTime = System.nanoTime();
JobClient.runJob(generateMapConf);
Path outputPath = new Path(args[1]);
SequenceFile.Reader[] readers = org.apache.hadoop.mapred.SequenceFileOutputFormat
.getReaders(generateMapConf, outputPath);
long id;
int percentileSize;
for (SequenceFile.Reader reader : readers) {
try {
DrawKey key = new DrawKey();
Writable value = new CustomResultWritable();
reader.next(key, value);
do {
id = key.getId().get();
percentileSize = key.getPercentileSize().get();
int cpt = 0; // Count number of draws
log.info(key.toString() + " cpt:" + cpt);
do { // For each value
if (cpt == percentileSize) {
log.info("VAR: cpt" + cpt + "\t"
+ key.toString() + "\t"
+ value.toString());
} else {
//Continue silently
}
cpt++;
} while (reader.next(key, value)
&& id == key.getId().get()
&& percentileSize == key
.getPercentileSize().get());
} while(reader.next(key, value)); // End for each (id, percentileSize)
} finally {
reader.close();
}
}
double computeTime = System.nanoTime() - startTime;
log.info("ComputeTime " + computeTime/1000000 + " (ms.)");
return 0;
}
private JobConf buildGenerateMapperConf(String[] args) {
JobConf jobConf = new JobConf();
//org.apache.hadoop.mapred.SequenceFileOutputFormat.getReaders()
//tries to read the _logs directory as part of the SequenceFile
//which leads to an error
//Putting the history in another folder bypasses the problem
jobConf.set("hadoop.job.history.user.location", "job_history");
jobConf.setJarByClass(VarSimpleOptionDirectSearch.class);
FileInputFormat.addInputPath(jobConf, new Path(args[0]));
FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
// Compute intensive task: each mapper will receive a line
jobConf.setInputFormat(NLineInputFormat.class);
jobConf.setMapperClass(IntermediateResultMapperBinary.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setNumReduceTasks(1);
// Set no limit to the number of task per JVM in order to take
// advantage of HotSpot runtime optimizations after long runs
jobConf.setNumTasksToExecutePerJvm(-1);
jobConf.setOutputKeyClass(DrawKey.class);
jobConf.setOutputValueClass(CustomResultWritable.class);
jobConf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
return jobConf;
}
} // End VarSimpleOptionDriver class
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new VarSimpleOptionDriver(), args);
System.exit(exitCode);
}
}// End VarSimpleOption class
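The scan over the sorted results can also be pictured without Hadoop. The following plain-Java sketch is only an illustration under assumptions: Draw is a hypothetical stand-in for DrawKey, the ordering (scenario id first, then ascending price) is assumed, and the sketch returns the last price inside the percentile, following the definition of the VAR given earlier. The exact index used to pick the value is therefore an assumption too.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: finds one VAR per scenario from keys sorted by (id, price).
class SortedDrawScan {

    /** Hypothetical stand-in for DrawKey: scenario id, percentile size, price. */
    static final class Draw {
        final long id;
        final int percentileSize;
        final double price;

        Draw(long id, int percentileSize, double price) {
            this.id = id;
            this.percentileSize = percentileSize;
            this.price = price;
        }
    }

    // Assumed ordering: by scenario id, then by ascending price
    static final Comparator<Draw> ORDER =
            Comparator.<Draw>comparingLong(d -> d.id).thenComparingDouble(d -> d.price);

    /** Returns, per scenario id, the highest price among the percentileSize lowest ones. */
    static Map<Long, Double> findVar(List<Draw> draws) {
        List<Draw> sorted = new ArrayList<>(draws);
        sorted.sort(ORDER); // in the real job this sort is done by Hadoop
        Map<Long, Double> varById = new LinkedHashMap<>();
        long currentId = 0;
        int count = 0;
        boolean first = true;
        for (Draw d : sorted) {
            if (first || d.id != currentId) { // a new scenario starts here
                currentId = d.id;
                count = 0;
                first = false;
            }
            count++;
            if (count == d.percentileSize) {
                varById.put(d.id, d.price);
            }
        }
        return varById;
    }

    public static void main(String[] args) {
        List<Draw> draws = new ArrayList<>();
        for (double p : new double[] {5.0, 1.0, 4.0, 2.0, 3.0}) {
            draws.add(new Draw(1L, 2, p));
        }
        draws.add(new Draw(2L, 1, 9.0));
        draws.add(new Draw(2L, 1, 7.0));
        System.out.println(findVar(draws));
    }
}
```

Because the keys arrive already sorted, a single sequential pass with a counter is enough and no price has to be kept in memory.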
Please note that I use Hadoop 0.20.2 with the old-style API. Hadoop is launched by the main method. A class based on Configured (and implementing Tool) is provided to help launch the job. The run() method configures the job, launches it and iterates over the results in order to get the VAR value. I will now give some explanations about the job configuration.
jobConf.setMapperClass(IntermediateResultMapperBinary.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(DrawKey.class);
jobConf.setOutputValueClass(CustomResultWritable.class);
Map and reduce tasks are configured that way. I use an IdentityReducer that just copies the map output after sorting, so no computation is done in the reduce phase. Output key and value classes have to be defined too: Hadoop must be told that the emitted keys and values are not the default LongWritable and Text ones. Due to generic type erasure, the type of the map output cannot be inferred at runtime without such configuration.
jobConf.setInputFormat(NLineInputFormat.class); configures how to split the input file between tasks. I chose NLineInputFormat in order to launch one map task for each line. By configuring the tasktracker to launch as many tasks as there are available cores on the node, I optimize the compute-intensive map tasks by executing one map task per core. As you might have noticed, the input line 1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250 contains both:
1000: the total number of draws
250: the number of draws for that task
That way, I can define input files so that the draws of one scenario are split between several map tasks. In the following example, the 1000 draws are divided into 4 tasks.
1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
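As an illustration of how such a line could be interpreted, here is a small plain-Java sketch. It is not the actual InputParser: the class InputLineSketch, the Input holder and the field positions (id first, then total draws, VAR precision and draws per task in the last three positions) are assumptions read off the example line. The percentile size uses the same expression as the mapper shown earlier.

```java
// Illustration only: a hypothetical parser for the ';'-separated input line.
class InputLineSketch {

    /** Hypothetical stand-in for InputStruct, reduced to the fields used here. */
    static final class Input {
        final long id;
        final int drawsNb;         // total number of draws for the scenario
        final double varPrecision; // for example 0.99
        final int jobDrawsNb;      // draws to generate in this map task

        Input(long id, int drawsNb, double varPrecision, int jobDrawsNb) {
            this.id = id;
            this.drawsNb = drawsNb;
            this.varPrecision = varPrecision;
            this.jobDrawsNb = jobDrawsNb;
        }
    }

    /** Returns null for a malformed line, so that the caller can skip it. */
    static Input parse(String line) {
        String[] f = line.trim().split(";");
        if (f.length != 10) {
            return null;
        }
        try {
            // Assumed positions: 0 = id, 7 = total draws, 8 = precision, 9 = draws per task
            return new Input(Long.parseLong(f[0]), Integer.parseInt(f[7]),
                    Double.parseDouble(f[8]), Integer.parseInt(f[9]));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Same expression as in the mapper: number of draws inside the percentile. */
    static int percentileSize(Input in) {
        return (int) (in.drawsNb * (1 - in.varPrecision));
    }

    /** Number of identical input lines needed to cover all the draws. */
    static int linesNeeded(Input in) {
        return (in.drawsNb + in.jobDrawsNb - 1) / in.jobDrawsNb;
    }

    public static void main(String[] args) {
        Input in = parse("1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250");
        System.out.println("percentile size: " + percentileSize(in));
        System.out.println("input lines needed: " + linesNeeded(in));
    }
}
```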
Each map task generates the number of draws given in its input line. The total number of draws (here 1000) is used to define the size of the percentile at 1% (here 0.01*1000=10).
jobConf.setNumReduceTasks(1); forces all results to be sorted together in one single file and not in several files sorted separately. This is required in order to identify the percentile. As with the GridGain implementation, it implies that for one job the reduce phase is not distributed.
jobConf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class); configures a binary output format to reduce the size of the output.
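The need for a single reduce task can be illustrated with a short plain-Java sketch. It is a simplification under assumptions: the raw price is used as the key (the real job uses DrawKey, whose hashCode is not shown here), and the class and method names are made up. The partition formula is the one used by Hadoop's default HashPartitioner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration only: why several reducers give several separately sorted files.
class PartitionSketch {

    /** Same formula as Hadoop's default HashPartitioner. */
    static int partition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    /** Splits the prices between reducers, then sorts each reducer's share. */
    static List<List<Double>> reduceOutputs(List<Double> prices, int numReduceTasks) {
        List<List<Double>> outputs = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            outputs.add(new ArrayList<>());
        }
        for (Double p : prices) {
            outputs.get(partition(p, numReduceTasks)).add(p);
        }
        for (List<Double> output : outputs) {
            Collections.sort(output); // each output file is sorted on its own
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Double> prices = Arrays.asList(5.5, 1.25, 4.0, 2.75, 3.0);
        // One reducer: a single, globally sorted list
        System.out.println(reduceOutputs(prices, 1));
        // Two reducers: two lists, each sorted, but no global order across them
        System.out.println(reduceOutputs(prices, 2));
    }
}
```

With one reducer the lowest prices are simply the first entries of the single output; with several reducers they may be scattered across outputs, so the percentile could not be read off directly.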
That concludes the most important configuration properties for the job and the first implementation. This first implementation can still be improved, which will be the subject of the next part of this series.