Dans la partie précédente de cet article, nous avons présenté la solution "web messaging" Diffusion de Push Technology, et comment celle-ci se proposait de régler la question du push de messages vers des clients web.
Afin de tester ses possibilités, nous avons réalisé un « proof of concept ». Ce POC vise à agréger des informations de positionnement provenant de véhicules (latitude, longitude, vitesse) au travers d’un service web. Ces informations sont ensuite restituées en "temps réel" sur un navigateur, via un fond de carte Google Maps.
Avant d'aborder les détails techniques concernant l'implémentation de Diffusion, voici ce que nous avons pu obtenir au travers d'une vidéo...
Le schéma suivant présente l’architecture visée :
Cette architecture se compose :
Le TopicMessage (ie. le message envoyé au navigateur), sera de la forme suivante :
Afin de réaliser cette chaine, Il a été nécessaire d'implémenter deux classes Diffusion, l'EventPublisher et l'InternalPublisher.
L'EventPublisher a à sa charge la diffusion d'un message sur un Topic. Il doit implémenter l’interface EventPublisherListener présente dans l’API Java Diffusion. Cette interface offre des méthodes permettant d’échanger directement avec le serveur Diffusion.
Pour fonctionner, cet EventPublisher doit en premier lieu se connecter au serveur Diffusion lors de son instanciation. Une connexion s’obtient grâce à l’objet EventPublisherConnection.
import com.pushtechnology.diffusion.api.evpub.*;
import com.pushtechnology.diffusion.api.message.*;
public class EventPublisher implements EventPublisherListener {
public EventPublisher() throws APIException {
connection = new EventPublisherConnection("localhost", 3098, this);
connection.getServerDetails().setOutputBufferSize(4096);
connection.connect();
}
public void sendMessage(SensorMessage sensorMessage) {
TopicMessage topicMessage = connection.createDeltaMessage(TOPIC);
populateTopicMessage(topicMessage, sensorMessage);
connection.send(topicMessage);
}
}
La méthode publique sendMessage() permet d'envoyer un message vers l'InternalPublisher (voir ci-après) pour être finalement diffusé auprès de tous les clients web.
Remarque : la classe SensorMessage est un POJO contenant les données qui seront transmises à tous les clients abonnés au Topic.
L'InternalPublisher a à sa charge la diffusion d'un message auprès de tous les clients web connectés. Le TopicMessage, envoyé précédemment, est finalement reçu par l’InternalPublisher en charge du Topic. Cette InternalPublisher a été préalablement déployé sur le serveur Diffusion (voir ci-après). Il doit étendre la classe Publisher.
La première action est de récupérer une ou plusieurs références de Topic dont notre Publisher aura la charge. Ceci est réalisé dans la méthode initialLoad(), méthode appelée par le serveur Diffusion au chargement du Publisher.
Il est possible d’implémenter quelques méthodes bien utiles, comme par exemple messageFromEventPublisher(). Elle est appelée lorsqu’un nouveau message et publié par un EventPublisher. Une façon simple de traiter un tel message peut être de le forwarder directement vers les clients : c'est ce qui est fait ici.
La méthode subscription() est appelée lorsqu’un nouveau client se connecte. Il est alors possible de lui envoyer un message de chargement de contexte (encore appelé LoadMessage).
import com.pushtechnology.diffusion.api.message.*;
import com.pushtechnology.diffusion.api.publisher.*;
import com.pushtechnology.diffusion.api.threads.*;
import com.pushtechnology.diffusion.api.topic.*;
public class RTFPublisher extends Publisher {
@Override
protected void initialLoad()
throws APIException {
topic = getTopic(TOPIC);
if (topic == null) topic = addTopic(TOPIC);
}
@Override
protected void publisherStopping()
throws APIException { ... }
@Override
protected void messageFromEventPublisher(EventConnection eventConnection, TopicMessage eventMessage)
throws APIException {
topic.publishMessage(clientMessage);
}
@Override
protected void subscription(Client client, Topic topic, boolean loaded)
throws APIException {
TopicMessage message = topic.createLoadMessage();
message.putRecords(records.toArray(new Record[0]));
client.send(message);
}
}
Afin de déployer l’InternalPublisher au sein du serveur Diffusion, nous devons tout d'abord packager les différentes classes Java au sein d'un Jar. L’emplacement de ce Jar doit ensuite être déclaré dans le fichier de configuration Diffusion "etc/diffusion.properties", via attribut "usr.lib".
# Class loader section
usr.lib=../user
L’InternalPublisher doit lui aussi être déclaré, dans le fichier de configuration "etc/publishers.properties" cette fois.
publishers=RTF
# RTF Publisher
publisher.RTF.name=RealTimeFlow
publisher.RTF.topics=RTFTopic
publisher.RTF.class=com.octo.rtf.publisher.RTFPublisher
Il faut alors (re)lancer le serveur Diffusion pour prendre en compte les modifications.
Les logs écrits au lancement doivent notifier le lancement du Publisher et éventuellement la création de Topics :
...
2011-05-23 17:39:28.878 : Diffusion : INFO : Starting Publisher 'RealTimeFlow'
2011-05-23 17:39:28.878 : Diffusion : INFO : Registered Topic 'RTFTopic'
2011-05-23 17:39:28.881 : Diffusion : INFO : Started Publisher 'RealTimeFlow'
...
Le client web est une simple page HTML embarquant un peu de traitement JavaScript. Les APIs jQuery, Google Maps et Diffusion sont utilisées afin de restituer l’ensemble des informations sur fond de carte.
Afin de récupérer les messages poussés depuis le Publisher Diffusion, il est nécessaire d'utiliser l’API JavaScript fournie. Le client doit avant tout se connecter au serveur via l’objet DiffusionClient :
DiffusionClient.connect( {
debug : debugValue,
topic: "RTFTopic",
onDataFunction: onDataEvent
} );
L'API se charge ensuite d'initier et de maintenir une connexion, via Comet (XmlHttpRequest, Flash socket ou Silverlight socket) ou WebSocket.
Lorsqu’un message est publié sur un Topic puis reçu par le client, la fonction javascript spécifiée dans le champ onDataFunction (ici, onDataEvent) est appelée, avec en paramètre un objet WebClientMessage. Il est possible de connaitre la nature de ce message (LoadMessage / DeltaMessage) en utilisant la méthode isInitialTopicLoad() de cet objet.
On peut alors parcourir l’ensemble des records et des fields composant le message grâce, par exemple, à une boucle for :
for (var i = 0; i < message.getNumberOfRecords(); i++) {
var record = message.getFields(i);
processRecord(record);
}
La fonction processRecord, permettant de récupérer les informations d'un WebClientMessage, ressemble à ceci :
function processRecord(record) {
if (record[0] == "_a" || record[0] == "_u") {
// It's a ADD or UPDATE message
updateSymbol(record[1], record[2], record[3]);
} else if (record[0] == "_d") {
// It's a DELETE message
removeSymbol(record[1]);
}
}
Reste ensuite à afficher ces informations sur un fond de carte. C'est le rôle des fonctions updateSymbol et removeSymbol. Elles manipulent les éléments affichés sur le fond de carte Google Maps.
function updateSymbol(id, lat, lng) {
var marker;
lat *= Math.pow(10,-6);
lng *= Math.pow(10,-6);
if ((marker = mapSymbols[id]) == null) {
marker = new google.maps.Marker({
position: new google.maps.LatLng(lat, lng),
title: id,
map: map
});
mapSymbols[id] = marker;
} else {
marker.setPosition(new google.maps.LatLng(lat, lng));
}
}
function removeSymbol(id) {
mapSymbols[id].setMap(null);
mapSymbols[id] = null;
}
Les besoins fonctionnels peuvent nécessiter plus qu’un simple broadcast de message auprès de clients web. La user story que nous avons présentée au début de cet article, le suivi de véhicules en ligne, peut être très largement enrichie :
Comme nous avons pu le voir précédemment, un nouveau client peut se voir transmettre un contexte lorsqu’il souscrit à un Topic grâce à un LoadMessage. Dans notre cas, ce LoadMessage contient la position de tous les véhicules en ligne.
Il faut donc intégrer dans la chaine un mécanisme de rétention des messages. Ce mécanisme est à implémenter, car non pris en charge par Diffusion. Il implique deux critères :
Plusieurs implémentations sont alors envisageables. Une première approche consisterait à réaliser cette rétention en Java, au niveau du Publisher, en utilisant par exemple une ConcurrentHashMap et un Thread "nettoyeur".
Une ConcurrentHashMap permet de stocker les états (la clef identifiant de manière unique chaque capteur, la valeur contenant le dernier message envoyé). Elle est alimentée par le Publisher lorsque de nouveaux messages sont reçus par le service web.
Un thread, exécuté au sein d'un pool dédié, scanne à intervalle régulier la précédente map en quête de messages expirés, qu’elle retire le cas échéant. Le retrait d’un message est notifié auprès de l’ensemble des clients.
Cette approche peut être réalisée, par exemple, de la façon suivante :
Tout d'abord, réalisons un CleanerThread (classe implémentant Runnable), bouclant infiniment sur le bout de code suivant :
List records = new ArrayList();
for (String key : sensors.keySet()) {
if(sensors.get(key).isOutdated()) {
sensors.remove(key);
Record record = new Record(RTFPublisher.ACTION_DELETE, key);
records.add(record);
}
}
notifyClients(records);
Thread.sleep(sleepTime);
Ce CleanerThread est ensuite exécuté depuis le Publisher, lors de son initialisation (méthode initialLoad()) :
sensors = new ConcurrentHashMap();
cleaner = new CleanerThread(sensors);
cleaner.setSleepTime(MESSAGE_TIMEOUT);
cleaner.setTopic(topic);
ThreadService.getBackgroundThreadPool().execute(cleaner);
Notez que l’on utilise ici un pool de Threads fournit par Diffusion (le BackgroundThreadPool), qui peut être assimilé à un Executor Java.
Une seconde approche, consiste à déléguer la rétention à un outil de stockage externe à la JVM, comme par exemple Redis.
Redis est un outil de stockage clef/valeur in-memory encore appelé memcache. Il a été spécialement conçu pour agir comme un cache et supporter une charge importante à la fois en lecture et en écriture. Parmi les nombreuses fonctionnalités, on retrouve notamment la gestion du Time To Live (TTL), permettant de gérer l’expiration de nos messages.
Redis possède des clients dans différents langages, comme par exemple Jedis pour Java. Ce client fourni également un pool de Thread permettant d'utiliser Redis dans un environnement concurrent. De par sa conception, le stockage et la récupération de l’état des véhicules ne constituent donc plus un goulot d’étranglement.
Pour en savoir plus à propos de l'utilisation de Redis/Jedis, vous pouvez consulter cet article.
Un message « broadcasté » n’est pas forcément utile à tous les clients.
Dans notre cas, la carte sur laquelle apparaissent les capteurs peut être manipulée par l’observateur (zoom, déplacement). Est-il alors utile de communiquer les messages concernant des véhicules situés en dehors du champ de vision de la carte ?
Le fait de traiter en amont cette situation peut permettre de réduire sensiblement la bande passante nécessaire entre le serveur de push et l’ensemble des clients.
Une solution à ce problème est d’affecter un filtre pour chaque client au niveau du Publisher. Cela permettrait de diffuser, au cas par cas, des messages pertinents.
Dans notre exemple, lorsque la zone observée est modifiée côté client, le serveur doit en être notifié. Il diffusera alors, pour ce client, uniquement les messages situés dans sa zone d’observation. On pourrait dans notre cas utiliser une base de données géographique comme pincaster pour sélectionner la liste des points à diffuser au client.
Le concept de filtre par client n’est pas implémenté sur Diffusion. On peut toutefois imaginer conserver une référence de chaque client ayant souscrit, puis appliquer un filtre à la main sur chacun d'eux avant l’envoie d’un message.
La solution Diffusion proposée par PushTechnology est intéressante en ce sens où elle propose un serveur et des API simples, performantes et compatibles pour faire du web messaging.
Ce POC a d'ailleurs été l'occasion de constater qu'il était relativement aisé de mettre en œuvre des mécanismes simples. L'existence de mécanismes plus avancés au sein de Diffusion, comme la gestion de contextes ou encore le traitement complexe de files, permettent d’imaginer des cas plus riches que celui que nous avons pu voir ici.
Toutefois, Diffusion n'est pas le seul à proposer une solution de push web. Les technologies Comet ne sont pas nouvelles et WebSocket suscite de plus en plus d'intérêt :