Le problème des N+1 requêtes est un problème classique qui peut arriver lors du requêtage d’une base de données à travers un ORM. Lorsqu’une application fait une première requête à une base de données, puis au moins une requête pour chacun des enregistrements retournés par cette première requête, alors nous sommes dans une situation de N+1 requêtes.
Nous requêtons la base de données pour récupérer toutes les campagnes valides, puis pour chaque campagne valide, tous les messages associés.
SELECT *
FROM campagnes
WHERE c.valid_start <= ? AND c.valid_end >= ? AND ... ;
Et pour chaque campagne retournée :
SELECT *
FROM messages
WHERE messages.campagne_id_fk = ?;
Notre choix d’implémentation consiste à faire une requête pour récupérer les campagnes valides puis, pour chaque campagne valide, une requête pour récupérer ses messages associés.
Dans ce cas, nous nous attendions à réaliser :
(3)
Or, en faisant du profiling, nous nous sommes rendu compte que notre application réalisait beaucoup plus de requêtes qu’attendu (>P+1).
Voici le code Python pour récupérer les campagnes et les messages associés :
valid_campagne_entities = CampagneEntity.select().where(
(fn.date_trunc("day", CampagneEntity.valid_start) <= current_date)
& (current_date <= fn.date_trunc("day", CampagneEntity.valid_end))
& (CampagneEntity.zone_diffusion == zone_diffusion))
for campagne_entity in valid_campagne_entities:
campagne = campagne_entity.to_model()
for message_entity in MessageEntity.select().where(MessageEntity.campagne == campagne_id):
message = message_entity.to_model()
DO_SOME_STUFF(campagne, message)
Rien de surprenant dans ce code. Allons voir les objets MessageEntity et CampagneEntity.
Voici le code implémentant l’objet ORM MessageEntity faisant le pont entre la table messages de la base de données et l’objet Python Message :
class MessageEntity(BaseEntity):
class Meta:
table_name = "messages"
id = BigAutoField(...)
campagne = ForeignKeyField(...)
numero_abonne = BigIntegerField(...)
storables = JSONField(...)
def to_model(self) -> Message:
return Message(self.campagne.id, self.numero_abonne, self.storables)
La méthode to_model permet de construire un objet Message (notre objet Python modélisant un message au sens métier) à partir d’un objet MessageEntity (objet de l’ORM qui représente la modélisation en base de données).
L’investigation nous a montré qu’à chaque instanciation d’un objet Message l’usage de self.campagne.id impliquait une requête supplémentaire pour résoudre la clé primaire de la campagne associée. Ainsi, pour une campagne valide en BDD associée à 5 messages, nous avons :
Soit 7 requêtes au total au lieu des 2 initialement attendues.
Dans le schéma ci-dessous, nous nous focalisons sur la partie de la requête qui récupère des messages associés à chaque campagne valide et nous génère des requêtes supplémentaires non-souhaitées. On peut notamment voir qu’une requête pour récupérer les messages associés à une campagne valide génère Q requêtes supplémentaires :
Pour rappel, dans un scénario idéal, on aurait :
Soit 2 requêtes (cf ce schéma).
Pour résoudre ce problème, il nous a suffit d’utiliser directement l’attribut campagne_id_fk de l’objet MessageEntity qui est valorisé lorsqu’un message est récupéré de la BD.
class MessageEntity(...):
[...]
campagne = ForeignKeyField(column_name="campagne_id_fk", ...)
def to_model(self) -> Message:
return Message(self.campagne_id_fk, self.numero_abonne, self.storables)
Selon Wikipedia :
En informatique, dans les bases de données, un index est une structure de données utilisée et entretenue par le système de gestion de base de données (SGBD) pour lui permettre de retrouver rapidement les données. L'utilisation d'un index simplifie et accélère les opérations de recherche, de tri, de jointure ou d'agrégation effectuées par le SGBD.
L’index placé sur une table va permettre au SGBD d'accéder très rapidement aux enregistrements, selon la valeur d'un ou plusieurs champs.
Nous requêtons toujours la base de données de la manière suivante :
SELECT *
FROM messages m
JOIN campagnes c on c.id = m.campagne_id_fk
WHERE c.valid_start <= ? AND c.valid_end >= ? AND ... ;
Nous avons réalisé un EXPLAIN (4) sur notre requête SQL pour comprendre la stratégie du moteur de requête PostgreSQL pour exécuter cette requête. Voici le résultat :
(5)
Nous nous sommes aperçu qu’il faisait :
Nous pensions que le moteur de requête ferait une jointure d’index entre les clés primaires des campagnes et les clés étrangères des messages (campagne_id_fk). Pour information, les scans séquentiels sont les opérations les plus lentes à exécuter.
En creusant la documentation, nous avons découvert que par défaut Postgresql ne pose pas d’index sur les clés étrangères. En effet la documentation stipule la chose suivante :
A foreign key must reference columns that either are a primary key or form a unique constraint. This means that the referenced columns always have an index (the one underlying the primary key or unique constraint); so checks on whether a referencing row has a match will be efficient. Since a DELETE of a row from the referenced table or an UPDATE of a referenced column will require a scan of the referencing table for rows matching the old value, it is often a good idea to index the referencing columns too. Because this is not always needed, and there are many choices available on how to index, declaration of a foreign key constraint does not automatically create an index on the referencing columns.
Nous avons posé un index sur la clé étrangère campagne_id_fk de la table messages puisqu’elle devrait être constamment utilisée dans la jointure entre les tables :
CREATE INDEX campagne_id_fk_index ON MESSAGES (campagne_id_fk);
Pour le vérifier, nous avons créé cet index et exécuté à nouveau la requête avec EXPLAIN :
Nous voyions désormais que le moteur de requête réaliserait un scan d’index pour récupérer les messages valides. De plus, en comparant les temps passés sur chaque tables, nous constations que le temps a grandement diminué grâce à l’index :
Enfin, nous avons posé un index sur le champ zone_diffusion de la table campagnes, puisque ce champ est utilisé pour effectuer une sélection sur la zone de diffusion en question (6) :
CREATE INDEX zone_diffusion_index ON CAMPAGNES (zone_diffusion);
En revanche, le moteur PostgreSQL n’utilise pas cet index, car la sélection est faite sur trois champs : zone_diffusion, valid_start et valid_end. Or, pour être utilisés les index doivent être compatibles avec la clause WHERE. Ceux-ci peuvent être incompatibles pour diverses raisons :
Puisque trois champs interviennent dans notre requête, il faudrait construire un index multi-colonne. De plus, deux des champs de sélection sont des dates tronquées. Ainsi construire un tel index multi-colonne n’a rien de trivial. Sachant que le requêtage à la BDD n’était plus le goulot d'étranglement principale, nous avons abandonné la piste de ce dernier index multi-colonne.
Après avoir récupéré les messages valides pour une zone géographique, il faut les sérialiser dans une structure de données binaire permettant une manipulation bit à bit.
En effet, le fichier binaire résultant de la génération, appelé section MPEG, doit respecter la spécification DVB (Digital Video Broadcasting). Celle-ci spécifie notamment comment doivent être agencés les champs à encoder, elle s’appuie sur le schéma d’encodage TLV (Type-Length-Value), c’est-à-dire :
Pour pouvoir générer une section MPEG, il est nécessaire de pouvoir manipuler une structure de données à l’échelle du bit.
Pour nous faciliter la tâche, nous avons fait le choix de nous appuyer sur la librairie bitstream. Cette librairie est implémentée en Cython. Cython est à la fois un compilateur et un langage de programmation. Cython permet d’étendre le langage C en utilisant Python. Le langage est assez proche du Python et permet d’obtenir jusqu’à un facteur 100 sur les performances.
Pour sérialiser une section MPEG, par facilité d’implémentation, nous avons répété l’instanciation d’objets Bitstream volontairement plusieurs fois. En voici un aperçu au travers de la classe SectionHeader qui modélise l'en-tête d’une section MPEG :
class SectionHeader:
[...]
def __init__(self, campagne: Campagne, message: Message):
self.table_id = BitStream(0x92, np.uint8)
self.section_syntax_indicator = BitStream(0b0, bool)
self.reserved_future_use = BitStream(0b1, bool)
self.reserved = BitStream([0b1, 0b1], bool)
self.section_length = BitStream()
self.index = BitStream(0, np.uint8)
self.total_count = BitStream(1, np.uint8)
self.target_filter = BitStream(message.numero_abonne, np.uint32)
self.network_id = BitStream(campagne.zone_diffusion, np.uint8)
self.message_id = BitStream(campagne.id, np.uint32)
self.template_id = BitStream(message.template_id, np.uint32)
self.genre = BitStream(campagne.genre, bytes)
self.valid_start = BitStream()
self.valid_end = BitStream()
self.message_ack_count = BitStream(campagne.ack_count, np.uint8)
self.message_ack_delay = BitStream(campagne.ack_delay, np.uint16)
self.offset_appearance = BitStream(0, np.uint16)
self.offset_filter = BitStream(0, np.uint16)
self.offset_addressee = BitStream()
Le problème de cette implémentation est que chaque instanciation d’un objet BitStream crée une structure C sous-jacente avec une allocation mémoire. Or le changement de contexte (Python vers C), l’overhead, est extrêmement coûteux en temps d'exécution.
Pour diminuer l’overhead, nous avons limité l’instanciation d’objet BitStream au strict minimum. Nous nous sommes appuyé sur la méthode write de bitstream comme suit :
class SectionHeader:
[...]
def __init__(self, campagne: Campagne, message: Message):
self.debut = BitStream()
self.debut.write(0x92, np.uint8)
self.debut.write(0b0, bool)
self.debut.write(0b1, bool)
self.debut.write([0b1, 0b1], bool)
self.section_length = BitStream()
self.fin = BitStream()
self.fin.write(0, np.uint8)
self.fin.write(1, np.uint8)
Cependant, nous n’avons pas pu réduire le nombre d’instances de cet objet à un seul, car un certain nombre de champs dans la section MPEG sont à valoriser avant de pouvoir en valoriser d’autres positionnés en amont dans la section. Il faudrait disposer d’une méthode d’insertion sur un BitStream, qui n’est pas disponible. D’où les trois instanciations visibles dans le code ci-dessus.
Enfin, la réduction de ces instances a également permis de réduire l’empreinte mémoire.
“ctypes est une bibliothèque d'appel à des fonctions externes en python. Elle fournit des types de données compatibles avec le langage C et permet d'appeler des fonctions depuis des DLL ou des bibliothèques partagées, rendant ainsi possible l'interfaçage de ces bibliothèques avec du pur code Python.”
En d’autres termes, la bibliothèque ctypes permet d’exécuter du code C en écrivant du pur code Python. Rappelons, que le langage C est un langage de programmation bas niveau et compilé : deux caractéristiques qui le rendent extrêmement performant. En outre, il laisse à la charge du développeur la gestion de la mémoire. Ainsi, en C nous pouvons allouer des buffers de mémoire de taille souhaitées de manière très simple et directe.
Enfin, la bibliothèque ctypes définit plusieurs types de données de base compatibles avec le C, par exemple : le type c_uint en Python représente le type unsigned int en C.
Rappelons que pour sérialiser, nos sections MPEG nous faisons toujours face à de multiples instanciations d’objets interfaçant du C via la librairie bitstream. En effet, si on regarde dans les sources de bitstream, lorsqu’on souhaite écrire un bit (méthode write_bool ici), le buffer est augmenté à la volée, ligne 762 :
stream._extend(1)
Or la méthode _extend, ici, s’appuie sur la fonction realloc, ligne 220:
self._bytes = <unsigned char *>realloc(self._bytes, new_num_bytes)
Et réallouer un buffer, utiliser realloc(), pénalise les performances, car cette méthode crée un autre buffer, recopie dedans le contenu du premier buffer, et supprime ce premier buffer. Pour optimiser les performances, une solution est de pré-allouer un buffer suffisamment grand pour ne jamais avoir à invoquer cette fonction.
Enfin, bitstream fonctionne "bit à bit" dans le sens où les bits produits ne sont pas des bits "calés" sur des frontières de 8 bits. Ceci n'est pas efficace, car les processeurs travaillent avec des mots (64 bits maintenant) et il est préférable de travailler avec des structures de données au plus proche du mot du processeur.
Pour revenir à ces instanciations, certes elles sont en nombre très limités mais elles restent notre goulot d’étranglement principal en matière de temps d’exécution.
S’appuyer sur les ctypes pour allouer une structure de données sur mesure. En effet, ctypes permet de définir des structures comme dans le langage C. Ces structures définissent des champs avec une taille et un type. De plus, le choix du bit de poids fort est également paramétrable (BigEndianStructure / LittleEndianStructure) ce qui est important pour nous, la spécification nous imposant un encodage en BigEndian. Enfin, nous avons également la possibilité d’overrider la façon dont le compilateur C va aligner les champs pour qu'ils soient contigus.
Ainsi, nous avons défini une telle structure pour faciliter la sérialisation et réduire l’overhead à son minimum :
class HeaderStruct(BigEndianStructure):
_pack_ = 1
_fields_ = [("header_table_id", c_uint32, 8),
("header_section_syntax_indicator", c_uint32, 1),
("header_reserved_future_use", c_uint32, 1),
("header_reserved", c_uint32, 2),
("header_total_count", c_uint8),
...
]
def set_header_struct(self, campagne: Campagne, message: Message):
self.header_table_id = 0x92
self.header_section_syntax_indicator = 0b0
self.header_reserved_future_use = 0b1
self.header_reserved = 0b11
self.header_template_id = message.template_id
self.header_genre = campagne.genre.value.encode("utf-8")
...
Nous avons également envisagé d’autres pistes pour la sérialisation. Celles-ci n’ont pas été retenues car elles ne permettaient pas de répondre aux contraintes de performances. Voici ces pistes et la raison de leur abandon :
La dernière étape de la génération consiste à écrire la section MPEG sur un dépôt de fichiers (AWS S3).
Initialement, pour écrire sur s3, nous attendions d’avoir terminé la génération de la structure binaire encodant le carrousel d’une zone de diffusion. Or, attendre que tout le carrousel soit encodé pour l’écrire signifie garder en mémoire tous les messages encodés composant ce carrousel et il n’est pas inhabituel de compter des millions de messages par carrousel. Enfin, rappelons qu’un message a une taille maximale de 4096 bits soit 512 octets. Ainsi, un carrousel composé de 2 millions de messages aura une taille pouvant aller jusqu’à 1 Go.
carrousel_section_mpeg = create_carrousel_section_mpeg_for_zone_diffusion(zone_diffusion, current_date)
store(output_filename, carrousel_section_mpeg)
Pour améliorer l’usage de la mémoire, nous avons mis en place un mécanisme d’upload à la volée en s’appuyant sur les file-like object Python et la bibliothèque smart_open.
En Python, les différents types d’IO (entrée / sortie) sont gérés par le module io. Celui-ci permet de gérer l’écriture et la lecture de texte, de binaires et de raws. Le file-like object implémente une API orientée fichier avec les méthodes de lecture et d’écriture (read() et write()) s’interfaçant avec n’importe quel type d’IO (texte, binaire, raw).
La bibliothèque smart_open permet de lire / d’écrire en streaming de très gros fichiers de manière efficace depuis / vers des systèmes de stockage (S3, GCS, Azure Blob Storage) en implémentant l’API orientée fichier des file-like objects. Pour gérer l’écriture en streaming sur S3, smart_open s'appuie sur le multi-part upload de S3. Dans les coulisses, smart_open crée un buffer dans lequel on écrit réellement à chaque appel de la méthode write() et dès que la taille du buffer atteint une taille de 50 MB, celui-ci (correspond à une Part au sens du multi-part upload de S3) est uploadé sur S3 avec le mécanisme de multi-part upload.
Ainsi, nous avons refactoré notre implémentation de cette manière :
Voici notre classe permettant d’ouvrir un flux en écriture sur S3 avec smart_open :
from smart_open import open
class SectionMpegAwsS3StorageAdapter:
def upload(self, filename: str) -> IO:
url = "s3:///"
transport_params = {"session": boto3.Session(),
"multipart_upload": True,
"resource_kwargs": {"endpoint_url": ...}}
return open(url, "wb", transport_params=transport_params)
Voici le code orchestrant l’ouverture du flux, l’itération sur les messages et l’écriture dans le flux :
with section_mpeg_aws_s3_storage_adapter.upload(output_filename) as f:
for campagne, message in get_all_messages_for_valid_campagnes(...):
f.write(MessageSectionMpeg(campagne, message).to_bytearray())
Grâce à ce mécanisme, nous utilisons moins de 200 Mo de mémoire, et ce quelque soit le nombre de messages en entrée. Ainsi, aucun risque d’atteindre la limite mémoire de la machine.
La récupération des données peut impacter considérablement la mémoire. En effet, si notre requête retourne 1 million d’enregistrements, notre ORM va construire 1 million d’objets Python correspondant en mémoire. De plus, l’ORM a un mécanisme de cache qui permet d’éviter de requêter à nouveau la BDD. Ceci permet d’exécuter plusieurs fois la même requête en ne l’exécutant qu’une fois côté BDD. Cependant, ce mécanisme de cache a un coût en mémoire qui ne permet pas une exécution limitée à notre contrainte de départ de 200 MB de mémoire.
Pour pallier ce problème, nous avons utilisé les server-side cursors de psycopg2 (9)(10).
Par défaut, lorsque psycopg2 exécute une requête, tous les résultats sont récupérés et renvoyés au client par le backend. Cela peut amener notre application à utiliser beaucoup de mémoire lors de requêtes volumineuses.
En utilisant les "curseurs côté serveur”, les résultats sont renvoyés progressivement, par chunk (par défaut par 2000 enregistrements). Ceci permet de soulager grandement la mémoire du côté client (notre application) qui récupère ces enregistrements.
Nous avons dû refactorer notre code requêtant la base de données pour ne requêter la base qu’une seule fois. Pour rappel, cela se fait en deux étapes :
valid_campagne_entities = CampagneEntity.select().where(
(CampagneEntity.valid_start <= current_date) & ...)
for campagne_entity in valid_campagne_entities:
campagne = campagne_entity.to_model()
for message_entity in MessageEntity.select().where(MessageEntity.campagne == campagne_id):
message = message_entity.to_model()
DO_SOME_STUFF(campagne, message)
Or, pour simplifier l’utilisation du server-side cursor, nous avons fusionné les requêtes en une, en exécutant la jointure côtés BDD :
all_valid_messages = MessageEntity.select(...)
.join(CampagneEntity)
.where((CampagneEntity.valid_start <= current_date) & …)
for (...) in all_valid_messages:
campagne = Campagne(...)
message = Message(...)
DO_SOME_STUFF(campagne, message)
Ainsi, la base de données effectue la jointure plutôt que de la faire applicativement. Ceci est plus performant en termes de :
Enfin, il ne reste plus qu’à instancier l’objet approprié pour utiliser un server-side cursor :
query = MessageEntity.select(...)
return ServerSide(query, array_size=self.SERVER_SIDE_CURSOR_CHUNK_SIZE))
Lorsque l’on a des contraintes de performances, il est important de maîtriser les impacts de chacune de ses lignes de code. Pour comprendre ces impacts (instructions chronophages et / ou gourmandes en mémoire), les profilers de temps d’exécution et de mémoire sont très utiles.
Il est important d’adopter une démarche stratégique, telle que détaillée en début de cet article, pour s’attaquer aux plus gros problèmes de performance en premier et mesurer systématiquement l’évolution résultant de l’optimisation. On évitera ainsi de perdre de l’énergie sur une optimisation qui apparaîtra complètement dérisoire au début alors qu’elle sera potentiellement réellement bénéfique plus tard.
Dans cette quête d’optimisation, on retrouve des “suspects habituels” que sont le requêtage à la base de donnée (ORM & N+1 requests, index) pour le temps d’exécution et les multiples instanciations d’objets pour l'utilisation de la mémoire.
Par ailleurs, rendre son code plus performant implique souvent de faire des concessions sur sa simplicité voire sur l’architecture elle-même du code (nous avons opté pour une architecture hexagonale). Il y a toujours un compromis à faire entre performance et simplicité ; c’est ce qu’on évoquait dans les considérations générales vis-à-vis du multiprocessing ou multithreading.
Enfin, pour aller plus loin, nous aurions pu imaginer des implémentations plus complexes à base de multithreading pour paralléliser les IO (requêtage à la BD et écriture sur le S3). Cependant, les performances atteintes après la mise en place de ces optimisations étaient satisfaisantes pour nous et nous ne voulions pas complexifier plus notre base de code pour un gain minime.
(1) PyCharm n'est qu'une interface au profiler cProfile
(2) Attention, cette fonctionnalité n'est disponible que sous PyCharm dans sa version Professional
(3) Les couleurs sont réutilisées plus bas pour illustrer le propos.
(4)
EXPLAIN (ANALYZE on, TIMING on) SELECT *
FROM "campagnes" AS "t1", "messages" AS "t2"
WHERE ("t1"."id" = "t2"."campagne_id_fk") AND ((("t1"."zone_diffusion" = 'AFRIQUE') AND (date_trunc('day', "t1"."valid_start") <= '2021-02-04')) AND (date_trunc('day', "t1"."valid_end") >= '2021-02-04'));
(5) construit avec le PostgreSQL execution plan visualizer conçu par Dalibo
(6) Pour rappel, la génération d’un carrousel se fait par zone géographique.
(7) Le temps d’exécution ne baisse pas significativement car celui-ci est borné par le reste de la génération (sérialisation binaire) qui devient plus chronophage.
(9) psycopg2 est un adaptateur de base de données PostgreSQL très populaire en Python.
(10) peewee fait appel à psycopg2 lorsque la base de données utilisée est une base PostgreSQL.