HDF 3.1 blog series part 2: Introducing the NiFi registry:
NiFi has long supported flow templates to facilitate SDLC (software development life cycle) use cases, but templates were never designed or optimized for that purpose: there is no easy version control mechanism, they are not user-friendly to share between multiple teams, sensitive properties are not handled, etc. With the NiFi Registry, you now get version control, collaboration, and easy deployment, which significantly shortens the SDLC process and accelerates flow deployment for a faster time to value.
Like any other piece of code, NiFi flows go through a chain of environments during a project lifecycle (e.g. development, integration, preproduction, production). Since each environment has its own settings, processor properties may vary with the execution environment. The ability to adapt NiFi flow properties at deployment time is therefore mandatory: this is the specialization process.
In a NiFi context, specialization can be defined as the ability to apply a set of configuration values to a NiFi flow and its processors to make it specific to a given environment or usage, without editing its logic.
When implementing data pipelines, similar patterns tend to recur. To optimize development and maintainability, and to accelerate the implementation of new use cases, generic NiFi flows can be created once and instantiated as many times as required: this is the instantiation process.
In a NiFi context, instantiation can be defined as the ability to generate new NiFi flows from a generic one; these new flows may then be specialized to meet a specific use case.
Let’s take the example of a basic “Kafka to HDFS” pattern, which consumes messages from a Kafka topic and writes them into HDFS files:
Basic pattern “Kafka to HDFS”
Assuming that multiple data pipelines actually rely on this “Kafka to HDFS” pattern, at least two scenarios can be envisioned to handle such a case:
Overview of the two implementation scenarios
The first scenario is a decentralized model, where the pattern is rebuilt each time, either from scratch or by copy-pasting an existing similar NiFi flow. The second one, much more interesting, is closer to a centralized model: a single generic NiFi flow (the master) is built, maintained and kept in sync with all the instances generated from it.
Keep in mind that the number of replicated items is the same in both scenarios; only the way they are built differs. In the second scenario, which is the one detailed in the following sections, automating both the creation of new NiFi flows from an existing generic one (instantiation) and their specialization improves developer productivity, increases factorization, and reduces delivery and maintenance costs.
Specializing a NiFi flow consists of setting values for the properties of its processors so that the flow can be contextualized and used for a specific purpose without altering its logic. The first step is to understand how to update the property values of NiFi objects.
NiFi comes with a UI for data engineers to design flows. This UI relies on calls to the NiFi REST API, which is also directly available to any authorized user. The REST API is extremely rich and can perform any action usually done through the UI. Overall, the specialization process consists of orchestrating the right API calls to update a NiFi flow and make it operational.
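To make this more concrete, here is a minimal Python sketch of a helper that talks to the NiFi REST API using only the standard library. It assumes an unsecured NiFi instance reachable at `http://localhost:8080/nifi-api`, the same address as in the curl example later in this post; a secured instance would additionally need TLS and authentication, which are left out here. The names `build_nifi_request` and `nifi_call` are illustrative, not part of NiFi.

```python
import json
import urllib.request
from typing import Any, Dict, Optional

# Assumption: unsecured NiFi instance, same address as in the curl example.
NIFI_API = "http://localhost:8080/nifi-api"


def build_nifi_request(method: str, path: str, body: Optional[Dict[str, Any]] = None,
                       base_url: str = NIFI_API) -> urllib.request.Request:
    """Build (without sending) a request against the NiFi REST API."""
    headers = {"Accept": "application/json"}
    data = None
    if body is not None:
        # NiFi exchanges JSON entities: serialize the body and declare its type.
        data = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    url = base_url.rstrip("/") + "/" + path.lstrip("/")
    return urllib.request.Request(url, data=data, headers=headers, method=method)


def nifi_call(method: str, path: str, body: Optional[Dict[str, Any]] = None,
              base_url: str = NIFI_API) -> Dict[str, Any]:
    """Send the request and decode NiFi's JSON response (urlopen raises HTTPError on 4xx/5xx)."""
    request = build_nifi_request(method, path, body, base_url)
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

For instance, `nifi_call("GET", "/flow/about")` should return basic information about the NiFi instance, which is a cheap way to check connectivity before running any specialization.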
Back to our Kafka to HDFS flow example: the ConsumeKafka processor reads messages from a given topic and sends them to the PutHDFS processor, which writes them into a given HDFS folder. In the NiFi UI, it could look like this:
A NiFi flow viewed from NiFi UI
Let’s specialize this NiFi flow by setting the name of the input Kafka topic (framed in red below) in the ConsumeKafka processor. Here are the properties for this processor before updating:
Processor properties before update
Now, let’s run the following REST API call:
curl -X PUT \
  -H "Content-Type: application/json" \
  -d '{
        "component": {
          "id": "2f9fb50e-0169-1000-8610-b6b24f84ffd5",
          "config": { "properties": { "topic": "/dev/mata" } }
        },
        "revision": {
          "clientId": "2f30251f-0169-1000-443e-571d9105df39",
          "version": 2
        }
      }' \
  http://localhost:8080/nifi-api/processors/2f9fb50e-0169-1000-8610-b6b24f84ffd5
And here is what it now looks like:
Processor properties after update
Thanks to the NiFi REST API, we were able to update the Kafka input topic name of a pre-identified processor. Both the new value and the processor identifier were provided to the API call, along with the processor revision.
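For readers who prefer to drive the API from a program rather than from curl, the sketch below rebuilds the exact request body of the previous call in Python. `build_processor_update` and `processor_path` are hypothetical helpers, not a NiFi client library. The `revision` block matters: NiFi uses it for optimistic locking, so `version` must be the processor's current revision or the update is rejected.

```python
import json
from typing import Any, Dict


def build_processor_update(processor_id: str, properties: Dict[str, str],
                           client_id: str, version: int) -> Dict[str, Any]:
    """Build the ProcessorEntity body sent with PUT /nifi-api/processors/{id}."""
    return {
        "component": {
            "id": processor_id,
            # Only the properties to update are sent in this map.
            "config": {"properties": dict(properties)},
        },
        # Optimistic locking: version must match the processor's current revision.
        "revision": {"clientId": client_id, "version": version},
    }


def processor_path(processor_id: str) -> str:
    """REST path of a processor, relative to the /nifi-api root."""
    return "/processors/" + processor_id


if __name__ == "__main__":
    # Same values as in the curl example.
    body = build_processor_update(
        "2f9fb50e-0169-1000-8610-b6b24f84ffd5",
        {"topic": "/dev/mata"},
        "2f30251f-0169-1000-443e-571d9105df39",
        2,
    )
    print(processor_path(body["component"]["id"]))
    print(json.dumps(body, indent=2))
```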
The very same procedure can be used to update any property, as long as the processor identifier and the property name are known. Orchestrating these updates for all the required processor properties within a specialization application is a further step toward automating the specialization process and making the NiFi flow ready for execution.
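A specialization application can then loop over several processors, issuing one PUT per processor. The sketch below assumes the processor identifiers and current revision versions are already known (how to obtain them is discussed further down) and takes the HTTP call as a parameter, so any client can be plugged in. `specialize_processors`, `Target` and `Sender` are hypothetical names.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical shape of one update: (processor id, current revision version, properties to set).
Target = Tuple[str, int, Dict[str, str]]
# Any callable taking (method, path, body) and returning the decoded JSON response.
Sender = Callable[[str, str, Dict[str, Any]], Dict[str, Any]]


def specialize_processors(targets: List[Target], client_id: str,
                          send: Sender) -> List[Dict[str, Any]]:
    """Issue one PUT /processors/{id} call per target and collect NiFi's responses."""
    responses = []
    for processor_id, version, properties in targets:
        body = {
            "component": {"id": processor_id, "config": {"properties": properties}},
            "revision": {"clientId": client_id, "version": version},
        }
        # If send raises, the loop stops and earlier updates stay applied (no rollback).
        responses.append(send("PUT", "/processors/" + processor_id, body))
    return responses
```

Because the calls are sequential and not transactional, a failure halfway leaves the flow partially specialized; a real application would probably want to log each response and decide whether to retry or roll back.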
Note: be careful, however, as NiFi runs no consistency check against the new value. Updating properties with inconsistent values can cause serious side effects that may jeopardize the health of the NiFi service.
Going even further, the next step is to centralize the information required for specialization in a configuration repository that will act as the reference for the specialization application.
Here is what the structure of such a repository could look like:
| Information | Description | Value example |
|---|---|---|
| Flow name | Functional name used to identify a NiFi flow. | INGEST_KAFKA_TO_HDFS |
| Flow identifier | Process group (flow) identifier given by NiFi. A flow in NiFi can be represented as a “Process Group” object, itself composed of one or several processors or sub process groups. This value is generally set by the instantiation stage, which will be explained later. | 6be93ac7-1753-9755-a6ce-955a3ff30abe |
| Processors configuration | Values to be applied to the processors of the NiFi flow. | <JSON object> |
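One possible in-memory representation of such a repository entry is a small Python dataclass mirroring the three pieces of information above. The field names and the record layout (`flow_name`, `flow_id`, and `processors` stored as a JSON string) are assumptions made for illustration; the actual repository could be a database table, a Git-managed file, or anything else.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class FlowConfig:
    """One entry of a (hypothetical) specialization configuration repository."""

    # Functional name used to identify the flow, e.g. INGEST_KAFKA_TO_HDFS.
    flow_name: str
    # Process group identifier given by NiFi; stays None until instantiation sets it.
    flow_id: Optional[str] = None
    # Values to apply to the flow's processors, kept as the parsed JSON structure.
    processors: List[Dict[str, Any]] = field(default_factory=list)

    @classmethod
    def from_record(cls, record: Dict[str, Any]) -> "FlowConfig":
        """Build an entry from a repository record (hypothetical column names)."""
        return cls(
            flow_name=record["flow_name"],
            flow_id=record.get("flow_id"),
            # The processors configuration is assumed to be stored as a JSON string.
            processors=json.loads(record.get("processors", "[]")),
        )

    def is_ready(self) -> bool:
        """Specialization needs the NiFi identifier of the target process group."""
        return self.flow_id is not None
```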
The diagram below depicts interactions between this configuration repository and the specialization application:
Processor configuration information can be modelled as a JSON array such as:
[
  {"KAFKA_CONSUMER": {"properties": {"topic": "/dev/mata", "group.id": "DEV"}}},
  {"PUT_HDFS": {"properties": {"Directory": "/dev/mata"}}}
]
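Since each array element holds a single processor name, a specialization application will likely want to flatten this structure into one dictionary keyed by processor name. Here is a minimal sketch of that step; `flatten_processor_config` is a hypothetical helper, and it refuses duplicate names because a repeated name would make the target processor ambiguous.

```python
import json
from typing import Dict


def flatten_processor_config(raw: str) -> Dict[str, Dict[str, str]]:
    """Turn a JSON list of {processor name: {"properties": {...}}} into one dict."""
    flattened: Dict[str, Dict[str, str]] = {}
    for item in json.loads(raw):
        for name, settings in item.items():
            # Processor names act as keys, so a repeated name would be ambiguous.
            if name in flattened:
                raise ValueError("duplicate processor name in configuration: " + name)
            flattened[name] = dict(settings.get("properties", {}))
    return flattened
```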
In this JSON array, “KAFKA_CONSUMER” and “PUT_HDFS” are processor names used to identify the processors to be updated. Note that NiFi does not enforce unique processor names, neither across the instance nor within the same process group. However, while processor names are defined by data engineers and are therefore consistent across all the environments where the NiFi flow is deployed, processor identifiers are generated by NiFi and may differ from one environment to another.
Since the NiFi REST API uses identifiers to uniquely designate NiFi objects, it is recommended to use processor names in the configuration repository and to implement, in the specialization application, a mapping process that translates each processor name into its identifier in the execution environment.
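A minimal sketch of this mapping process, assuming the application has already fetched the content of `GET /nifi-api/process-groups/{id}/processors` for the target flow: the response contains a `processors` array of processor entities, each exposing its `id`, its `revision` and, under `component`, its `name`. The helper `map_processor_names` (a hypothetical name) keeps both the identifier and the current revision version, since both are needed for the later PUT call, and fails on duplicate names rather than picking one arbitrarily.

```python
from typing import Any, Dict, Tuple


def map_processor_names(listing: Dict[str, Any]) -> Dict[str, Tuple[str, int]]:
    """Map processor name -> (identifier, revision version) for one process group.

    `listing` is the decoded body of GET /nifi-api/process-groups/{id}/processors.
    """
    mapping: Dict[str, Tuple[str, int]] = {}
    for entity in listing.get("processors", []):
        name = entity["component"]["name"]
        # NiFi does not enforce unique names, so refuse to guess between duplicates.
        if name in mapping:
            raise ValueError("processor name is not unique in this group: " + name)
        mapping[name] = (entity["id"], entity["revision"]["version"])
    return mapping
```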
Now that we know how to update a processor property, and that a configuration repository stores all the processor properties to be specialized, we need to industrialize the property update process so that it can be applied to a whole set of identified processors within a process group. Before going further, though, it is important to understand how to identify processors, since object identifiers in NiFi cannot be known in advance.
Object identifier generation is actually rather simple: every NiFi object, whatever its type, gets an identifier generated at creation. This identifier is unique across the NiFi instance and is what allows interacting with the object through the REST API. Here is an example identifier value: 2f9fb50e-0169-1000-8610-b6b24f84ffd5.
Processor identifier viewed from NiFi UI
The process group (the NiFi flow) itself and each of its processors have their own unique identifier. So, how can we easily retrieve the identifier of the processor whose properties we are about to update?
A possible solution is to retrieve the processor identifier indirectly, using its name:
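The lookup itself boils down to one REST call on the process group whose identifier is stored in the configuration repository. Below is a hedged sketch using the standard library; it assumes the same unsecured NiFi instance as in the curl example, and `find_processor` is a hypothetical helper. The `opener` parameter defaults to `urllib.request.urlopen` but can be replaced, for example by a stub in tests.

```python
import json
import urllib.request
from typing import Any, Callable, Dict, Optional

# Assumption: unsecured NiFi instance, same address as in the curl example.
NIFI_API = "http://localhost:8080/nifi-api"


def find_processor(group_id: str, name: str, base_url: str = NIFI_API,
                   opener: Callable[[urllib.request.Request], Any] = urllib.request.urlopen
                   ) -> Optional[Dict[str, Any]]:
    """Return the processor entity called `name` in process group `group_id`, or None."""
    url = f"{base_url}/process-groups/{group_id}/processors"
    request = urllib.request.Request(url, headers={"Accept": "application/json"})
    with opener(request) as response:
        listing = json.loads(response.read().decode("utf-8"))
    # Names are not unique in NiFi, so collect every match before deciding.
    matches = [p for p in listing.get("processors", [])
               if p.get("component", {}).get("name") == name]
    if len(matches) > 1:
        raise ValueError(f"{len(matches)} processors named {name!r} in group {group_id}")
    return matches[0] if matches else None
```

The returned entity carries both the `id` and the `revision.version` needed to build the PUT request shown earlier in this post.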
The diagram below shows these interactions:
As a quick summary, such an approach would imply that:
In this post, we implemented the specialization process of NiFi flows using the NiFi REST API, and automated and scaled it through a configuration repository. Now that NiFi flows can be specialized for a given usage, the next step is to investigate the instantiation process, which aims at automating the creation of NiFi flows from a master (generic) one. It will be introduced along with the NiFi Registry in part 2.
Orchestrating both specialization and instantiation will result in a key application component that accelerates and optimizes the delivery of NiFi flows.