article expliquant les concepts des CEP a été publié par Karim Ben Othman. Nicolas Salmon a ensuite décrit une implémentation CEP avec le framework open source Esper.
Je trouve donc intéressant de vous présenter dans cet article CEP à travers la solution Open Source GlassFish ESB. Nous utiliserons ici le même cas d’utilisation que celui utilisé pour Esper.
Rappelons notre scénario: en partant du postulat que chaque bagage, une fois enregistré, porte une puce RFID que des antennes sont capables de localiser. Les antennes transmettent les messages à une application « centrale » avec les bagages qu’elles ont identifiés dans leur périmètre.
Le schéma ci-dessus illustre le système mis en place.
Pour détecter les problèmes nous considérons que le temps maximum d’un « voyage » sur le tapis est de n secondes, et, qu’au delà de ce délai, le bagage doit être considéré comme manquant.
La méthode consiste donc à vérifier qu’un même bagage est passé par les 4 antennes dans l’intervalle de temps attendu et donc, qu’il existe 4 évènements détectés pour un même bagage dans les n dernières secondes.
GlassFish ESB est un ESB qui intègre un ensemble de « briques » dont notamment le serveur d’application Sun GlassFish, un bus JMS, JavaMQ et l’IDE NetBeans.
Cette solution respecte les standards JBI. Elle se compose de modules de transformation (des Service Engines) et des adaptateurs (des Binding Components).
IEP, pour Intelligent Event Processor, est un Service Engine. Il propose donc un ensemble de composants permettant d'implémenter le concept CEP dans GlassFish ESB.
Un composant dans IEP est une requête effectuée sur une table en entrée et dont la réponse est stockée dans une table en sortie. Le composant présente visuellement la requête et contient des champs à compléter. Les variables en entrée sont disponibles et il ne reste plus qu’à les glisser/déposer dans le corps de la requête.
Le langage de requêtage utilisé est du CQL (pour Continuous Query Language).
C’est ici que GlassFish ESB nous montre sa simplicité d’utilisation : un ensemble de composants IEP est proposé et il ne nous reste plus « qu’à » sélectionner et paramétrer les composants nécessaires.
Pour notre cas d’utilisation, nous allons développer deux services :
Le premier devra simuler le flux de bagages passant devant les antennes
Le deuxième sera chargé de détecter les bagages perdus
Nous ne nous attacherons pas à détailler le premier service, car ce n’est pas ici le but de notre article. Il est cependant utile d’indiquer que c’est un simple programme écrit en Java qui envoie des requêtes SOAP en http. Les messages qui sont envoyés, correspondent à un instant donné à tous les bagages détectés par une antenne.
Un évènement est un couple : <ID du bagage, ID de l’antenne>.
Intéressons-nous maintenant au vrai sujet de cet article : le service se basant sur la brique IEP et détectant les bagages perdus. Notre service doit donc effectuer 3 actions :
Récupérer tous les messages provenant des antennes
Détecter les bagages manquant immédiatement. C’est-à-dire un bagage effectuant un temps de trajet supérieur au temps moyen de n secondes sur un parcours défini et surveillé par 4 antennes.
Alerter : ici nous ne ferons que logger le bagage manquant
Voyons comment ce service peut être implémenté dans GlassFish ESB.
La première étape correspond à la récupération des messages et leur sauvegarde.
Pour les récupérer, GlassFish ESB propose un composant permettant de recevoir une requête http (soap en l’occurrence). Ce sera l’entrée de notre système. Ce composant est du type Input Stream. Nommons-le AntennaMsg.
Maintenant pour identifier qu’un bagage a correctement été acheminé, il nous faut vérifier sur que toutes les antennes l’ont détecté durant les n dernières secondes. Donc sauvegarder les messages des n dernières secondes. Pour cela, nous insérons Save_All_Luggages, le composant Save Stream qui nous permet d’enregistrer les messages venant d’AntennaMs_g,_ dans une table de la base de données interne.
Pour détecter un bagage manquant, il faut attendre les n secondes nécessaire à son temps de transport après qu’il a été signalé par la première antenne, puis vérifier que les 4 messages envoyés par chacune des antennes tout au long du parcours ont bien été transmis. Pour cela nous allons utiliser le composant nommé Wait_n_Seconds sur le schéma, qui est du type Time Based Window.
Time Based Window est un composant qui garde en mémoire les événements détectés en entrée durant un temps défini. Sa spécificité repose sur le fait qu’il « pousse » en sortie tous les évènements qu’il a reçus durant les n dernières secondes et ce, à chaque fois qu’il en reçoit un nouvel en entrée.
Expliquons son fonctionnement à l’aide d’un schéma (en dehors de notre cas d'utilisation et avec une fenêtre de temps définie à 3 secondes) :
Dans ce schéma, A, B, C et D représentent des évènements (exemple, la détection d’un bagage par une antenne) et la notation A1 : l’évènement A détecté à t+1.
Détaillons :
Combinons le maintenant avec un autre composant : Delete Stream
Delete Stream, ici le composant Get_Msg_After_n_Seconds, va nous permettre de réaliser ce que l’on cherche à faire : attendre n secondes lorsqu’un évènement arrive pour le traiter et voir ainsi s’il est accompagné des autres évènements attendus.
Que fait le Delete Stream ? Il récupère les évènements supprimés par le composant précédent. Et cette fonctionnalité nous convient particulièrement bien car le composant précédent, Time Based Window, supprime les évènements après un laps de temps défini !
Ainsi, notre évènement est bien « poussé » au bout des n secondes souhaitées après le composant Get_Msg_After_n_Seconds.
Il faut alors vérifier que les 3 autres évènements attendus sont bien arrivés dans les n secondes suivant le premier.
Pour cela, nous utilisons la table enrichie par le Save_All_Luggages. Le composant de type Table Input nous permet de configurer les informations que nous voulons extraire d’une base de données.
Pour détecter si le bagage est correctement passé par les 4 antennes, il ne nous reste plus qu’à rechercher les informations dans la table. Pour cela, le composant Stream Projection And Filter permet de requêter les composants qu’il a en entrée.
Ci-dessous la requête écrite dans la configuration du composant.
La sortie de ce composant sera alors tous les évènements ayant le même luggageID mais ne comptabilisant pas 4 messages transmis.
Pour alerter, nous loggons l’évènement et nous filtrons pour que les évènements provenant du même bagage ne soit pas logger plusieurs fois. Nous enregistrons donc les bagages perdus dans une tables (grâce à SaveAllRecords_FromMissingLuggages), et ne loggons que les nouveaux dans un fichier en effectuant une requête pour vérifier que le bagage n’ait pas été déjà détecté.
Pour logger, le composant de type Stream Output nous offre la possibilité d’écrire facilement dans un fichier (ou dans une file JMS si on le souhaite).
Et voici le schéma de nos cas d'utilisation terminé :
Cet article nous a permis de remarquer que CEP est plus accessible avec un outil comme GlassFish ESB qu’avec un framework comme Esper car il est graphique et simple à prendre en main.
Cependant, l’utilisation d’IEP est limitée par une fonctionnalité de débogage inexistante sur la brique IEP et une documentation parfois légère. Elle parait bien adapté pour des besoins localisés, comme la supervision applicative et technique de modules logiciels par exemple, ce qui s’avère être très utile sur une plateforme inter-applications comme GlassFish ESB.
Enfin, GlassFish ESB offre à l’utilisateur la possibilité d’une implémentation rapide tout en se familiarisant avec un concept complexe à acquérir. Cette « boite à outil » CEP vous propose donc un ensemble de composants « prêts à l’emploi » qu’il ne vous reste alors plus qu’à assembler…