MPI (Message Passing Interface) was the library of choice for intensive computing, but it was not designed for large-scale data processing. According to Google, this was a new use case that no existing solution addressed.
Google's jobs (computing PageRank, building an inverted index) are long-running. In a distributed environment, the probability of a failure during processing increases with the number of machines running in parallel. Google therefore had to think about a way to tolerate these errors (known as fault tolerance).
From these observations, Google drew inspiration from functional programming to create a new approach to distributed processing: MapReduce.
The basic principle is to remove the bottleneck that is the network: move the binaries rather than the data.
Instead of distributing the data at read time, the data is distributed across multiple machines at write time. At read time, only the compiled code travels over the network instead of, say, 10 TB of HTML pages.
Moreover, the MapReduce paradigm aims to greatly simplify the writing of parallel programs. Work existed to let MPI co-locate data, but it never really materialized because it was unintuitive and added complexity.
Fault tolerance is also easier to implement: when the data is written, it is replicated to different locations. If a machine holding the data fails, processing can continue on one of the replicas.
Hadoop arrived in 2006. It was built on a distributed file system, the Nutch Distributed File System (now HDFS), and an implementation of Google's MapReduce paper. The framework is written in Java for a simple reason: “Java is portable, easy to debug and widely acclaimed by the community”, said Doug Cutting, its main creator.
With MapReduce, a different data model is used to represent records: key/value pairs. The strength of this model is how easily it can be distributed. Each machine gets a sequence of keys to manage, and it is easy to iterate in parallel over each key and process its value. The number of sequences defines the degree of parallelism (the number of machines processing a sequence in parallel).
// Map phase. This code is executed on each worker.
while (recordIterator.hasNext()) {
    Record record = recordIterator.next();
    map(record.key, record.value);
}
The user (the MapReduce developer) only needs to specify the body of the map() method.
The new key/value pairs generated by the mappers are then distributed to the reducers over the network. This stage is called the “Shuffle & Sort”.
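To make this concrete, here is a minimal word-count sketch in the spirit of the Hadoop MapReduce Java API (the class names and the splitting logic are illustrative, not taken from the original article): the map() emits a 1 for each word, the Shuffle & Sort groups the pairs by key, and the reduce() sums the values received for each word.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// map(): for each input line, emit (word, 1)
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    for (String token : value.toString().split("\\s+")) {
      word.set(token);
      context.write(word, ONE);   // emitted pairs go through Shuffle & Sort
    }
  }
}

// reduce(): sum the 1s received for each word after the shuffle
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    context.write(key, new IntWritable(sum));
  }
}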
Note that MapReduce is not a complete answer to the network issue. The Shuffle & Sort stage still uses the network to move the output of the map stage to the reducers. Good practice in the mappers is to filter records, project (reduce the number of columns), pre-aggregate and compress, so as to significantly reduce the data travelling over the network in the next phase. Unfortunately, these practices only apply case by case; there is no general solution to this problem.
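Pre-aggregation is typically done with a combiner, a reducer that runs locally on each mapper's output before the shuffle. Here is a hedged sketch of the job wiring, assuming the word-count classes above (paths and the job name are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountJob {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "wordcount");
    job.setJarByClass(WordCountJob.class);
    job.setMapperClass(WordCountMapper.class);
    // The combiner pre-aggregates (word, 1) pairs on the map side,
    // shrinking what crosses the network during Shuffle & Sort.
    job.setCombinerClass(WordCountReducer.class);
    job.setReducerClass(WordCountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}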
Around 2008, a new use case appeared: the need for a higher-level language to express distributed processing. A language designed for data: SQL.
In 2009, Facebook developed Hive, a SQL engine that generates MapReduce jobs. Google introduced Dremel, a distributed datastore that interprets SQL and "optimizes" the full scan (fun), which later inspired the Apache Drill project (backed by MapR). Impala, Presto, Apache Phoenix, Druid and Pinot are all technologies that address this need for large-volume analysis.
In short, analytics is the new use case.
Meanwhile, experiments began in the field of artificial intelligence. Machine learning entered the Hadoop ecosystem with Apache Mahout and opened new perspectives that excited the specialized press.
These two new uses of Hadoop (analytics and machine learning) share a common characteristic: they are made of iterations.
Thus, a SQL query made of several joins will be split into multiple MapReduce jobs, each storing its intermediate result on HDFS. A clustering algorithm such as K-means proceeds by successive iterations until the centroids reach a steady state. These are called iterative algorithms.
At the end of each MapReduce iteration, the intermediate result is stored on HDFS for the next iteration. Not only is it written to disk, it is also replicated (3 times by default). Then, in the next iteration, the intermediate data must be reloaded into memory (during the map) and sent on to the reducers (shuffle again!). It turns out that HDFS is a rather slow abstraction (around 30 MB/s write, 60 MB/s read on average with SATA disks), which only increases the overall processing time.
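To get an order of magnitude (the figures below are illustrative assumptions, not measurements): suppose an iteration produces 100 GB of intermediate data. With 3x replication, roughly 300 GB must be written to HDFS; spread over 20 disks writing at 30 MB/s, that is about 300,000 MB / 600 MB/s ≈ 500 seconds, more than 8 minutes per iteration before any useful computation. The next iteration then has to read those 100 GB back from disk (about 80 more seconds at 60 MB/s per disk) before its map phase can even start.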
As you will have guessed, disk I/O is the new bottleneck.
This bottleneck already existed, of course, but it was masked by network I/O and was not too painful for batch processing.
To minimize the cost of an iteration, we do what we usually do when data needs to be read and written quickly: cache it in RAM.
Fortunately, the price of memory has dropped a lot, and for some time now it has been common to find servers with 128 or 256 GB of RAM.
So we can imagine a 1 TB dataset (replicated) being held in memory on a small cluster of about a dozen servers…
That is how Spark (see the research paper) was introduced by UC Berkeley, precisely to address this type of iterative workload. Its authors devised an abstract data model for the data carried across iterations: the RDD (Resilient Distributed Dataset, described in its own paper).
An RDD is a collection of records, distributed across the cluster, obtained from a context initialized at the beginning of the job.
// records represents an RDD
JavaRDD<String> records = spark.textFile("hdfs://...");
Transformations and actions are applied successively. These two concepts, specific to Spark's vocabulary, can be generalized as operators: there are map-side operators and reduce-side operators (shuffle operators). The difference is that a reduce-side operator triggers a shuffle (the redistribution of key/value pairs over the network) before it executes. Spark is not the only abstraction built on MapReduce; most dataflow-style frameworks (which express a sequence of transformations) use these same concepts: Apache Flink, Apache Crunch, Cascading, Apache Pig, etc.
records.filter(...) // is a map-side operation
records.groupByKey() // is a reduce-side operation
With Spark, it is an action that triggers an execution run, including the shuffle phase.
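Here is a hedged illustration of that behaviour with the Java API (the HDFS paths and the log format are placeholders): the transformations below only build a lineage, and nothing actually runs until the final action.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

JavaSparkContext spark = new JavaSparkContext(new SparkConf().setAppName("log-analysis"));
JavaRDD<String> records = spark.textFile("hdfs://.../input");

// Transformations: lazily declared, nothing executes yet
JavaRDD<String> errors = records.filter(line -> line.contains("ERROR"));          // map-side
JavaPairRDD<String, Integer> byHost =
    errors.mapToPair(line -> new Tuple2<>(line.split(" ")[0], 1));                // map-side
JavaPairRDD<String, Integer> counts = byHost.reduceByKey((a, b) -> a + b);        // reduce-side: will shuffle

// Action: triggers the actual run, including the shuffle
counts.saveAsTextFile("hdfs://.../output");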
There are different types of RDD, some offering caching functions, in particular to indicate that a dataset will be reused across several iterations (thus keeping the intermediate results around). The dataset is then stored in a local cache, either in RAM, on disk, or split between RAM and disk.
records.cache();
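As a hedged sketch of how this helps an iterative algorithm (the storage level and persist() come from the standard Spark Java API; parsePoint, computeGradient, sumVectors, initialWeights and update are hypothetical helpers standing in for a real gradient-style computation):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

// Keep the dataset in memory (spilling to disk if it does not fit), so each
// iteration rereads it from RAM instead of going back to HDFS.
JavaRDD<double[]> points = records.map(line -> parsePoint(line))                // hypothetical parser
                                  .persist(StorageLevel.MEMORY_AND_DISK());

double[] weights = initialWeights();                                            // hypothetical initialization
for (int i = 0; i < 10; i++) {
  final double[] w = weights;   // lambdas may only capture effectively final locals
  // Each pass iterates over the cached partitions; no HDFS round trip in between.
  double[] gradient = points.map(p -> computeGradient(p, w))                    // hypothetical function
                            .reduce((a, b) -> sumVectors(a, b));                // hypothetical vector sum
  weights = update(weights, gradient);                                          // hypothetical update step
}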
But this is not the only source of acceleration between iterations. Spark also cleverly recycles its "executors", the JVMs used to run the processing. It turned out that part of Hadoop's latency came from JVM startup. Recycling them cut roughly 20 seconds of processing time per iteration (in the Hadoop case, a container takes about 30 seconds to initialize). The trick already existed in Hadoop (the mapred.job.reuse.jvm.num.tasks property in the Hadoop v1 configuration) but was not enabled by default.
With the latency improvements from recycling JVMs and from caching, more and more interactive systems began to appear. Spark offers interactive consoles (bin/spark-shell for Scala, bin/pyspark for Python) for typing and running code.
Interactive use is, in a way, another kind of iterative algorithm. SQL, as we have seen, has become an important feature of Hadoop. The comparison people quickly make for Big Data technologies that support SQL is with the latency of older systems that also ran SQL (but on small volumes): under a minute on average to display the result of a query.
Business teams, analysts, BI engineers have access to all that power so they can “interact” with the data and extract value.
Hive – with initiatives such as Stinger and Stinger.next – is moving in this direction. It offers new runtime environments better suited to interactive use (Tez, Spark) to cut the latency of ad hoc queries.
The challenge for these new analysis engines (Spark, Flink, Tez, Ignite) is to store very large amounts of data in memory. So the size of the Java heap of the containers is increased (the Xmx and Xms JVM options). More RAM is needed: from 2 to 4 GB on average for MapReduce, it grows to 8 or 16 GB for Spark, Tez and Flink. Disk I/O, in turn, is reduced by growing the heap, simply because more objects can be held in memory.
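As a hedged illustration of where those settings live (the 16 GB and 4 GB values are arbitrary examples, not recommendations): with a standalone JVM this is -Xmx/-Xms, and with Spark the executor heap is usually set through spark.executor.memory (or --executor-memory on spark-submit).

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Ask for 16 GB executor heaps programmatically; equivalent to
// passing --executor-memory 16g to spark-submit, or -Xmx16g/-Xms16g
// when launching a plain JVM.
SparkConf conf = new SparkConf()
    .setAppName("interactive-analysis")
    .set("spark.executor.memory", "16g")   // heap of each executor JVM
    .set("spark.driver.memory", "4g");     // driver heap (only effective before the driver JVM starts)
JavaSparkContext sc = new JavaSparkContext(conf);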
Interactive processing therefore means much larger memory sizes for the JVM to manage, while many small objects (dataset records) are allocated frequently (with every new ad hoc query).
We find ourselves facing a new bottleneck: the garbage collector.
The JVM's garbage collector is a very powerful mechanism that greatly simplifies the management of the JVM's memory (the heap). It automates the deallocation of unused objects.
To manage its objects, the GC maintains a graph in which a node represents an object and an edge to another node represents a reference (one object referencing another). If an object has no incoming edge, it is no longer referenced by any other object and can be deallocated at the next collection (it is "marked"). Logically, the time needed to traverse the object graph is proportional to the number of live objects in the heap, and therefore to the size of the heap (the bigger it is, the more you can put in it).
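A trivial hedged illustration of that reachability idea in Java:

// A record-like object; millions of these are created when processing a dataset.
byte[] buffer = new byte[1024];   // reachable: the local variable references it
byte[] alias  = buffer;           // a second edge in the object graph

alias  = null;                    // one reference removed
buffer = null;                    // no references left: the byte[] is now unreachable
// At the next collection, the GC marks it and reclaims its memory.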
In some cases, the garbage collector puts the whole application on hold (a concurrency issue).
Here are some examples of events that cause a pause: a collection triggered when the heap can no longer satisfy allocation requests, and a compaction triggered when the heap has become too fragmented (both are detailed below).
If you want to dig deeper into the GC process, Martin Thompson has given a very good explanation.
In the diagram below, we see that these pauses completely stop the system while they do their work, whether they run in parallel or not. They are called STW pauses, for Stop The World.
Spark collections are often composed of millions or billions of records, each represented by an object instance. All these rapid allocations and deallocations fragment the memory considerably.
The garbage collector is triggered at a frequency that varies with heap usage. If the heap can no longer satisfy allocation requests, the garbage collector runs a collection to free up space. If the heap is too fragmented, the JVM performs a compaction after the collection.
These pauses are sometimes too long for interactive use (imagine a 30-second GC pause). That is why much of the current work on Spark performance concerns the garbage collector. Daoyuan Wang gives an overview of possible tweaks on the blog of Databricks (the main contributor to Spark), in a post titled “Tuning Java Garbage Collection for Spark Applications”.
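To give an idea of what such tuning looks like, here is a hedged example of passing GC options to the Spark executors (choosing G1 and enabling GC logging are illustrative options, not a recommendation taken from the post):

import org.apache.spark.SparkConf;

// Switch the executors to the G1 collector and log GC activity
// so that long Stop-The-World phases become visible.
SparkConf conf = new SparkConf()
    .setAppName("gc-tuning-demo")
    .set("spark.executor.extraJavaOptions",
         "-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps");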
Philippe Prados raised the question of the GC's future in an earlier post on the OCTO blog, "The next death of the garbage collector?": "Today, configurations have evolved enough to put the garbage collector under strain. Incremental improvements, real as they are, answer the demands less and less. GCs are evolving, but not fast enough relative to their environment."
This is why engineers will start looking for solutions elsewhere…
In distributed data processing systems (built on Java technology), we observe two approaches for trying to reduce the impact of garbage collection in the JVM.
Both approaches use the same JDK class: the ByteBuffer. A ByteBuffer is an object that encapsulates an array of bytes along with access methods. The information is serialized to binary form before being stored in the ByteBuffer. It is a kind of low-level container in which we store our collections of objects. According to the documentation, there are two types of ByteBuffer: the non-direct ByteBuffer, allocated on the JVM heap, and the direct ByteBuffer, allocated in native memory outside the heap (off-heap).
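A minimal hedged sketch of the two allocation modes and of storing pre-serialized bytes in such a container (the record content is a made-up example):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Non-direct: the backing byte[] lives inside the JVM heap.
ByteBuffer onHeap = ByteBuffer.allocate(32 * 1024);        // e.g. a 32 KB segment

// Direct: the memory is allocated outside the heap (off-heap),
// so it is essentially invisible to the garbage collector.
ByteBuffer offHeap = ByteBuffer.allocateDirect(32 * 1024);

// Records are serialized to bytes before being written into the container...
byte[] serializedRecord = "key42|some value".getBytes(StandardCharsets.UTF_8);
onHeap.put(serializedRecord);

// ...and must be read back and de-serialized to be used as objects again.
onHeap.flip();
byte[] copy = new byte[onHeap.remaining()];
onHeap.get(copy);
String record = new String(copy, StandardCharsets.UTF_8);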
The GC is not sidelined entirely. It still manages the de-serialized objects and the temporary structures that the user (the developer using the Spark API) allocates in their program.
Flink first implemented the first approach, i.e. the use of non-direct ByteBuffers allocated in the heap. These ByteBuffers are called MemorySegments; they have a fixed size of 32 KB and are managed by the MemoryManager, whose job is to hand segments out to the operators (filter, join, sort, groupBy, etc.).
Flink plans to migrate to the second approach: using off-heap memory to further speed up access (no copy into a temporary buffer) and to bypass the garbage collector almost completely (the de-serialized objects are still stored in the heap).
With these new tricks (the ByteBuffer), serialization becomes the critical mechanism, so it becomes important to have serializers for each object type. Flink ships "homemade" serializers, but the task is rather heavy (what about Java generics, which can wrap any object type?), and user classes are serialized through a reflection-based serializer (Kryo), which is less efficient than the "homemade" versions.
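For reference, here is a hedged sketch of what reflection-based serialization with Kryo looks like (MyRecord is a hypothetical user class; the API shown is the standard com.esotericsoftware.kryo one):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

Kryo kryo = new Kryo();
kryo.register(MyRecord.class);           // registering avoids writing the full class name each time

// Serialize a user object to bytes via reflection on its fields
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
Output output = new Output(bytes);
kryo.writeObject(output, new MyRecord("key42", 3.14));
output.close();

// De-serialize it back into an object (this is the costly step that the
// "cache aware" structures described below try to avoid)
Input input = new Input(new ByteArrayInputStream(bytes.toByteArray()));
MyRecord record = kryo.readObject(input, MyRecord.class);
input.close();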
Spark plans to catch up with Flink through the Tungsten project, and its latest version (1.4) already incorporates an interesting mechanism that generates "custom serializer" code. Code generation makes the developer's life easier while being far more specific than using libraries such as Kryo.
This is only the beginning: the project plans to incorporate, in version 1.5, a MemoryManager with memory pages in the manner of Flink, along with efficient structures that exploit the processor caches (L1, L2, L3).
ByteBuffers are very useful, but they have one big flaw: they are costly in terms of serialization/de-serialization (even with custom serializers). Frameworks such as Flink and Spark spend much of their time on these tasks just to access their records. To remove the cost of systematically de-serializing objects in the operators, data structures described as "cache aware" are starting to appear: structures able to make efficient use of the processors' shared caches.
The model is as follows: store a contiguous collection of key + pointer pairs in a ByteBuffer. When an operator (join, groupBy, sort, etc.) compares records, it does so on the keys. It is therefore better to separate the keys from their values (which are often larger). Keys have a fixed size, which makes scanning the collection efficient without de-serialization (remember the RDBMS discussion at the beginning: it is the same idea). A binary comparison is made between the keys (or the first bytes of string keys), and the value is accessed only when it is actually needed, via the pointer stored next to the key. Once the bytes are retrieved from the ByteBuffer, they are de-serialized into an object.
The following chart summarizes this explanation:
Why are these structures "cache aware"? Because they are accessed very frequently and use little memory (they only hold keys and pointers), so they naturally end up sitting in the processors' shared caches (L1, L2, L3).
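Here is a hedged, simplified sketch of the idea (an illustration, not Flink's or Spark's actual implementation): fixed-size keys and offsets are packed contiguously in one ByteBuffer, the serialized records live in another, and comparisons work on raw key bytes without de-serializing anything.

import java.nio.ByteBuffer;

// Simplified "cache aware" structure: the index packs (8-byte key, 4-byte offset)
// entries back to back, so scanning and comparing touches a small contiguous region
// that fits in the CPU caches. The (larger) serialized records live in a separate buffer.
class CacheAwareIndex {
  private static final int ENTRY = 8 + 4;
  private final ByteBuffer index   = ByteBuffer.allocate(1024 * ENTRY);    // keys + pointers
  private final ByteBuffer records = ByteBuffer.allocate(1024 * 1024);     // serialized values

  void add(long key, byte[] serializedValue) {
    index.putLong(key);
    index.putInt(records.position());          // "pointer": offset of the value
    records.put(serializedValue);
  }

  // Comparison for a sort or a join: purely binary, no de-serialization involved.
  int compare(int entryA, int entryB) {
    return Long.compare(index.getLong(entryA * ENTRY), index.getLong(entryB * ENTRY));
  }

  // Only when the record is really needed do we follow the pointer and read its bytes,
  // which are then de-serialized into an object.
  byte[] valueAt(int entry, int length) {
    byte[] value = new byte[length];
    int offset = index.getInt(entry * ENTRY + 8);
    for (int i = 0; i < length; i++) {
      value[i] = records.get(offset + i);
    }
    return value;
  }
}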
Thinking about all this, I came to the following reflection:
Seeing all these efforts to bypass the garbage collector, one is entitled to wonder why we use a platform whose main asset is managed memory, if it is only to avoid using it.
In practice, users of frameworks such as Flink and Spark get the best of both worlds. These frameworks limit the impact of the GC in their internals, optimizing the management of large datasets, which makes them extremely powerful. At the same time, they let developers use a high-level language that abstracts memory management away, which is a strong argument in their favour.
Solving bottlenecks is a job that never ends (an infinite loop). Even before Spark has shipped these evolutions, we can already bet that the processor's shared cache will be the next bottleneck of big data systems. To quote Carlos Bueno: "Cache is the new RAM".
The future will tell us how to get around it...
Thank you to OCTO for the review.
Stay tuned: the OCTO Big Data white paper is coming soon!