Tag Archives: Distributed Systems

One Page Summary: “Musketeer: all for one, one for all in data processing systems”.

Many distributed computation platforms and programming frameworks exist today, and new ones constantly popping out from the industry and academia.  Some platforms are domain specific, such as TensorFlow for machine learning. Others, like Hadoop and Naiad are more general, and this generality allows for sophisticated and specialized programming abstractions to be built on top.

Coupled vs DecoupledSo we naturally ask a question, which distributed computation platform is faster and more scalable? And what programming model or framework is better? Authors of the Musketeer tried to find the answer and concluded that there is no such thing as the perfect computation platform or perfect programming front-end, as they all perform better under different circumstances and workloads. This discovery led to the Musketeer prototype, which decouples the programming front-ends from their target platforms and connects all the front-ends to all the computation back-ends.

Musketeer schematics.Musketeer accomplish this task through translating the code created in every supported programming front-end to an intermediate representation (IR). This intermediate representation can later be translated into the commands of the computation platforms. Think of Java or Scala code translated to Java bytecode before being JIT compiled to native code as the program runs. Each IR is a data-flow DAG representing the progression of operations in the computation.

back-end detection
♣ indicates Musketeer’s pick

Musketeer runs the IR on the platform it chooses the most suitable for the job (Figure on the right). For example, some platforms may be better for some operations and not every platform supports all types of computations. Musketeer detects the patterns of operations in the IR through idiom recognition and will not allow IR containing certain patterns to run on the platform that does not support these patterns or performs badly.

Combining back-endsMusketeer can also split the entire computation into smaller jobs, each running on different platform. The partitioning is done by picking a lower cost back-end to run the partition.  Of course finding the partitions in the DAG is a complicated problem (NP-hard).Musketeer uses exhaustive search to find best partitions for smaller IRs, and heuristic based dynamic programming approach for larger DAGs when the cost of brute-force approach becomes prohibitive. Heuristic approach does not examine all possible partitions. Such partitioning allows achieving better performance than a homogeneous platform.

The IR is not optimized when it is generated at first, however Musketeer performs the optimizations before starting the computation. In particular, certain operators in the IR DAG will be merged together to reduce the number of jobs executed. The merging affects the platform choice, as some platforms support more complex jobs, while others need more steps to achieve the same result. Musketeer also optimizes for the IO by trying eliminate the duplicate scanning of a dataset. All the optimizations allow the code generated from the IR to have similar performance as the manually optimized code.

One Page Summary: Incremental, Iterative Processing with Timely Dataflow

This paper describes Naiad distributed computation system. Naiad uses dataflow model to represent the computations, but it aims to be a general dataflow framework in contrast to other specialized approaches such as TensorFlow. Similarly to other dataflow systems, the computations are represented as graphs, where vertices represent data and operations and edges carry the data between nodes.

Naiad was designed as the generic framework to support iterative and incremental computations with the dataflow model. We can think of an iterative computation as some function Op is executed repeatedly. Such iteration function can be looped on its output until there are no changes between the input and the output and the function converges to a fixed point.

Incremental computations are a bit more general then the iterative. In incremental processing, we start with initial input A0 and produce some output B0. At some later point, we have a change δA1 to the original input A0, such that we can have new input A1 = A0 + δA1. Incremental model produces an incremental update to the output, so δB1 = Op(δA1) and B1 = δB1 + B0. Note that incremental model only needs to have previous state (i.e. A0 and B0) to compute the next state, however we can extend it to have all output differences:f1. Incremental computations can be adopted for iterative algorithms where each iteration produces the difference output and next iteration operates only on that difference and not the full input. However with basic incremental computation approach it becomes impossible to do iterative operations under the changing or streaming input, as now we need to keep track not only of the iteration number (and differences between iterations), but also on the version of the input (and differences of the input).

Differential computation model overcomes the limitation of basic Iterative and Incremental approaches, by keeping all the differences δB and δA, and not just the previous state. In addition, a two-component timestamp is now used, where one component keeps track of the input version and the second value is responsible for the iteration number. Such timestamping for differences complicates the computation, as the timestamps no longer have a total order. In other words, sometimes it is not possible to tell whether one timestamp happened before another. However, the new timestamp system has a partial order for which f2. And under this partial order it is still possible to sum the differences together:f3

With differential model, we can calculate not only the end result of an operation when the new input comes in, but also any intermediate δBt. A lot of the power of differential dataflow lies with the differential operator that must produces the differences that can be summed with the equation above.

The timestamp plays a crucial role in tracking execution progress. Naiad’s communication methods Send and OnRecv can be triggered multiple time for the same variable, making it necessary to have a mechanism capable of notifying other nodes when certain data has been sent in full.  This notification triggers when all messages at or before a particular timestamp have been sent. Upon receiving a notification on a node, OnNotify(t) method is called, allowing the algorithm to react. Dataflow model complicates the notification mechanism, since the timestamps no longer have a total order, however the partial-order we have established earlier along with some dataflow graph restrictions allow Naiad to keep the effective notification system that does not break its guarantees even under nested loops and continuous input updates.

Is Java Fast Enough for Distributed Applications?

Lots of modern distributed systems are built with Java programming language, and consequently use Java Virtual Machine (JVM) as their execution environment. The list of such systems is rather large: Hadoop, Spark, HBase, Cassandra, Voldemort, ZooKeeper, BookKeeper, Kafka, and the list goes on and on. But is JVM fast enough for these systems?

Anyone who has ever dealt with Java probably knows at least a little bit about how JVM works. To start with, Java programs are compiled into a machine independent, un-optimized byte code. The byte code is then being interpreted by the JVM and compiled into the native code with the just-in-time (JIT) compiler. JVM adds various optimizations at the JIT compilation and these optimizations can be more aggressive than the optimizations done by a native compilers. After all, before doing these optimizations and compilation, Java has already ran the code in the interpreted mode, and it was able to collect some statistics on the branch predictions, loops and function calls to make optimization tailored not just to that specific code, but also to the specific runtime or data.

However, before Java performs all the tricks, it needs to run in a slow interpreted mode, incurring some warm-up overheads. OSDI’16 paper “Don’t Get Caught in the Cold, Warm-up Your JVM” goes into more details about what are the warm-up overheads and how they impact data-parallel distributed systems, such as Hadoop, Spark and Hive.

Warm-up Costs

The paper breaks down warm-up overheads into the two categories: class loading and bytecode interpretation overheads. It investigates these overheads under different workloads on different distributed systems. Of course it is expected for warm-up to impact the freshly started JVM, but how big is the cost of warm-up? If we look at the HDFS client performance, we can see the warm-up can easily take a few seconds, depending on your task. In HDFS, writing is more complicated and involves more classes, thus Java spends more time loading all the classes.  Warm-up while reading from HDFS also differs depending on whether we read in parallel or sequentially. The graph below shows warm-up costs by the task and dataset size.

Figure 1
JVM warm-up overheads. “cl” stands for class loading. “int” is bytecode interpretation.

We can see that the size of the operation has no impact on the overheads, meaning that small operations will spend much larger fraction of their time in warm-up, while big operations tend to amortize the warm-up costs.

figure2
Warm-up overhead as a fraction of total runtime.

It is also interesting to see when the warm-up occurs in the execution cycle. Obviously starting the client requires lots of class loading and interrupting the byte code, however starting actual jobs (for the first time?) also incurs warm-ups.

Figure 3
Warm-up at different execution stages.

Another question one may ask is how slow actual class loading is? For HDFS sequential read, client had to load about 2000 classes, taking 1028 ms to complete.  Spark was much heavier on the classes it uses and needs to load with 19,066 classes on average taking roughly 6.3 seconds in overheads.  These are rather large numbers, especially if we aim at low-latency execution of our requests, however not everything is so grim.

It is important to emphasize that the paper mainly uses clients to study the warm-up, while the actual distributed system is not being studied in much of the details. To be fair, authors mention that the warm-up overheads are present on the server side as well, and in Spark the executor warm-up can add up to almost 50% of the overall warm-up time.

figure4
Spark warm-up overheads.

Dealing with Warm-up

The paper argue that these are very big overheads that must be dealt with. Authors even offer a prototype solution, a modified JVM, called HotTub, which acts as a container for many other “normal” JVMs to be reused when needed. Reusing JVM means we do not need to load classes and perform JIT. Such approach works well for short lived JVMs, i.e we have a client performing one operation and terminating. If such terminated JVM ends up in the pool for JVM reuse, we can save time on overheads next time we need another short-lived JVM.

I have to disagree, however, that these overheads are a big problem, and here is why. JVM running server side of the distributed system are warmed-up if they ran for at least some time. As such, these machines do not experience warm-up costs anymore. In this breakdown of the HDFS request, we do not see any warm-up losses occurring on the data-node side and all of the overheads were due to the warm-up of a short-lived HDFS client. This means that keeping you JVM alive and designing your workloads/client to stay up is the best solution to overcome these type of overheads.

Figure 5
HDFS warm-up.

There are few lessons I have learned from this paper. They may sound like a common sense, but nevertheless these are important points to keep in the back of your head to get the most out of your Java code.

  • Keep JVMs alive. Long running JVMs do not incur as many new class loads and do not need to interpret as much code, allowing the JVM to be faster.
  • Simpler is better. Too Many classes hurt performance on the warm-up, however do not go to extreme on the other side too. After all these are warm-up costs and not constant penalty to your performance.
  • Watch your external libraries. This goes together with previous point. Bringing a big library to perform one small task may not be too wise if similar-performing alternatives are available.