La délégation de tâches en asynchrone est un moyen efficace d'alléger la charge que subissent nos systèmes. En effet, de nombreux cas d'utilisation ne nécessitent pas d'être exécutés de façon synchrone lorsqu'un utilisateur effectue une action ou qu'un événement extérieur intervient.
Par exemple, lorsqu'il n'est pas nécessaire de restituer la dernière version des données et que le traitement avant restitution est coûteux en ressources, il est possible de renvoyer des données préalablement mises en cache et de déporter en asynchrone une tâche de rafraîchissement de ce cache.
Un autre exemple concerne les systèmes qui mêlent des requêtes fortement consommatrices en ressources (CPU, mémoire, ...) et des requêtes peu consommatrices pour lesquelles on va vouloir garantir une latence faible même lorsque des requêtes consommatrices sont en cours de traitement. Pour cela, déléguer les traitements coûteux à des workers via une file de messages peut aider à garantir des temps de réponse faibles pour les autres requêtes. Cette séparation est d'autant plus importante sur des systèmes qui s'appuient sur des technologies monothreadées telles que Ruby ou NodeJS (ce dernier a néanmoins l'avantage de ne pas consommer de ressources/process lorsqu'il effectue des I/O).
Pour résoudre cette problématique de délégation de tâches, on utilise habituellement une solution comme ActiveMQ ou RabbitMQ. Seulement, lorsqu'il est nécessaire :
aucune de ces deux solutions ne permet de satisfaire l'ensemble de ces critères. Cet article présente une solution possible à base de ZeroMQ, écrite en NodeJS et utilisée pour traiter les deux exemples précédents.
ZeroMQ est une bibliothèque réseau écrite en C++ et qui offre des bindings dans de nombreux langages. Elle procure une couche d'abstraction au-dessus des sockets TCP classiques pour ajouter quelques fonctionnalités :
PUB-SUB
: l'association des sockets PUB
et SUB
permet de distribuer des messages d'un publisher (socket PUB
) vers N consommateurs (socket SUB
) qui reçoivent tous les mêmes messages,REQ-REP
: le pattern classique de requête/réponse (1 publisher, 1 consommateur),PUSH-PULL
: la socket de type PUSH
distribue les messages en round-robin dans la file de messages des sockets de type PULL
connectées : 1 publisher et N consommateurs qui se répartissent la consommation des messages.Pour utiliser ces patterns de communication, une socket ZeroMQ est capable de se connecter à plusieurs autres sockets ZeroMQ. Le comportement lors de l'envoi d'un message sur une socket ZeroMQ connectée à plusieurs sockets dépendra du type de cette socket.
Bien que ZeroMQ fonctionne sans broker, il peut être nécessaire de réintroduire dans l'architecture un élément de ce type afin de limiter le nombre de connexions réseau. Par exemple, dans une architecture de délégation de tâches, avec 100 publishers et 100 workers, sans broker, il faudra établir 10 000 connexions (100 publishers * 100 workers) qui risquent d'engorger le réseau. Avec 2 brokers (pour supporter la panne de l'un d'entre eux), il faudra 400 connexions (100 publishers * 2 brokers + 100 workers * 2 brokers).
ZeroMQ propose 2 autres types de sockets qui permettent notamment le routage de messages :
ROUTER
: cette socket ajoute aux messages entrant l'identifiant de la socket ZeroMQ source dans une nouvelle frame qui devient la première frame du message. Ensuite, elle utilise cet identifiant des messages sortants (en supprimant la première frame du message) pour sélectionner la socket destination du message (si elle est connectée bien sûr). En modifiant cette frame, nous sommes capables de personnaliser le routage des messages sortants via cette socket. L'identifiant d'une socket peut être spécifié à la création sinon il sera généré aléatoirement par ZeroMQ.DEALER
: cette socket permet de recevoir et envoyer des messages (contrairement à la socket PUSH
qui peut seulement en envoyer). L'envoi de messages se faisant en round-robin aux sockets connectées.Avec ces différents types de sockets nous pouvons les assembler de la façon suivante pour obtenir notre système de délégation de tâches asynchrones avec les caractéristiques suivantes :
NB : Nous utilisons ici 3 brokers, pour éviter que les workers envoient systématiquement le même type de message au même broker (ce qui aurait été le cas avec 2 brokers car le worker envoie alternativement un message READY
et un message RESULT
).
Cette architecture permet de supporter la perte de n'importe quel composant mais aussi d'ajouter des publishers, des brokers ou des workers suivant la charge de tâches à traiter.
En utilisant une socket de type DEALER
du côté des publishers, celle-ci étant connectée à tous les brokers, les messages sont envoyés en round-robin (par ZeroMQ) vers les brokers (étapes 1, 2 et 3 à partir des publishers). Si un broker venait à disparaitre, la socket DEALER
continuerait à dialoguer avec les brokers restants et lorsque le broker perdu serait de nouveau accessible, ZeroMQ se chargera de se reconnecter sur ce broker qui réintègrera alors le cycle de round-robin d'envoi de messages. Les messages envoyés ne contiennent qu'une seule frame contenant les données de la tâche à exécuter :
Code du Publisher :
var broker = zeromq.createSocket('dealer');
["tcp://broker1:5555",
"tcp://broker2:5555",
"tcp://broker3:5555"].forEach(function(address) {
broker.connect(address);
});
...
broker.send("task data");
Les messages provenant des publishers sont reçus sur une socket ROUTER
sur un des brokers. Celle-ci ajoute une frame contenant l'identifiant de la socket DEALER
du publisher qui a envoyé le message. Ceci permettra, si le message est envoyé via une socket ROUTER
connectée à ce publisher, de router le message vers le bon publisher. Le broker a la charge de gérer une file de messages en mémoire, dans un fichier ou dans une base de données selon les besoins (garantie de ne pas perdre de message, performances, ...). Les messages traités par le broker contiennent donc 2 frames :
Code du broker :
var publishers = zeromq.createSocket('router');
publishers.on('message', function() {
// Process messages from publishers
var publisher_id = arguments[0],
task_data = arguments[1];
// Store in a queue
...
});
publishers.bindSync("tcp://*:5555");
Du côté workers, ces derniers se connectent à tous les brokers via une socket DEALER
. Ils envoient un message de type READY
(en utilisant une frame) pour récupérer une tâche disponible sur les brokers (étapes 1 et 4 à partir des workers). Le type du message est géré dans le code applicatif et non pas par ZeroMQ. Il permet au code applicatif du broker de différencier les messages en provenance des workers :
READY
pour préciser que le worker est prêt à traiter une tâche,RESULT
pour préciser que le message contient le résultat d'une tâche qui devra être transmis au publisher ayant publié la tâche.En utilisant une socket DEALER
, les messages seront donc alternativement envoyés sur l'un ou l'autre des brokers et la socket gérera la perte d'un broker. Le message envoyé par un worker pour récupérer une tâche ressemble donc à :
Le code du worker :
var broker = zeromq.createSocket('dealer');
broker.on('message', function() {...});
["tcp://broker1:5555",
"tcp://broker2:5555",
"tcp://broker3:5555"].forEach(function(address) {
broker.connect(address);
});
broker.send(READY);
De la même manière que du côté publisher, un message READY
se verra ajouter une frame contenant l'identifiant du worker qui l'a envoyé par la socket ROUTER
du broker :
Avec un message contenant une tâche à exécuter (provenant d'un publisher) et d'un message READY
(provenant d'un worker disponible), le broker peut constituer un message contenant la tâche en plaçant l'identifiant d'un worker disponible dans la première frame afin que le message soit routé vers ce worker par la socket ROUTER
(qui supprimera au passage cette première frame):
Le code du broker devient alors :
var workers = zeromq.createSocket('router');
workers.on('message', function() {
// Process messages from workers
var worker_id = arguments[0],
message_type = arguments[1]; // READY
if (message_type == READY) {
// dequeue tasks
var task, publisher_id = ...;
workers.send(worker_id, publisher_id, task);
}
});
workers.bindSync("tcp://*:5555");
Le worker (celui qui a envoyé un message READY
) reçoit alors le message (étape 2 à partir du broker1) suivant contenant la tâche à exécuter :
Le handler de réception d'un message sur le worker :
broker.on('message', function() {
// Process task messages from broker
var publisher_id = arguments[0],
task_data = arguments[1];
// process task
...
});
Cette architecture permet aux workers de renvoyer une réponse au publisher si l'on veut par exemple que le publisher réponde de façon synchrone sans pour autant accaparer le CPU et bloquer les requêtes suivantes (cas de NodeJS ici). Le cheminement inverse s'effectue alors de la façon suivante :
Le worker envoie un message contenant le résultat via sa socket DEALER
vers les brokers (toujours en round-robin et suivant la disponibilité de ces derniers) en utilisant une frame pour spécifier un type de message RESULT
(étape 3 à partir des workers) :
Code du handler de traitement des messages par un worker :
broker.on('message', function() {
// Process task messages from broker
var publisher_id = arguments[0],
task_data = arguments[1];
// process task
...
// Send a result to the publisher
broker.send(RESULT, publisher_id, result);
// Send READY message to get a new task
broker.send(READY);
});
Un des brokers reçoit le message via sa socket ROUTER
qui ajoute une frame avec l'identifiant du worker. Dans ce cas-ci, l'identifiant nous sera inutile.
L'identifiant du publisher à l'origine de la tâche étant toujours présent dans le message, il est possible de lui envoyer le message suivant sur la socket ROUTER
sur laquelle sont connectés les publishers. La socket routera alors le message vers le bon publisher :
Le handler des messages en provenance des workers devient alors :
workers.on('message', function() {
// Process messages from workers
var worker_id = arguments[0],
message_type = arguments[1], // READY or RESULT
publisher_id = arguments[2],
result = arguments[3];
if (message_type == READY) {
// dequeue tasks
var task, publisher_id = ...;
workers.send(worker_id, publisher_id, task);
} else if (message_type == RESULT) {
publishers.send(publisher_id, result);
}
});
le publisher reçoit alors le message contenant le résultat sur sa socket DEALER
(étape 4 à partir du broker2) :
Le handler de messages d'un publisher :
broker.on('message', function() {
// Process messages from broker
var result = arguments[0];
...
});
Comme nous avons pu le constater, ZeroMQ permet d'élaborer une solution adaptée à ses besoins en combinant les différents types de sockets disponibles (d'autres exemples sont accessibles dans le guide ZeroMQ). Cette souplesse vient tout de même au prix d'une certaine complexité puisqu'il faut élaborer son propre protocole (en utilisant les frames des messages ZeroMQ) et traiter quelques sujets qui n'ont pas été abordés dans cet article comme la gestion des timeouts. Par exemple, lorsqu'un broker reçoit avec succès un message READY
mais ne peut répondre au worker en raison d'une coupure réseau ou d'un crash du broker, dans ce cas là, sans timeout, le worker pourrait attendre indéfiniment que le broker lui renvoie une tâche à exécuter. ZeroMQ offre aussi de très bons résultats concernant les performances, que ce soit la latence ou le débit de transmission des messages. Ceci en fait une bonne solution pour les systèmes devant traiter de gros volumes de messages (en nombre ou en taille). Il est cependant moins adapté lorsque l'on ne peut pas se permettre de perdre des messages : dans l'exemple de cet article, la file de messages est n'est pas persisté sur le broker et ZeroMQ ne gère pas de transaction; un message envoyé aux brokers n'est donc pas garantir d'atteindre un worker ou un publisher si un broker crash.