précédent article j'avais noté que l'instanciation était une phase très coûteuse). Le format de sérialisation est spécialisé et optimisé pour les besoins d'Hadoop. Il fournit par exemple "out of the box" un stockage de longueur variable avec la classe LongWritable
. De façon à préserver la structure complète de mon résultat (result
) qui est une combinaison de champs double
et int
, j'ai besoin de définir une classe personnalisée qui implémente l'interface Writable
de façon à correspondre aux signatures des méthodes map()
et reduce()
. Cette implémentation CustomResultWritable
montre comment utiliser le mécanisme de sérialisation de Hadoop.
public class CustomResultWritable implements Writable {
private static String SEPARATOR = "\t";
/** Time in days before the maturity */
private int t;
/** Spot (actual price) of the underlying */
private double s0;
/** Strike (target price) of the underlying */
private double k;
/** Risk free interest rate */
private double r;
/** Implicit volatility of the Equity */
private double sigma;
/** Computed prices with those parameters */
private double price;
//Getter and setters
/** Default constructor to allow setting by reference */
public CustomResultWritable() {}
public CustomResultWritable(int t, double s0, double k, double r, double sigma,
double price) {
super();
this.t = t;
this.s0 = s0;
this.k = k;
this.r = r;
this.sigma = sigma;
this.price = price;
}
public void set(int t, double s0, double k, double r, double sigma,
double price) {
this.t = t;
this.s0 = s0;
this.k = k;
this.r = r;
this.sigma = sigma;
this.price = price;
}
@Override
public void readFields(DataInput input) throws IOException {
this.t = input.readInt();
this.s0 = input.readDouble();
this.k = input.readDouble();
this.r = input.readDouble();
this.sigma = input.readDouble();
this.price = input.readDouble();
}
@Override
public void write(DataOutput output) throws IOException {
output.writeInt(this.t);
output.writeDouble(this.s0);
output.writeDouble(this.k);
output.writeDouble(this.r);
output.writeDouble(this.sigma);
output.writeDouble(this.price);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(t).append(SEPARATOR).append(s0).append(SEPARATOR)
.append(k).append(SEPARATOR).append(r).append(SEPARATOR)
.append(sigma).append(SEPARATOR).append(price);
return builder.toString();
}
}
La méthode toString()
fournit un moyen d'écriture au format texte ce qui est nécessaire pour un format de sortie texte (text output format). Vous pouvez noter qu'Hadoop fournit des types prédéfinis pour encapsuler les types Java primitifs, les String
ainsi qu'un mécanisme plus générique ObjectWritable
qui permet de les combiner. Cependant, comme ce mécanisme est plus générique et un peu complexe, j'ai écrit directement un type writable personnalisé.
Au niveau de la comparaison, Hadoop permet également des développements spécifiques pour en améliorer les performances. Je vais donc vous décrire l'implémentation de ma clé. Mais avant de détailler le code, j'ai besoin de préciser que j'ai une hiérarchie de clés : ScenarioKey
extend DrawKey
. J'ai expliqué dans ma seconde implémentation où elles sont utilisées et pourquoi j'ai placé la taille du percentile dans la DrawKey
. Je vous présente l'implémentation de ScenarioKey
celle de DrawKey
est très proche.
public class ScenarioKey implements WritableComparable<scenariokey> {
// For toString() implementation
protected static final String SEPARATOR = ";";
/**
* Identifer the Scenario (ParametersValueGeneratorConfiguration)
*/
private VLongWritable id;
/**
* Size of the percentile = totalNumberOfDraws * varPrecision
*/
private VIntWritable percentileSize;
public ScenarioKey() {
set(new VLongWritable(), new VIntWritable());
}
public ScenarioKey(long id, int percentileSize) {
super();
set(new VLongWritable(id), new VIntWritable(percentileSize));
}
public ScenarioKey(VLongWritable id, VIntWritable percentileSize) {
super();
set(id, percentileSize);
}
public void set(VLongWritable id, VIntWritable percentileSize) {
this.id = id;
this.percentileSize = percentileSize;
}
public VLongWritable getId() {
return id;
}
public VIntWritable getPercentileSize() {
return percentileSize;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
result = prime * result
+ ((percentileSize == null) ? 0 : percentileSize.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ScenarioKey other = (ScenarioKey) obj;
if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;
if (percentileSize == null) {
if (other.percentileSize != null)
return false;
} else if (!percentileSize.equals(other.percentileSize))
return false;
return true;
}
@Override
public void readFields(DataInput in) throws IOException {
id.readFields(in);
percentileSize.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
id.write(out);
percentileSize.write(out);
}
@Override
public int compareTo(ScenarioKey o) {
return ScenarioKey.compare(this, o);
}
/**
* Compares two scenario keys
* This method is required for being able to compare two subclass
* of Scenario key because cast don't allow to refer to an overriden method
* @param k1
* @param k2
* @return
*/
public static int compare(ScenarioKey k1, ScenarioKey k2) {
if (k1 == null || k2 == null) {
throw new NullPointerException();
}
int cmp = k1.id.compareTo(k2.id);
if (cmp != 0) {
return cmp;
} else {
return k1.percentileSize.compareTo(k2.percentileSize);
}
}
@Override
public String toString() {
return this.id + SEPARATOR + this.percentileSize;
}
}
Cette implémentation de Writable
est très proche de CustomResultWritable
avec une différence notable: les champs ne sont pas des types primitifs Java mais des VLongWritable
. Cela permet de minimiser la quantité de données écrites WritableComparable
nécessite simplement d'implémenter la méthode compareTo()
. L'implémentation lit et compare seulement les champs nécessaires pour ce faire.
Parce que trier et comparer est l'une des tâches les plus importantes d'Hadoop (voir l'explication du pattern Map/Reduce dans le premier article), une possibilité d'optimisation est fournie en définissant un RawComparator
. Cette classe compare directement les octets représentant les objets évitant ainsi de désérialiser toutes les données avant de les trier. En pratique, je commence par désérialiser mon identifiant de scénario et je renvoie la réponse immédiatement s'ils sont différents. Je l'ai implémenté comme une classe interne de ScenarioKey
.
static final class Comparator extends WritableComparator {
protected Comparator() {
super(ScenarioKey.class, true);
}
@SuppressWarnings("unchecked")
@Override
public final int compare(WritableComparable w1, WritableComparable w2) {
ScenarioKey d1 = (ScenarioKey)w1;
ScenarioKey d2 = (ScenarioKey)w2;
int cmp = ScenarioKey.compare(d1, d2);
return cmp;
}
@Override
public final int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
int l2) {
// si = index of first relevant byte
// li = total length of the byte array to compare
long thisId;
long thatId;
try {
thisId = readVLong(b1, s1);
}
catch(IOException ioex) {
throw new IllegalArgumentException("First VLong is invalid", ioex);
}
try {
thatId = readVLong(b2, s2);
}
catch(IOException ioex) {
throw new IllegalArgumentException("Second VLong is invalid", ioex);
}
int cmp = (thisId < thatId ? -1
: (thisId == thatId ? 0 : 1));
if (cmp == 0) {
int idL1 = WritableUtils.decodeVIntSize(b1[s1]);
int idL2 = WritableUtils.decodeVIntSize(b2[s2]);
//PercentileSize
int thisPs = 0;
int thatPs = 0;
try {
thisPs = readVInt(b1, s1+idL1);
}
catch(IOException ioex) {
throw new IllegalArgumentException("First VInt is invalid", ioex);
}
try {
thatPs = readVInt(b2, s2+idL2);
}
catch(IOException ioex) {
throw new IllegalArgumentException("Second VInt is invalid", ioex);
}
cmp = (thisPs < thatPs ? -1
: (thisPs == thatPs ? 0 : 1));
}
return cmp;
}//End compare
}//End Comparator
Ce comparateur est enregistré par ce bloc statique :
static {
WritableComparator.define(ScenarioKey.class, new ScenarioKey.Comparator());
}
Le framework Hadoop fournit une classe d'aide : WritableUtils
pour aider à décoder la représentation en octets mais le résultat reste très technique.
Cela conclut le code d'implémentation que j'ai utilisé avec Hadoop. Je vous donnerai des chiffres de performance dans le dernier article de cette série. Mais, avant cela, je vais vous montrer dans le prochain article comment Hadoop peut également être utilisé pour réaliser de l'analyse décisionnelle sur les données intermédiaires.