With Apache Falcon and Apache Atlas, we quickly reach their limits when it comes to flexibility in the face of diverse functional needs and to efficiency on scopes of drastically different sizes.
Thus, Atlas, which is still maturing (v0.8.1), performs global data lineage by scanning the internal metadata of the stack's components (HDFS, [Hive](https://cwiki.apache.org/confluence/display/Hive/Home) Metastore, HBase, Kafka, ...). The lineage is tracked at the granularity of the data path, without integrating all the information of the data lifecycle, such as the version: a data set can keep the same URI indefinitely while its version evolves over time, which Atlas does not manage. It is mainly used to perform static lineage, which answers questions such as: "From which table is such other table derived?".
Falcon, also maturing (v0.10), serves mainly as a meta-orchestrator and can generate data pipeline lineage information. Its grain is natively limited to the data pipeline.
If the need is to trace the life cycle of the data precisely, e.g. down to the data set's version, then answering the question "Which data set, in which version, is another data set derived from?" becomes a very difficult architectural use case. This is referred to as dynamic data lineage, where tracking follows the lifecycle of the data as processing pipelines evolve in terms of functions and scope.
This article aims to present an effective way of answering this last question. The solution is based on the use of Apache Spark as a processing engine, in particular by exploiting its Logical Execution Plan tree. However, it remains open by design to other processing engines.
The main theoretical challenge of data lineage is to answer the question "which data has been used by which process to produce which other data". Although the tracked object is the data itself, it is its metadata that is used to establish this lineage. The concept usually adopted to solve this type of problem is based on the formulation of relationships between entities in the form of semantic triples. This concept is one of the pillars of the RDF standard (Resource Description Framework), a family of metadata model specifications used in particular to describe the links between resources of the Semantic Web. It can also be found in many other models, such as those describing graphs.
A semantic triple is a set of three entities that transcribes the relationship between them. It takes the form: subject --> predicate --> object. Applied to data lineage, it can be presented in two modes:
If we transpose these principles to data processing in order to carry out a lineage at the granularity of the data set and its version, the semantic triple can be formulated as follows: "the data set B in version vB was produced by the process X in version vX using the data set A in version vA". This is a backward lineage: the subject is the produced data set (B in version vB), the predicate (or verb) is "was produced by the process X in version vX using", and the object is the source data set (A in version vA).
Each produced data set is connected to the source data sets via a process (and vice versa). This approach results in the construction of a directed acyclic graph, or DAG (that is, a directed graph containing no closed circuit), allowing each data set / version to be traced back to all the data sets used for its generation.
It is interesting to note that in this case (especially at this granularity level), only the I/O of the processes are taken into account; it is not necessary to exploit the data manipulated internally. As seen above, this level of granularity (data set / version) also requires dynamic data lineage, so that the implemented mechanism can follow the evolution of the versions without changing the model or the code.
One of the major difficulties that can be encountered is the management of data set versions. A multitude of cases can exist, each leading to a different strategy depending on the use case and the architecture adopted. As an illustration of possible strategies, let's take an example where the version of each data set functionally corresponds to the data collected/processed during a given day and is materialized by a simple increment.
It is then possible to adopt two different data addressing strategies, corresponding to two architectural choices:
In Case I, the source data sets A and B are addressed without reference to the version. The generated data set C then depends on all versions of the data sets A and B, and we are back to a static lineage situation. In Case II, the source data sets A and B are addressed by version, and the version vC of the generated data set C is then calculated from vA and vB. In our case, we designate a reference data set, e.g. A, and a simple calculation function: vC = vA.
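To make this Case II strategy concrete, here is a minimal sketch (with hypothetical names, not taken from the demo code) of a version propagation function where A is the designated reference data set:

```scala
// Hypothetical sketch of the Case II strategy: the version of the generated
// data set is computed from the versions of the addressed source data sets.
case class VersionedDataset(name: String, version: Int, isReference: Boolean = false)

object VersionPropagation {
  // vC = vA: the output version is simply the version of the reference data set.
  def outputVersion(sources: Seq[VersionedDataset]): Int =
    sources.find(_.isReference)
      .getOrElse(sys.error("No reference data set designated"))
      .version
}

// Example: A = clients (reference, v15) and B = products (v3) produce C in version 15.
// VersionPropagation.outputVersion(Seq(
//   VersionedDataset("clients", 15, isReference = true),
//   VersionedDataset("products", 3))) == 15
```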
The following proposal is the result of work carried out in a client's IS, within the framework of a PoC, to provide a solution presenting:
Functionally, the customer's main motivation was to respond to a legal constraint by allowing a full audit of the data origin at the data set level. Operationally, the goal is to obtain full control of the impact perimeter of any action on a data set stamped as critical, so as to ensure its re-generation under all circumstances.
Not all of these topics can be addressed in this article. I propose a simplified view of the problem, focused on lineage while keeping in mind the constraints mentioned above. The entire source code of the demo is available here.
Thus, in this very simplified use case:
Let's choose the second version propagation strategy mentioned above for our example. The content of the data sets can also change over time, as new versions of the records can be written. In this case, it makes sense to include the version of the data set in the record data model, as follows for the data set clients-v15:
{"name":"Bike":"surname":"Mike","product-id":"B12","quantity":"1","version":"15"}
{ "name":"Poll":"Poll","surname":"Kevin":"product-id":"R23","quantity":"2","version":"15" }
It will therefore be necessary to adapt the lineage and persistence mechanisms to take into account the possible values of this version. In our case, for example, we chose to use the version as one of the partitioning keys of the downstream files (i.e. namesAndProducts and productSummary) to allow data purge operations based on the data set version.
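With the DataFrame API, such version-based partitioning of the persisted output could look like the following sketch (a generic illustration, not the demo's exact code):

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch: persist a data set partitioned by its "version" column, so that a purge
// can later remove a whole version simply by dropping its partition directory.
def writePartitionedByVersion(df: DataFrame, outputPath: String): Unit =
  df.write
    .mode(SaveMode.Append)
    .partitionBy("version")
    .json(outputPath)
```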
Managing the data set lifecycle also means unambiguously identifying any process instance in the IS. Static identification of a process (by the driver class name, to take a very simplistic example) is not enough: each instance, i.e. each effective launch of each driver, has to be uniquely identified. In this example, an instance is identified by a Run Id combined with the driver identifier.
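As can be seen in the producer column of the lineage messages further below, this demo simply concatenates the driver class name and the Run Id; a sketch of such an identifier (the origin of the Run Id itself, e.g. an orchestrator or a counter, is an assumption) could be:

```scala
// Sketch: uniquely identify a process instance by combining the (static) driver
// class name with the (dynamic) Run Id of this particular launch.
def processInstanceId(driverClass: Class[_], runId: Long): String =
  s"${driverClass.getName}$$$runId"  // e.g. "com.octo.spark.lineage.BusinessDriver_One$12"
```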
Our example is then described in the following diagram:
In Run Id 12, the files clients-v15.json and products-v3.json are processed by BusinessDriver_One and then persisted in namesAndProducts.json. In Run Id 13, a new clients version is integrated. Run Id 47 of BusinessDriver_Two comes after these two runs to produce an aggregated view of namesAndProducts in productSummary.json.
In order to generate lineage information dynamically, the idea is to exploit certain features of the Spark processing engine. Indeed, while preparing the distributed execution, this engine builds a DAG of the transformations to be carried out, which it formalizes in a logical execution plan: in effect, a lineage of the transformations. Since these transformations use input data sets and generate output data sets, we just have to find a way to use this DAG to meet our need to trace the data sets.
In industrial Spark-based processing applications, such as those developed for the Data Hub, the most widely used data containers are RDDs. Since Spark 2.0, the use of DataFrames and Datasets is growing with the maturity of Spark SQL. In each case, exploiting the DAG is done differently.
On an RDD instance, the function toDebugString() provides a String describing the DAG of transformations that produced this RDD:
(2) ShuffledRDD[29] at reduceByKey at BusinessDriver_One.scala:46 []
 +-(2) UnionRDD[28] at union at BusinessDriver_One.scala:46 []
    |  MapPartitionsRDD[13] at map at BusinessDriver_One.scala:20 []
    |  MapPartitionsRDD[7] at rdd at AppCore.scala:52 []
    |  MapPartitionsRDD[6] at rdd at AppCore.scala:52 []
    |  MapPartitionsRDD[5] at rdd at AppCore.scala:52 []
    |  FileScanRDD[4] at rdd at AppCore.scala:52 []
    |  MapPartitionsRDD[27] at map at BusinessDriver_One.scala:22 []
    |  MapPartitionsRDD[21] at rdd at AppCore.scala:52 []
    |  MapPartitionsRDD[20] at rdd at AppCore.scala:52 []
    |  MapPartitionsRDD[19] at rdd at AppCore.scala:52 []
    |  FileScanRDD[18] at rdd at AppCore.scala:52 []
This function is intended for debugging and does not provide structured information. I have reproduced the exact logic of Spark 2 to provide, from an RDD parameter, the lineage tree of these transformations. For the same RDD this gives (see the function RDDLineageExtractor.lineageTree()):
(2) ShuffledRDD[29] at reduceByKey at BusinessDriver_One.scala:46 []
  UnionRDD[28] at union at BusinessDriver_One.scala:46 []
    MapPartitionsRDD[13] at map at BusinessDriver_One.scala:20 []
      MapPartitionsRDD[7] at rdd at AppCore.scala:52 []
        MapPartitionsRDD[6] at rdd at AppCore.scala:52 []
          MapPartitionsRDD[5] at rdd at AppCore.scala:52 []
            FileScanRDD[4] at rdd at AppCore.scala:52 []
    MapPartitionsRDD[27] at map at BusinessDriver_One.scala:22 []
      MapPartitionsRDD[21] at rdd at AppCore.scala:52 []
        MapPartitionsRDD[20] at rdd at AppCore.scala:52 []
          MapPartitionsRDD[19] at rdd at AppCore.scala:52 []
            FileScanRDD[18] at rdd at AppCore.scala:52 []
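The demo's RDDLineageExtractor is not reproduced here, but the underlying idea can be sketched as follows: every RDD exposes its parent RDDs through dependencies, so a simple recursive walk is enough to obtain a structured tree (the TreeNode type and method name below are illustrative assumptions):

```scala
import org.apache.spark.rdd.RDD

// Sketch: rebuild the transformation DAG as a structured tree, instead of the
// flat String returned by toDebugString(), by walking the RDD dependencies.
case class TreeNode[T](value: T, children: Seq[TreeNode[T]])

object RDDLineageSketch {
  def lineageTree(rdd: RDD[_]): TreeNode[String] =
    TreeNode(rdd.toString, rdd.dependencies.map(dep => lineageTree(dep.rdd)))
}
```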
This function can then be used in centralized code, such as the generic kernel of the applications (here symbolized by the class AppCore), and invoked for each action (typically an I/O) of any executed driver. The lineage information, including the processing tree, is then generated by invoking the same function (here lineSparkAction()) when reading a JSON, just after building the corresponding RDD, and when writing an RDD, just before its persistence:
def writeJson(rdd: RDD[_ <: Row], outputFilePath: String): Unit = {
  lineSparkAction(rdd, outputFilePath, IoType.WRITE)
  val schema = rdd.take(1)(0).schema
  spark.createDataFrame(rdd.asInstanceOf[RDD[Row]], schema).write.json(outputFilePath)
}

def readJson(filePath: String): RDD[Row] = {
  val readRdd = spark.read.json(filePath).rdd
  lineSparkAction(readRdd, filePath, IoType.READ)
  readRdd
}
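To make the wiring concrete, a driver built on this kernel might look like the following fragment (paths taken from the example; the business transformations are elided behind a hypothetical businessTransformations function):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Sketch (inside a driver, after initializing the kernel): every read and write
// goes through the traced helpers, so lineage messages are emitted automatically.
val clients  = readJson("data-lineage-spark-demo/src/main/resources/examples/raw/clients-v15.json")
val products = readJson("data-lineage-spark-demo/src/main/resources/examples/raw/products-v3.json")

val namesAndProducts: RDD[Row] = businessTransformations(clients, products) // hypothetical

writeJson(namesAndProducts, "data-lineage-spark-demo/src/main/resources/examples/generated2/namesAndProducts.json")
```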
The lineage function has three responsibilities: to generate the processing tree with the function lineageTree(); to generate the metadata of the data set (whether input or output, depending on the direction of the I/O) by analyzing the RDD and the target container; and, finally, to produce the lineage messages by exploiting all this data. In our case, and for the sake of simplicity, these messages are also persisted on HDFS, but one can imagine many other patterns beyond the scope of this article.
private def lineSparkAction(rdd: RDD[_ <: Row], outputFilePath: String, ioType: IoType.Value): Unit = {
  val rddLineageTree: TreeNode[String] = RDDLineageExtractor.lineageTree(rdd)
  val datasetMetadata: (String, String) = getDatasetMetadata(rdd, outputFilePath)
  val processInformation: String = processId
  produceLineageMessage(rddLineageTree, datasetMetadata, processInformation, ioType)
}
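The helpers getDatasetMetadata and produceLineageMessage are not detailed in this article. As a purely illustrative assumption, the metadata pair (data set name, version) could be derived from the target path and the version propagation strategy, along these lines:

```scala
// Hypothetical sketch: derive (datasetName, version) from the file path.
//   ".../raw/clients-v15.json"              -> ("clients-v15", "15")
//   ".../generated2/namesAndProducts.json"  -> ("namesAndProducts", <propagated version>)
def getDatasetMetadataSketch(path: String, propagatedVersion: => String): (String, String) = {
  val fileName = path.split("/").last.stripSuffix(".json")
  val versionPattern = ".*-v(\\d+)".r
  fileName match {
    case versionPattern(v) => (fileName, v)                   // version embedded in the file name
    case _                 => (fileName, propagatedVersion)   // version given by the propagation strategy
  }
}
```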
When using Spark SQL DataFrames (or Datasets), the task is greatly simplified. Since Spark 2, processing lineage information is provided directly by the DataFrame API: for a given DataFrame, the inputFiles() method returns an array of the data sources used to generate this DataFrame. It is therefore sufficient to intercept only the outputs to generate the lineage information. The immediate effect is that fewer lineage messages are generated.
def readJsonDF(filePath: String): DataFrame = {
  spark.read.json(filePath)
}

def writeJson(df: DataFrame, path: String, lineData: Boolean = true, saveMode: SaveMode = SaveMode.Overwrite): Unit = {
  if (lineData) lineSparkAction(df, path, IoType.WRITE)
  df.write.mode(saveMode).json(path)
}
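A sketch of what the DataFrame flavour of lineSparkAction might rely on (the helper names reuse those of the RDD version above; the exact message production is an assumption):

```scala
import org.apache.spark.sql.DataFrame

// Sketch: with DataFrames, the sources are obtained directly from the API via
// inputFiles(), so a single message per write can carry both sink and sources.
private def lineSparkAction(df: DataFrame, outputFilePath: String, ioType: IoType.Value): Unit = {
  val sources: Array[String] = df.inputFiles                    // files used to build this DataFrame
  val datasetMetadata: (String, String) = getDatasetMetadata(df, outputFilePath)
  produceLineageMessage(sources, datasetMetadata, processId, ioType)
}
```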
Keep in mind the API's warning: "Depending on the source relations, this may not find all input files". The designer/developer must verify that the various data sources used are indeed traced.
The generated lineage messages must contain the information needed to build, in detail, the semantic triples from which the data set lineage graph is constructed. A sufficient but not exhaustive list of information to include is:
The lifecycle status is precisely a piece of metadata that will evolve with the functional lifecycle of the data set (e.g. going from an ordinary "data" state to "critical data").
In our case, the identification of the process in the lineage messages has been simplified to the strict minimum: only the class name and the Run Id are used. Adding the rest of the identifying information in the method lineSparkAction() is not a technical challenge.
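Based on the columns visible in the messages below, the message data model can be sketched as a simple case class (field types are assumptions):

```scala
// Sketch of a lineage message matching the columns of the table below.
case class LineageMessage(
  datasetName: String,   // e.g. "namesAndProducts"
  ioType: String,        // READ or WRITE
  producer: String,      // driver class name + Run Id, e.g. "...BusinessDriver_One$12"
  reference: Boolean,    // whether this data set is the version reference
  sinks: Seq[String],    // lineage tree roots of the written RDD (empty for reads)
  sources: Seq[String],  // FileScanRDDs / input files feeding the data set
  status: String,        // lifecycle status, e.g. "Initial" or "critical"
  uri: String,           // physical location of the data set
  version: String        // data set version
)
```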
+----------------+------+--------------------------------------------+---------+-----------------------------------------------------------------+------------------------------------------------------------------------------------------+-------+------------------------------------------------------------------------------------+-------+
|datasetName |ioType|producer |reference|sinks |sources |status |uri |version|
+----------------+------+--------------------------------------------+---------+-----------------------------------------------------------------+------------------------------------------------------------------------------------------+-------+------------------------------------------------------------------------------------+-------+
|namesAndProducts|WRITE |com.octo.spark.lineage.BusinessDriver_One$13|false |(1) MapPartitionsRDD[29] at map at BusinessDriver_One.scala:52 []|FileScanRDD[4] at rdd at AppCore.scala:45 [],FileScanRDD[16] at rdd at AppCore.scala:45 []|Initial|data-lineage-spark-demo/src/main/resources/examples/generated2/namesAndProducts.json|16 |
|namesAndProducts|WRITE |com.octo.spark.lineage.BusinessDriver_One$12|false |(1) MapPartitionsRDD[29] at map at BusinessDriver_One.scala:52 []|FileScanRDD[4] at rdd at AppCore.scala:45 [],FileScanRDD[16] at rdd at AppCore.scala:45 []|Initial|data-lineage-spark-demo/src/main/resources/examples/generated2/namesAndProducts.json|15 |
|productSummary |WRITE |com.octo.spark.lineage.BusinessDriver_Two$47|false |(1) MapPartitionsRDD[13] at map at BusinessDriver_Two.scala:28 []|FileScanRDD[4] at rdd at AppCore.scala:50 [] |Initial|data-lineage-spark-demo/src/main/resources/examples/generated2/productSummary.json |16 |
|namesAndProducts|READ |com.octo.spark.lineage.BusinessDriver_Two$47|true | |FileScanRDD[4] at rdd at AppCore.scala:50 [] |Initial|data-lineage-spark-demo/src/main/resources/examples/generated2/namesAndProducts.json|16 |
|products-v3 |READ |com.octo.spark.lineage.BusinessDriver_One$12|false | |FileScanRDD[16] at rdd at AppCore.scala:45 [] |Initial|data-lineage-spark-demo/src/main/resources/examples/raw/products-v3.json |3 |
|products-v3 |READ |com.octo.spark.lineage.BusinessDriver_One$13|false | |FileScanRDD[16] at rdd at AppCore.scala:45 [] |Initial|data-lineage-spark-demo/src/main/resources/examples/raw/products-v3.json |3 |
|clients-v15 |READ |com.octo.spark.lineage.BusinessDriver_One$12|true | |FileScanRDD[4] at rdd at AppCore.scala:45 [] |Initial|data-lineage-spark-demo/src/main/resources/examples/raw/clients-v15.json |15 |
|clients-v16 |READ |com.octo.spark.lineage.BusinessDriver_One$13|true | |FileScanRDD[4] at rdd at AppCore.scala:45 [] |Initial|data-lineage-spark-demo/src/main/resources/examples/raw/clients-v16.json |16 |
+----------------+------+--------------------------------------------+---------+-----------------------------------------------------------------+------------------------------------------------------------------------------------------+-------+------------------------------------------------------------------------------------+-------+
One quickly notices that the messages are incomplete: many of them (those for reads) do not include any "sinks". In order to build the lineage graph, it is therefore necessary to carry out a consolidation step, which consists of linking source nodes to sink nodes by analyzing the graphs built from the messages.
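Using the hypothetical LineageMessage sketch above, this consolidation could conceptually be expressed as matching each read of a data set version with the write that produced it (the demo's actual implementation may differ):

```scala
// Sketch: for every READ message, find the WRITE message that produced the same
// data set version, yielding an edge from the producing to the consuming process.
def consolidate(messages: Seq[LineageMessage]): Seq[(LineageMessage, LineageMessage)] = {
  val writesByDataset: Map[(String, String), LineageMessage] =
    messages.filter(_.ioType == "WRITE")
      .map(m => (m.datasetName, m.version) -> m)
      .toMap

  for {
    read  <- messages.filter(_.ioType == "READ")
    write <- writesByDataset.get((read.datasetName, read.version)).toSeq
  } yield (write, read)
}
```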
Once this consolidation step is completed and the graph has been constructed (class LineageGraphGenerator), we obtain the following lineage for our example:
Node[0]--->[Dataset: namesAndProducts, Version: 16] has been produced by:
  [com.octo.spark.lineage.BusinessDriver_One$13] using: [Dataset: products-v3, Version: 3]
  [com.octo.spark.lineage.BusinessDriver_One$13] using: [Dataset: clients-v16, Version: 16]
Node[1]--->[Dataset: namesAndProducts, Version: 15] has been produced by:
  [com.octo.spark.lineage.BusinessDriver_One$12] using: [Dataset: products-v3, Version: 3]
  [com.octo.spark.lineage.BusinessDriver_One$12] using: [Dataset: clients-v15, Version: 15]
Node[2]--->[Dataset: productSummary, Version: 16] has been produced by:
  [com.octo.spark.lineage.BusinessDriver_Two$47] using: [Dataset: namesAndProducts, Version: 16]
Presented graphically (by truncating class names for readability), this corresponds to the following:
This lineage graph can then be used to manage the data lifecycle at the data set level. For example, suppose that only the data set productSummary in version 16 should be retained in the long term, because it is considered critical data by the business. In parallel, we want a purge process to delete the older versions of the data sets clients and products. To what extent can this deletion be carried out without jeopardizing the possibility of re-generating productSummary (for example in the case of a disaster recovery plan)?
The answer to this question is to carry out an impact analysis on the lineage graph. This analysis is based on the identification of connected components, by implementing the algorithm of the same name with a graph-oriented database engine. The engine used in our case (still in the class LineageGraphGenerator) is Neo4j, via a very simplistic integration for the needs of this article. The algorithm then provides a subgraph giving all the source data sets / versions connected to productSummary in version 16, i.e. clients v16 and products v3.
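As an illustration only (assuming the Neo4j Java driver 4.x and an assumed node model where each data set version is a Dataset node), the impact perimeter of productSummary v16 could be retrieved with a variable-length path query such as:

```scala
import org.neo4j.driver.{AuthTokens, GraphDatabase}
import scala.jdk.CollectionConverters._

// Sketch: list every data set node connected, directly or transitively, to
// productSummary v16, i.e. the data sets that must not be purged if it is critical.
object ImpactAnalysisSketch extends App {
  val driver  = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "password"))
  val session = driver.session()

  val query =
    """MATCH (d:Dataset {name: 'productSummary', version: '16'})-[*]-(c:Dataset)
      |RETURN DISTINCT c.name AS name, c.version AS version""".stripMargin

  session.run(query).list().asScala.foreach { record =>
    println(s"${record.get("name").asString()} v${record.get("version").asString()}")
  }

  session.close()
  driver.close()
}
```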
This is where the "status" attribute added to the data model of the lineage messages comes in. By default, all nodes are assigned an initial status value ("Initial" in the messages above). If a data set is declared "critical" by the business line, a dedicated service will have to update the corresponding node in the graph database accordingly. The same service will query this database to get the connected data sets and tag them as "critical" as well. The purge service will then have to systematically check the status of a data set / version and not delete it if its status is "critical".
The subject is vast and has profound impacts on architecture, development management and data governance alike. In this article, however, we have covered some of the key aspects of data lineage at the data set level, namely the following: