previous article I noticed that object instantiation was a time-consuming phase). The serialization format is specialized and optimized for Hadoop's requirements. For example, it provides variable-length values out of the box with the VLongWritable class. In order to preserve the complete structure of my result, which combines double and int fields, I need to define a custom class that implements the Writable interface so that it fits the map() and reduce() signatures. The following CustomResultWritable implementation shows how to use the Hadoop serialization mechanism.
public class CustomResultWritable implements Writable {
private static final String SEPARATOR = "\t";
/** Time in days before the maturity */
private int t;
/** Spot (actual price) of the underlying */
private double s0;
/** Strike (target price) of the underlying */
private double k;
/** Risk free interest rate */
private double r;
/** Implicit volatility of the Equity */
private double sigma;
/** Computed prices with those parameters */
private double price;
//Getter and setters
/** Default constructor to allow setting by reference */
public CustomResultWritable() {}
public CustomResultWritable(int t, double s0, double k, double r, double sigma,
double price) {
super();
this.t = t;
this.s0 = s0;
this.k = k;
this.r = r;
this.sigma = sigma;
this.price = price;
}
public void set(int t, double s0, double k, double r, double sigma,
double price) {
this.t = t;
this.s0 = s0;
this.k = k;
this.r = r;
this.sigma = sigma;
this.price = price;
}
@Override
public void readFields(DataInput input) throws IOException {
this.t = input.readInt();
this.s0 = input.readDouble();
this.k = input.readDouble();
this.r = input.readDouble();
this.sigma = input.readDouble();
this.price = input.readDouble();
}
@Override
public void write(DataOutput output) throws IOException {
output.writeInt(this.t);
output.writeDouble(this.s0);
output.writeDouble(this.k);
output.writeDouble(this.r);
output.writeDouble(this.sigma);
output.writeDouble(this.price);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(t).append(SEPARATOR).append(s0).append(SEPARATOR)
.append(k).append(SEPARATOR).append(r).append(SEPARATOR)
.append(sigma).append(SEPARATOR).append(price);
return builder.toString();
}
}
The toString() method provides a way to write the object in text form, which is required for a text output format. Note that Hadoop provides predefined types that wrap Java primitives and String, as well as a more generic mechanism, ObjectWritable, that allows combining them. However, since this mechanism is more generic and quite complex, I wrote a custom writable type directly.
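To make the write()/readFields() contract more concrete, here is a minimal round-trip sketch that runs without Hadoop. It is only an illustration under assumptions: SimpleWritable is a hypothetical stand-in for Hadoop's Writable interface (same two methods), MiniResult is a reduced version of the result type with one int and one double, and the serialize() and deserialize() helpers are mine, not part of the framework.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class WritableRoundTrip {

    /** Hypothetical stand-in for org.apache.hadoop.io.Writable (same two methods). */
    interface SimpleWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    /** Reduced result type: one int field and one double field. */
    static final class MiniResult implements SimpleWritable {
        int t;
        double price;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(t);
            out.writeDouble(price);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // Fields must be read in exactly the order they were written.
            t = in.readInt();
            price = in.readDouble();
        }
    }

    /** Illustrative helper: serializes a writable into a byte array. */
    static byte[] serialize(SimpleWritable w) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            w.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Illustrative helper: refills an existing instance from serialized bytes. */
    static void deserialize(SimpleWritable target, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            target.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MiniResult original = new MiniResult();
        original.t = 252;
        original.price = 12.5;

        byte[] data = serialize(original);

        // The same instance can be refilled for every record, which avoids
        // allocating a new object each time.
        MiniResult reused = new MiniResult();
        deserialize(reused, data);
        System.out.println(data.length + " bytes, t=" + reused.t + ", price=" + reused.price);
    }
}
```

With fixed-width primitives the record always occupies 4 bytes for the int plus 8 bytes for the double, whatever the values are.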
Hadoop also allows custom code to optimize comparison. The default implementation of my key class is shown below. Before I show you the code, I should point out that I have a hierarchy of keys: DrawKey extends ScenarioKey. I have explained in my second implementation where they are used and why I have put the percentile size in DrawKey. I give you the ScenarioKey implementation; the DrawKey one is very close to it.
public class ScenarioKey implements WritableComparable<ScenarioKey> {
// For toString() implementation
protected static final String SEPARATOR = ";";
/**
* Identifier of the scenario (ParametersValueGeneratorConfiguration)
*/
private VLongWritable id;
/**
* Size of the percentile = totalNumberOfDraws * varPrecision
*/
private VIntWritable percentileSize;
public ScenarioKey() {
set(new VLongWritable(), new VIntWritable());
}
public ScenarioKey(long id, int percentileSize) {
super();
set(new VLongWritable(id), new VIntWritable(percentileSize));
}
public ScenarioKey(VLongWritable id, VIntWritable percentileSize) {
super();
set(id, percentileSize);
}
public void set(VLongWritable id, VIntWritable percentileSize) {
this.id = id;
this.percentileSize = percentileSize;
}
public VLongWritable getId() {
return id;
}
public VIntWritable getPercentileSize() {
return percentileSize;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
result = prime * result
+ ((percentileSize == null) ? 0 : percentileSize.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ScenarioKey other = (ScenarioKey) obj;
if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;
if (percentileSize == null) {
if (other.percentileSize != null)
return false;
} else if (!percentileSize.equals(other.percentileSize))
return false;
return true;
}
@Override
public void readFields(DataInput in) throws IOException {
id.readFields(in);
percentileSize.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
id.write(out);
percentileSize.write(out);
}
@Override
public int compareTo(ScenarioKey o) {
return ScenarioKey.compare(this, o);
}
/**
* Compares two scenario keys.
* This static method is needed to compare instances of two ScenarioKey
* subclasses, because a cast does not allow calling an overridden method.
* @param k1
* @param k2
* @return
*/
public static int compare(ScenarioKey k1, ScenarioKey k2) {
if (k1 == null || k2 == null) {
throw new NullPointerException();
}
int cmp = k1.id.compareTo(k2.id);
if (cmp != 0) {
return cmp;
} else {
return k1.percentileSize.compareTo(k2.percentileSize);
}
}
@Override
public String toString() {
return this.id + SEPARATOR + this.percentileSize;
}
}
The Writable implementation is close to that of CustomResultWritable, with one notable difference: the fields are not Java primitives but VLongWritable and VIntWritable instances, which minimizes the quantity of written data. On top of the Writable methods, WritableComparable only requires implementing the compareTo() method. The implementation reads and compares only the fields it needs.
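To give an idea of why variable-length fields reduce the amount of written data, the sketch below computes how many bytes a long would occupy under a zero-compressed scheme: one byte for small values, otherwise one length byte followed by only the significant bytes. This is my reading of the format used by VLongWritable, so treat the thresholds (-112 and 127) as an assumption to check against the WritableUtils documentation. The class and method names are illustrative.

```java
class VLongSizeSketch {

    /**
     * Number of bytes a value would take under the assumed zero-compressed
     * scheme: 1 byte for values in [-112, 127], otherwise 1 length byte
     * plus the significant bytes of the magnitude.
     */
    static int encodedSize(long value) {
        if (value >= -112 && value <= 127) {
            return 1;
        }
        // Negative values are stored as their one's complement,
        // which is always non-negative.
        long magnitude = value < 0 ? ~value : value;
        int dataBytes = 0;
        while (magnitude != 0) {
            magnitude >>>= 8;
            dataBytes++;
        }
        return 1 + dataBytes;
    }

    public static void main(String[] args) {
        long[] samples = {0L, 127L, 128L, 1_000_000L, Long.MAX_VALUE};
        for (long sample : samples) {
            System.out.println(sample + " -> " + encodedSize(sample)
                    + " byte(s), versus " + Long.BYTES + " for a fixed-width long");
        }
    }
}
```

Under these assumptions a scenario id of 1,000,000 takes 4 bytes instead of 8, while very large values cost one byte more than a plain long. The gain therefore depends on ids staying small.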
Because sorting and comparing data is one of the most important tasks of Hadoop (see the Map/Reduce pattern description in the first article), an optimization is available: defining a RawComparator. This class directly compares the byte representations of the objects, which avoids deserializing all the data before sorting. In practice, I decode the scenario ids first and return immediately if they differ. I implemented it as a nested class of ScenarioKey.
static final class Comparator extends WritableComparator {
protected Comparator() {
super(ScenarioKey.class, true);
}
@SuppressWarnings("unchecked")
@Override
public final int compare(WritableComparable w1, WritableComparable w2) {
ScenarioKey d1 = (ScenarioKey)w1;
ScenarioKey d2 = (ScenarioKey)w2;
int cmp = ScenarioKey.compare(d1, d2);
return cmp;
}
@Override
public final int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
int l2) {
// s1, s2 = offset of the first byte of each serialized key
// l1, l2 = length in bytes of each serialized key
long thisId;
long thatId;
try {
thisId = readVLong(b1, s1);
}
catch(IOException ioex) {
throw new IllegalArgumentException("First VLong is invalid", ioex);
}
try {
thatId = readVLong(b2, s2);
}
catch(IOException ioex) {
throw new IllegalArgumentException("Second VLong is invalid", ioex);
}
int cmp = (thisId < thatId ? -1
: (thisId == thatId ? 0 : 1));
if (cmp == 0) {
int idL1 = WritableUtils.decodeVIntSize(b1[s1]);
int idL2 = WritableUtils.decodeVIntSize(b2[s2]);
//PercentileSize
int thisPs = 0;
int thatPs = 0;
try {
thisPs = readVInt(b1, s1+idL1);
}
catch(IOException ioex) {
throw new IllegalArgumentException("First VInt is invalid", ioex);
}
try {
thatPs = readVInt(b2, s2+idL2);
}
catch(IOException ioex) {
throw new IllegalArgumentException("Second VInt is invalid", ioex);
}
cmp = (thisPs < thatPs ? -1
: (thisPs == thatPs ? 0 : 1));
}
return cmp;
}//End compare
}//End Comparator
This comparator is registered through this static block:
static {
WritableComparator.define(ScenarioKey.class, new ScenarioKey.Comparator());
}
The Hadoop framework provides a helper class, WritableUtils, to help decode byte representations, but the resulting code remains very technical.
That concludes the code of the implementations I used with Hadoop. I will give you performance figures in the last part of this series. Before that, in the next part, I will show how Hadoop can be used for Business Intelligence too.
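The idea behind the raw comparison is easier to see once the variable-length decoding is taken out. The following sketch is a simplification and not the comparator above: it assumes a fixed-width layout (8 bytes of id followed by 4 bytes of percentile size, big-endian) and uses only java.nio, and the names RawCompareSketch, serialize() and compareRaw() are hypothetical.

```java
import java.nio.ByteBuffer;

class RawCompareSketch {

    /** Assumed layout: 8-byte id followed by 4-byte percentile size, big-endian. */
    static byte[] serialize(long id, int percentileSize) {
        return ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
                .putLong(id)
                .putInt(percentileSize)
                .array();
    }

    /**
     * Compares two serialized keys in place.
     * s1 and s2 are the offsets of the first byte of each key in its buffer.
     */
    static int compareRaw(byte[] b1, int s1, byte[] b2, int s2) {
        // Decode only the id first and stop as soon as the ids differ.
        long id1 = ByteBuffer.wrap(b1).getLong(s1);
        long id2 = ByteBuffer.wrap(b2).getLong(s2);
        int cmp = Long.compare(id1, id2);
        if (cmp != 0) {
            return cmp;
        }
        // Same id: the percentile size sits right after the 8 bytes of the id.
        int ps1 = ByteBuffer.wrap(b1).getInt(s1 + Long.BYTES);
        int ps2 = ByteBuffer.wrap(b2).getInt(s2 + Long.BYTES);
        return Integer.compare(ps1, ps2);
    }

    public static void main(String[] args) {
        byte[] first = serialize(1L, 50);
        byte[] second = serialize(1L, 60);
        // Same id, smaller percentile size: the first key sorts before the second.
        System.out.println(compareRaw(first, 0, second, 0) < 0);
    }
}
```

With variable-length fields the only extra step is finding where the second field starts, which is what the decodeVIntSize() calls do in the comparator above.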