Reading Group. Log-structured Protocols in Delos

For the 87th DistSys paper, we looked at “Log-structured Protocols in Delos” by Mahesh Balakrishnan, Chen Shen, Ahmed Jafri, Suyog Mapara, David Geraghty, Jason Flinn, Vidhya Venkat, Ivailo Nedelchev, Santosh Ghosh, Mihir Dharamshi, Jingming Liu, Filip Gruszczynski, Jun Li, Rounak Tibrewal, Ali Zaveri, Rajeev Nagar, Ahmed Yossef, Francois Richard, and Yee Jiun Song. The paper appeared at SOSP’21. This is the second Delos paper; the first one was “Virtual Consensus in Delos,” which we covered in the 40th DistSys meeting.

The first Delos paper looked at building the replication/consensus layer to support strongly consistent use-cases on top. The system created a virtual log made from many log pieces or Loglets. Each Loglet is independent of other Loglets and may have a different configuration or even a different implementation. Needless to say, all these differences between Loglets are transparent to the applications relying on the virtual log. 

With replication/consensus covered by the virtual log and Loglets, this new paper focuses on creating a modular architecture on top of the virtual log to support different replicated applications with different user requirements. In my mind, the idea of the log-structured protocol expressed in the paper is all about using this universal log in the most modular and clean way possible. One can build a large application that interacts with the log (reading and writing) without giving much thought to design and code reusability. After all, we have been building ad-hoc log-based systems all the time! At Facebook’s scale, things are different: it is not ideal for every team/product to reinvent the wheel. Instead, a smarter, reusable architecture can go a long way in saving the time and money needed to build better systems.

Anyway, imagine a system that largely communicates through the shared log. Such a system can create new log items or read the existing ones. In a sense, each log item is like a message delivered from one place to one or several other locations. With message transmission handled by the virtual log, the Delos application simply needs to handle encoding and decoding these “log-item-messages.”

Fortunately, we already have a good idea about encoding and decoding messages while providing services along the encoding/decoding path. Of course, I am thinking of common network stacks, such as TCP, or even more broadly, the OSI model. Delos operates in a very similar way, but also with a great focus on the reusability and composability of layers. When a client needs to write some data to the log, it can form its client-specific message and pass it down the stack. Lower layers can do some additional services, such as ensuring session guarantees or batching the messages. Each layer of the stack wraps the message it received with its own headers and information needed for decoding on the consumer side. Delos calls each layer in the stack an engine. The propagation down through layers continues until the message hits the lowest layer in the stack — the base engine. The job of the base engine is to interact with the log system.   

Similarly, when a system reads the message from the log, the log item travels up the stack through all of the same layers/engines, with each engine decoding it and ensuring the properties specific to that engine before passing it up. An imperfect real-world analogy for this process is sending a paper mail. First, you write the letter; this is the highest layer/engine close to the application. Then you put it in the envelope — now the letter is protected from others seeing it. Then goes the stamp — it is ready to be sent, then goes the mailbox — client batching, then post office — server-side batching, then transmission, and then the process starts to get undone from the bottom up. 
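The wrap-on-the-way-down, unwrap-on-the-way-up flow described above can be sketched in a few lines of Python. This is only an illustration under my own assumptions, not the Delos API: the class names (BaseEngine, Engine, SequenceEngine, KeyValueApp), the method names, the JSON encoding, and the in-memory list standing in for the virtual log are all hypothetical.

```python
import json


class BaseEngine:
    """Lowest layer: the only engine that touches the log (a list stands in for it)."""

    def __init__(self, log):
        self.log = log
        self.upper = None  # set when the engine above is constructed

    def propose(self, entry):
        self.log.append(json.dumps(entry))
        return len(self.log) - 1  # position of the new log item

    def play(self, position):
        """Read one log item and push it up the stack."""
        return self.upper.apply(json.loads(self.log[position]))


class Engine:
    """Generic middle layer: wrap on the way down, unwrap and check on the way up."""

    name = "engine"

    def __init__(self, lower):
        self.lower = lower
        self.upper = None
        lower.upper = self

    def make_header(self):
        return {}

    def check_header(self, header):
        pass

    def propose(self, entry):
        wrapped = {"engine": self.name, "header": self.make_header(), "payload": entry}
        return self.lower.propose(wrapped)

    def apply(self, entry):
        if entry["engine"] != self.name:
            raise ValueError("entry was not produced by this engine")
        self.check_header(entry["header"])
        return self.upper.apply(entry["payload"])


class SequenceEngine(Engine):
    """Example service (hypothetical): numbers entries and rejects gaps or reordering."""

    name = "sequence"

    def __init__(self, lower):
        super().__init__(lower)
        self.next_out = 0
        self.next_in = 0

    def make_header(self):
        seq, self.next_out = self.next_out, self.next_out + 1
        return {"seq": seq}

    def check_header(self, header):
        if header["seq"] != self.next_in:
            raise ValueError("unexpected sequence number")
        self.next_in += 1


class KeyValueApp:
    """Top of the stack: a tiny key-value state machine."""

    def __init__(self, lower):
        self.lower = lower
        self.state = {}
        lower.upper = self

    def put(self, key, value):
        return self.lower.propose({"key": key, "value": value})

    def apply(self, entry):
        self.state[entry["key"]] = entry["value"]
        return entry


log = []
base = BaseEngine(log)
app = KeyValueApp(SequenceEngine(base))
position = app.put("x", 1)  # travels down: app -> sequence -> base -> log
base.play(position)         # travels up: log -> base -> sequence -> app
```

In this toy version the same process proposes and applies; in a replicated setting every replica would play the log through its own copy of the stack. Adding another service, such as batching, would mean inserting one more Engine subclass without touching the layers above or below it.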

Of course, I oversimplified things a bit here, but such message encapsulation is a pretty straightforward abstraction to use. Delos uses it to implement custom Replicated State Machines (RSMs) with different functionality. Building these RSMs requires a bit more functionality than just pushing messages up and down the engines. Luckily, Delos provides a more extensive API with required features. For example, all engines have access to the shared local storage. Also, moving items up or down the stack is not done in a fire-and-forget manner, as responses can flow between engines to know when the call/request gets completed. Furthermore, it is possible to have more than one engine sitting at the same layer. For instance, one engine can be responsible for serializing the data and pushing it down, and another engine can receive the item from an engine below, deserialize and apply it to RSM. The figure illustrates these capabilities.  

This tiered modular approach allows Delos to reuse layers across applications or even compose some layers in different order. So when one application or use-case needs batching, that engineering team does not need to reinvent the batching from scratch. Instead, they can take an existing batching layer and add it to their stack in the appropriate location. The flexibility and reusability allowed Facebook engineers to implement two different control-plane databases with Delos. One datastore, called DelosTable, uses a table API, while another system, called Zelos, implements a ZooKeeper compatible service. 

I think I will stop my summary here. The paper goes into more detail about the history of the project and the rationale for making certain decisions. It also describes many practical aspects of using Delos. The main lesson I learned from this paper is about the overall modular layered design of large RSM-based systems. I think we all know the benefits but may often step aside from following a clean, modular design as the projects get bigger and pressure builds up to deliver faster. But then, what do I know about production-grade systems in academia? Nevertheless, I’d love to see a follow-up paper when more systems are built using Delos. 

As usual, we had our presentation. This time Micah Lerner delivered a concise but very nice explanation of the paper:


1) Architecture. This paper presents a clean and modular architecture. I do not think there is anything majorly new & novel here, so I view this paper more like an experience report on the benefits of good design at a large company. I think there is quite a bit of educational value in this paper. 

In the group, we also discussed the possibility of applying similar modular approaches to more traditional systems. For instance, we looked at MongoDB Raft in the group before. Nothing should preclude a similar design based on a Raft-provided log in a distributed database. In fact, similar benefits can be achieved: multiple client APIs, optional and configurable batching functionality, etc. That being said, for a system designed with one purpose, it is easy to start designing layers/modules that are more coupled/interconnected and dependent on each other.

Another participant made a similar observation, recalling a somewhat similarly designed internal application from a while back, though again with a less clear separation between modules/layers.

2) Performance. The performance impact is a natural question to ask about such a layered/modular system. The paper spends a bit of time in the evaluation explaining and showing how layers and propagation between them add very little overhead. What is not clear is whether a less generic, more purpose-built solution could have been faster. This is a tough question to answer, as comparing different architectures is not trivial: sometimes it can be hard to tell whether the difference comes from design choices or from implementation differences and nuances.

3) Read cost & Virtual Log. This part of the discussion goes back quite a bit to the first Delos paper. With a native Loglet, Delos assumes a quorum-based operation for Loglets, which may have less than ideal read performance. This is because NativeLoglet uses a sequencer to write, but relies on quorum reads and makes reads wait with the checkTail operation. So a client will read from the quorum, and assuming the Loglet is not sealed (i.e., closed for writes), the client must wait for its knowledge of the globalTail (i.e., globally committed slot) to catch up with the highest locally committed slot it observed earlier. This process is similar to a PQR read! Naturally, it may have higher read latency, which will largely depend on how fast the client’s knowledge of the globally committed slot catches up. In the PQR paper, we also describe a few optimizations to cut down on latency, and I wonder if they can apply here.

Moreover, a client does not need to perform an expensive operation for all reads. If a client is reading something in the past that is known to exist, it can use the cheaper readNext API, practically allowing a local read from its collocated LogServer.

4) Engineering costs. This discussion stemmed from the performance discussion. While large companies care a lot about performance and efficiency (even a fraction of % of CPU usage means a lot of money!), the engineering costs matter a lot too. Not having to redo the same things for different products in different teams can translate into a lot of engineering savings! Not to mention this can allow engineers to focus on new features instead of re-writing the same things all over again. Another point is maintenance — cleaner and better-designed systems will likely be cheaper to maintain as well. 

5) Non-control plane applications? The paper talks about using Delos in two control-plane applications. These often have some more specific and unique requirements, such as zero external dependencies, and stronger consistency. The paper also mentions other possible control plane use cases, so it does not appear like Delos is done here. 

At the same time, we were wondering if/how/when Delos can be used outside of the control plane. For Facebook, there may not be too much need for strongly consistent replication in many of their user-facing apps. In fact, it seems like read-your-write consistency is enough for Facebook, so deploying Delos may not be needed. Also, user-facing apps can take on more external dependencies, achieving some code reuse this way.

Another point made during the discussion is about making a more general and flexible replication framework that can support strongly-consistent cases and higher-throughput weaker consistency applications. We would not be surprised if Delos or its successors will one day support at least some stronger-consistency user-facing applications. 

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly manage Zoom invites to paper discussions. Please join the Slack group to get involved!

Reading Group. Conflict-free Replicated Data Types

We kicked off a new set of papers in the reading group with some fundamental reading – “Conflict-free Replicated Data Types.” Although not very old (and not the first one to suggest something similar to CRDTs), the paper we discussed presents a proper definition of Conflict-free Replicated Data Types (CRDTs) and the consistency framework around them. Needless to say, lots of research followed after this paper in the area of CRDTs. 

It is impossible to discuss Conflict-free Replicated Data Types without first saying a bit about consistency in distributed replicated systems. In super high-level terms, consistency describes how well a replicated system mimics the illusion of a single copy of the data. On one side of the consistency spectrum, we have strong consistency (i.e., linearizability), which appears to an outside observer as if there is exactly one copy of the data. On the other side of the spectrum, we have eventual consistency, which allows many kinds of data artifacts, such as reading uncommitted data, accessing stale data, and more.

Strong consistency, however, comes at a significant performance cost that eventual systems do not have. For strong consistency, the order of operations is crucial, as clients must observe a single history of state changes in the system. Most often this means that all replicas must apply the same sequence of commands in the same order to progress through the same states of their state machines. This requires a sequencer node which often is a bottleneck. There are a few exceptions to this, for example, protocols like ABD do not need to have a single sequencer, and there may be even gaps/variations in history on individual nodes, but these protocols have other severe limitations. In addition to following the same history of operation, linearizability also imposes strict recency requirements — clients must observe the most recent state of the system. This prescribes synchronous replication to make sure enough nodes progress in lock-step for fault tolerance reasons. These challenges limit scalability — despite having multiple replicas, a replicated linearizable system will be slower than a single server it tries to mimic. 

Eventually-consistent systems do not have such strong performance constraints, because there is no need to order operations, enforce recency, or even keep a single history of updates. This gives a lot of freedom to explore parallelism and push the boundaries of performance. Unfortunately, these systems are hard to program against, since an application built on top of an eventually consistent store needs to anticipate all kinds of data artifacts and deal with them.

All these differences between strong and eventual consistency also mean that they land on different sides (vertices?) of the CAP triangle. With the recency requirements and lock-step execution, linearizable systems are CP, meaning that they sacrifice the Availability in the face of network Partitions and remain Consistent. Eventual systems… well, they do not promise Consistency at all, so they remain Available.  

Anyway, the drastic differences between the two extremes of the consistency spectrum, coupled with the scary CAP theorem, have sparked a lot of research into consistency models that lie between strong and eventual. These intermediate models were supposed to provide a compromise between the safety of strongly consistent systems and the performance/availability of eventual ones. This is where CRDTs come into play, as they often drive the Strong Eventual Consistency (SEC) model. The paper presents SEC as the “solution to CAP”, and this makes me cringe a bit. First of all, Strong Eventual Consistency is a strongly confusing name. Secondly, having a solution to CAP sounds super definitive, whereas SEC is merely one of many compromises developed over the years.

Now we are getting to the meat of the paper that excites me. See, aside from a cringy name and a claim to solve CAP problems, SEC is pretty clever. A big problem with eventual consistency is that it does not define any convergence rules. Without such rules, the system may converge to an arbitrary state. Moreover, the convergence itself becomes unpredictable and impossible to reason about. SEC addresses the convergence problem by imposing some rules on the eventual consistency model. This enables engineers to reason about both the intermediate and final states of the system.

More specifically, SEC requires that any two identical nodes applying the same set of operations arrive at identical states. Recall that this sounds similar to how strongly consistent systems apply operations at nodes. The difference is that in strongly consistent systems we reason about sequences that have some order to them. In SEC, we work with sets of commands, which are order-less. I think this is a pretty cool thing: we throw away the order, yet still ensure that the convergence is predictable and depends only on the operations we have.

Completely throwing away the operation ordering and working with operation sets instead of sequences is tricky though. Consider some variable x, initially at x:=2. If we have two operations: (1) x:=x+2 and (2) x:=x*2, we can clearly see the difference if these operations are applied in a different order — by doing the operation (2) first, we will get a final state of 6 instead of 8. This presents a convergence problem and a violation of SEC if different nodes apply these operations in a different order. In a sense, these two operations, if issued concurrently, conflict with each other and require ordering. So clearly we need to be smart to avoid such conflicts and make SEC work. 
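A quick way to see the conflict is to apply the same set of operations in every possible order and collect the final states. The sketch below does exactly that for the example above; the helper name final_states and the extra add_three operation are mine, added only for contrast.

```python
from itertools import permutations


def add_two(x):
    return x + 2


def double(x):
    return x * 2


def add_three(x):
    return x + 3


def final_states(initial, operations):
    """Apply the operations in every possible order and collect the outcomes."""
    outcomes = set()
    for order in permutations(operations):
        x = initial
        for operation in order:
            x = operation(x)
        outcomes.add(x)
    return outcomes


conflicting = final_states(2, [add_two, double])    # {6, 8}: order matters
commuting = final_states(2, [add_two, add_three])   # {7}: order does not matter
```

A set of operations is safe for SEC only when this kind of check yields a single outcome for any concurrent operations, which is what the two additions do and what the addition/multiplication pair does not.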

There is no generic solution to avoid such conflicts, but we can design specific data structures, known as Conflict-free Replicated Data Types or CRDTs, that solve this ordering problem for some use cases. As the name suggests, CRDTs are built to avoid conflicts between different updates or different versions of the same data object. In a sense, CRDTs provide a data structure for a specific use case with some defined and restricted set of operations. For instance, we can have a CRDT to implement a distributed counter that can only increment the counter’s value, or a CRDT for an add-only set. The paper presents two broad types of CRDTs: state-based and operation-based. Both types are meant for replicated systems and differ in how they communicate updates between nodes and reconstruct the final state.

State-based CRDTs transfer the entire state of the object between nodes, so they can be a bit heavier on bandwidth usage. The actual state of a data structure is not directly visible/accessible to the user, as this state may be different than the logical meaning of the data structure. For example, going back to the counter CRDT, logically we have a single counter, but we may need to represent its value as consisting of multiple components in order to ensure conflict-free operation. Assume we have n nodes, and so to design a state-based counter CRDT we break down the counter value to registers <c1, c2, c3,…, cn>, each representing the increments recorded at a particular node. The logical state of CRDT counter is the sum of all registers \(c=\sum_{i=1}^nc_i\), which must be exposed through a query function. In addition to the query function, there must be an update function to properly change the underlying state. For the counter, the update function will increment the register corresponding to its node id. 

The most important part, however, is still missing. If some node receives concurrent increments, how can it reconcile them? Let’s say we have 3 nodes {n1, n2, n3}, each starting in some initial counter state <4,5,2>. These nodes receive some updates and increment their respective registers locally: n1:<6,5,2>, n2:<4,6,2>, n3:<4,5,4>. The nodes then send out their now divergent copies of the counter CRDT to each other. Let’s say node n2 received an update from n3, and now it needs to merge two versions of CRDT together. It does so with the help of the merge function, which merges the two copies and essentially enforces the convergence rules of SEC. There are some specific requirements regarding the merge function, but they essentially boil down to making sure that the order in which any two CRDTs merge does not matter at all. In the case of our counter, the merge function can be as simple as a pairwise comparison of registers between two versions of CRDT and picking the maximum value for each register. So for merge(n2:<4,6,2>, n3:<4,5,4>), we will see the updated value of <4,6,4> on node n2. If at this point n2 sends its update to n1, then n1 will have to do merge(n1:<6,5,2>,n2:<4,6,4>), and get the final version of <6,6,4>. Note that if n1 now also receives n3‘s update, it will not change the state of n1, since that update was already learned indirectly. This scheme works pretty neatly. It tolerates duplicate messages and receiving stale updates. However, we can also see some problems — we carry a lot more state than just a simple integer to represent the counter, and our counter’s merge function has restricted the counter to only allow increments. If we try to decrement a value at some node, it will be ignored, since the merge function selects the max value of a register. The latter problem can be fixed, but this will require essentially doubling the number of registers we keep for each node, exacerbating the state-size problem. 

Example of state-based counter with some sample message exchange.
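The state-based counter walked through above fits in a few lines of Python. This is a minimal sketch that replays the same three-node example; the class name GCounter and its method names are my own choices, and nodes are indexed from 0 rather than 1.

```python
class GCounter:
    """Grow-only, state-based counter: one register per node."""

    def __init__(self, node_index, registers):
        self.node_index = node_index
        self.registers = list(registers)

    def increment(self, amount=1):
        # Update function: a node only ever touches its own register.
        if amount < 0:
            raise ValueError("a grow-only counter cannot be decremented")
        self.registers[self.node_index] += amount

    def query(self):
        # Logical value of the counter is the sum of all registers.
        return sum(self.registers)

    def merge(self, other):
        # Pairwise maximum: commutative, associative, and idempotent.
        self.registers = [max(a, b) for a, b in zip(self.registers, other.registers)]


# All three nodes start from <4,5,2> and then increment locally.
n1 = GCounter(0, [4, 5, 2])
n2 = GCounter(1, [4, 5, 2])
n3 = GCounter(2, [4, 5, 2])
n1.increment(2)  # n1: <6,5,2>
n2.increment(1)  # n2: <4,6,2>
n3.increment(2)  # n3: <4,5,4>

n2.merge(n3)     # n2: <4,6,4>
n1.merge(n2)     # n1: <6,6,4>
n1.merge(n3)     # no change, n3's update was already learned through n2
```

Because the merge is a pairwise maximum, replaying or reordering any of these merges leaves the result unchanged, which is why duplicate and stale messages are harmless here.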

Operation-based CRDTs somewhat solve the above problems. Instead of transferring the full state of the object, op-based CRDTs move around the operations required to transform from one state to the next. This can be very economical, as operations may use significantly less bandwidth or space than the full CRDT state. For instance, in our counter CRDT example, the operation may be the addition of a number to the counter. Of course, as the operation may propagate at different speeds, and potentially get reordered, op-based CRDT requires that all concurrent operations are commutative. In other words, we again set the rules to ensure that the order of updates (i.e. operations) does not matter. In the counter use-case, we know that all additions commute, making it easy to implement an op-based counter. Unlike the state-based version, we do not even need to have multiple registers and sum them up to get the actual value of the counter. However, there are some important caveats with op-based CRDTs. They are susceptible to problems when a message or operation gets duplicated or resent multiple times. This creates a significant challenge, as either the operations themselves must be designed to be idempotent, or the operation delivery layer (communication component of the application) must be able to detect duplicates and remove them, essentially ensuring idempotence as well.
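Here is a small sketch of an op-based counter that takes the second route, with duplicate detection in the delivery path. Everything in it is an assumption made for illustration: operations are tagged with an (origin, sequence number) pair, and each replica remembers every tag it has already applied.

```python
class OpCounter:
    """Op-based counter: replicas exchange increments, not full state."""

    def __init__(self, replica_id):
        self.replica_id = replica_id
        self.value = 0
        self.next_seq = 0
        self.seen = set()  # (origin, seq) tags of operations already applied

    def increment(self, amount=1):
        """Create an operation, apply it locally, and return it for broadcast."""
        operation = (self.replica_id, self.next_seq, amount)
        self.next_seq += 1
        self.deliver(operation)
        return operation

    def deliver(self, operation):
        """Apply a local or remote operation at most once."""
        origin, seq, amount = operation
        if (origin, seq) in self.seen:
            return False  # duplicate delivery, ignored
        self.seen.add((origin, seq))
        self.value += amount
        return True


a, b, c = OpCounter("a"), OpCounter("b"), OpCounter("c")
op1 = a.increment(2)
op2 = b.increment(3)

# Replicas receive the operations in different orders, some more than once.
b.deliver(op1)
a.deliver(op2)
duplicate_applied = a.deliver(op2)  # resent message
c.deliver(op2)
c.deliver(op1)
c.deliver(op1)
```

The replicas converge because additions commute, and the seen set makes delivery idempotent. The price is that this set grows with every operation, so a practical system would need to compact it, for example by tracking the highest contiguous sequence number per origin.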

The paper goes into more details and more examples of each type of CRDT, as well as explaining how the two types are roughly equivalent in terms of their expressivity. Intuitively, one can think of the merge function as calculating a diff between two CRDT versions and applying it to one of the versions. Operations are like these diffs to start with, so it makes sense how the two types can be brought together. We have our presentation of the paper available here:


1) Challenges designing CRDTs. As mentioned in the summary, CRDTs are special-purpose data structures, so designing them to fit a use case takes some time. I spent some time a few years ago working on CRDTs at Cosmos DB, and it was a very fun thing to do, but also a bit challenging. A good example of the problem is a set CRDT. It is easy to make an add-only set, where items can be added and not removed. All set additions commute, so the problem is trivial. But to make sets more practical, we want to remove items too. A simple solution is to internally implement a removed set, so the CRDT tracks all items added and removed separately. This way we can hardcode the precedence of adds and removes and say that removes always come after adds for an item. But this works only as long as we do not ever need to re-add items back into the set…
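The add-set plus removed-set design is often called a two-phase set. A minimal state-based sketch, with class and method names of my own choosing, shows both why it converges and why re-adding an item does not work:

```python
class TwoPhaseSet:
    """Set CRDT that tracks added and removed items separately."""

    def __init__(self):
        self.added = set()
        self.removed = set()  # tombstones: once here, an item stays removed

    def add(self, item):
        self.added.add(item)

    def remove(self, item):
        self.removed.add(item)

    def items(self):
        # Removes always take precedence over adds for the same item.
        return self.added - self.removed

    def merge(self, other):
        # Set union is commutative, associative, and idempotent.
        self.added |= other.added
        self.removed |= other.removed


a, b = TwoPhaseSet(), TwoPhaseSet()
a.add("x")
a.add("y")
b.merge(a)
b.remove("x")   # replica b removes x
a.add("x")      # replica a tries to re-add x concurrently
a.merge(b)
b.merge(a)
```

After the merges both replicas agree on {"y"}: they converge, but the re-add of "x" is silently lost because the tombstone wins. Designs that allow re-adding typically attach a unique tag to each add so that a remove only cancels the adds it has observed.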

2) Modeling. Because CRDTs are inherently concurrent, it is a good idea to model and model-check them. I used TLA+ for this purpose. During the discussion, a question was raised about the best tools for CRDT model-checking, but unfortunately, nobody knew anything better than TLA+/TLC. I’d suspect that other tools used for verifying distributed systems, such as Alloy, could work as well.

3) Applications. Quite a bit of discussion was focused on applications that use CRDTs. We talked quite a bit about near-real-time collaborative tools, such as collaborative document editing. I mentioned the Google Docs style of application quite a few times, but it was brought up that Google actually uses Operational Transformation (OT) instead of CRDTs. In particular, it uses server-based OT, which requires a server to sync each client against. Regardless, collaborative tools seem to be the prime field for CRDTs. For instance, the Automerge library provides a good start for a JSON-like CRDT to serve as the basis for these types of applications.

Planetary-Scale Systems Seminar Spring 2021

This spring semester I am teaching an exciting seminar class: “Planetary-Scale Systems.” I will start the seminar with a four-lecture crash course to get my students on the same page, but the bulk of the class will be paper presentations and discussions. The format is similar to the Zoom reading group I am running.

The class meets twice a week, on Tuesdays and Thursdays. On day one, we will have a paper presentation, followed by a class discussion. On day two we take the discussion up a notch and dive deeper into 2-3 select topics/questions from day one. The time should also allow students to prepare for the in-depth discussion.

Speaking of preparing, all students should read the paper and ask questions before day one to help drive the discussion. The same applies to the in-depth discussion, as each student is expected to contribute to it.

Overall, we will cover 11 papers, roughly broken down into 3 groups. The papers are as follows:

Planetary Scale Storage

Planetary-Scale Analytics & ML


One Page Summary: “Musketeer: all for one, one for all in data processing systems”.

Many distributed computation platforms and programming frameworks exist today, and new ones are constantly popping up in industry and academia. Some platforms are domain-specific, such as TensorFlow for machine learning. Others, like Hadoop and Naiad, are more general, and this generality allows for sophisticated and specialized programming abstractions to be built on top.

Figure: coupled vs. decoupled.

So we naturally ask a question: which distributed computation platform is faster and more scalable? And which programming model or framework is better? The authors of Musketeer tried to find the answer and concluded that there is no such thing as the perfect computation platform or the perfect programming front-end, as they all perform better under different circumstances and workloads. This discovery led to the Musketeer prototype, which decouples the programming front-ends from their target platforms and connects all the front-ends to all the computation back-ends.

Figure: Musketeer schematics.

Musketeer accomplishes this task by translating the code created in every supported programming front-end into an intermediate representation (IR). This intermediate representation can later be translated into the commands of the computation platforms. This is similar to how Java or Scala code is translated to Java bytecode before being JIT-compiled to native code as the program runs. Each IR is a data-flow DAG representing the progression of operations in the computation.

Figure: back-end detection (♣ indicates Musketeer’s pick).

Musketeer runs the IR on the platform it deems most suitable for the job. Some platforms are better for certain operations, and not every platform supports all types of computations. Musketeer detects the patterns of operations in the IR through idiom recognition and will not allow an IR containing certain patterns to run on a platform that does not support these patterns or performs badly on them.

Figure: combining back-ends.

Musketeer can also split the entire computation into smaller jobs, each running on a different platform. The partitioning is done by picking a lower-cost back-end to run each partition. Of course, finding the partitions in the DAG is a complicated problem (NP-hard). Musketeer uses exhaustive search to find the best partitions for smaller IRs, and a heuristic-based dynamic programming approach for larger DAGs, when the cost of the brute-force approach becomes prohibitive. The heuristic approach does not examine all possible partitions. Such partitioning allows achieving better performance than a homogeneous platform.

The IR is not optimized when it is first generated; however, Musketeer performs the optimizations before starting the computation. In particular, certain operators in the IR DAG will be merged together to reduce the number of jobs executed. The merging affects the platform choice, as some platforms support more complex jobs, while others need more steps to achieve the same result. Musketeer also optimizes for IO by trying to eliminate duplicate scans of a dataset. All the optimizations allow the code generated from the IR to perform similarly to manually optimized code.

One Page Summary: Incremental, Iterative Processing with Timely Dataflow

This paper describes the Naiad distributed computation system. Naiad uses the dataflow model to represent computations, but it aims to be a general dataflow framework, in contrast to more specialized approaches such as TensorFlow. As in other dataflow systems, the computations are represented as graphs, where vertices represent data and operations and edges carry the data between nodes.

Naiad was designed as a generic framework to support iterative and incremental computations with the dataflow model. We can think of an iterative computation as some function Op that is executed repeatedly. Such an iteration function can be looped on its own output until there are no changes between the input and the output, at which point the function has converged to a fixed point.
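As a plain, single-machine illustration of looping a function on its own output until nothing changes, here is a sketch that computes graph reachability as a fixed point. It has nothing Naiad-specific in it; the helper fixed_point, the expand function, and the example graph are all made up for this illustration.

```python
def fixed_point(op, state, max_iterations=1000):
    """Repeatedly apply op until the output equals the input."""
    for _ in range(max_iterations):
        next_state = op(state)
        if next_state == state:
            return state
        state = next_state
    raise RuntimeError("did not converge within max_iterations")


# Adjacency lists of a small directed graph (example data).
edges = {1: {2}, 2: {3}, 3: set(), 4: {1}}


def expand(reached):
    """One iteration: add every node one hop away from the nodes reached so far."""
    return frozenset(reached) | {m for n in reached for m in edges[n]}


reachable = fixed_point(expand, frozenset({1}))  # nodes reachable from node 1
```

Each iteration here recomputes over the whole reached set, which is exactly the waste that the incremental and differential approaches try to avoid.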

Incremental computations are a bit more general than iterative ones. In incremental processing, we start with an initial input A0 and produce some output B0. At some later point, we have a change δA1 to the original input A0, such that we have a new input A1 = A0 + δA1. The incremental model produces an incremental update to the output, so δB1 = Op(δA1) and B1 = δB1 + B0. Note that the incremental model only needs the previous state (i.e., A0 and B0) to compute the next state; however, we can extend it to keep all output differences: \(B_t = B_0 + \sum_{i=1}^{t}\delta B_i\). Incremental computations can be adapted for iterative algorithms, where each iteration produces a difference output and the next iteration operates only on that difference and not on the full input. However, with the basic incremental approach it becomes impossible to do iterative operations under changing or streaming input, as now we need to keep track not only of the iteration number (and the differences between iterations), but also of the version of the input (and the differences of the input).
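A word count makes a convenient toy example of the incremental model, because its output for a combined input is simply the sum of the outputs for the parts. That property is an assumption of this sketch: for operators without it, computing δB1 also requires looking at the previous state. The function name op and the sample inputs are made up.

```python
from collections import Counter


def op(lines):
    """Word count: the operator whose output we maintain incrementally."""
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return counts


a0 = ["a b", "b c"]   # initial input A0
b0 = op(a0)           # initial output B0

delta_a1 = ["c d"]    # change to the input, δA1
delta_b1 = op(delta_a1)  # δB1 = Op(δA1), computed from the change alone
b1 = b0 + delta_b1       # B1 = δB1 + B0

recomputed = op(a0 + delta_a1)  # full recomputation over A1, for comparison
```

The incremental result b1 matches the full recomputation while only touching the new line of input.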

The differential computation model overcomes the limitations of the basic iterative and incremental approaches by keeping all the differences δB and δA, and not just the previous state. In addition, a two-component timestamp is now used, where one component keeps track of the input version and the second is responsible for the iteration number. Such timestamping of differences complicates the computation, as the timestamps no longer have a total order. In other words, sometimes it is not possible to tell whether one timestamp happened before another. However, the new timestamp system has a partial order for which \((a_1, b_1) \le (a_2, b_2)\) if and only if \(a_1 \le a_2\) and \(b_1 \le b_2\). And under this partial order it is still possible to sum the differences together: \(B_t = \sum_{s \le t}\delta B_s\), where the sum ranges over all timestamps s that precede or equal t in the partial order.
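To make the partially ordered timestamps more concrete, here is a small sketch that assumes the component-wise (product) order on (input version, iteration) pairs and sums every difference whose timestamp is less than or equal to the one we ask about. The difference values are made-up integers; in a real system they would be changes to collections.

```python
def leq(s, t):
    """Component-wise partial order on (input version, iteration) timestamps."""
    return s[0] <= t[0] and s[1] <= t[1]


def comparable(s, t):
    return leq(s, t) or leq(t, s)


def accumulate(differences, t):
    """Sum all differences whose timestamp precedes or equals t."""
    return sum(delta for s, delta in differences.items() if leq(s, t))


# Hypothetical output differences keyed by (input version, iteration).
differences = {(0, 0): 5, (0, 1): 2, (1, 0): -1, (1, 1): 3}

at_01 = accumulate(differences, (0, 1))  # uses (0,0) and (0,1)
at_10 = accumulate(differences, (1, 0))  # uses (0,0) and (1,0)
at_11 = accumulate(differences, (1, 1))  # uses all four differences
unordered = not comparable((0, 1), (1, 0))
```

Timestamps (0, 1) and (1, 0) are not comparable, yet the accumulated state at each of them is still well defined, and both contribute to the state at (1, 1).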

With the differential model, we can calculate not only the end result of an operation when new input comes in, but also any intermediate δBt. A lot of the power of differential dataflow lies in the differential operator, which must produce differences that can be summed as described above.

The timestamp plays a crucial role in tracking execution progress. Naiad’s communication methods Send and OnRecv can be triggered multiple times for the same variable, making it necessary to have a mechanism capable of notifying other nodes when certain data has been sent in full. This notification triggers when all messages at or before a particular timestamp have been sent. Upon receiving a notification on a node, the OnNotify(t) method is called, allowing the algorithm to react. The dataflow model complicates the notification mechanism, since the timestamps no longer have a total order. However, the partial order we established earlier, along with some restrictions on the dataflow graph, allows Naiad to keep an effective notification system that does not break its guarantees even under nested loops and continuous input updates.

Is Java Fast Enough for Distributed Applications?

Lots of modern distributed systems are built with the Java programming language, and consequently use the Java Virtual Machine (JVM) as their execution environment. The list of such systems is rather large: Hadoop, Spark, HBase, Cassandra, Voldemort, ZooKeeper, BookKeeper, Kafka, and the list goes on and on. But is the JVM fast enough for these systems?

Anyone who has ever dealt with Java probably knows at least a little bit about how the JVM works. To start with, Java programs are compiled into machine-independent, unoptimized bytecode. The bytecode is then interpreted by the JVM and compiled into native code by the just-in-time (JIT) compiler. The JVM adds various optimizations during JIT compilation, and these optimizations can be more aggressive than the ones done by a native compiler. After all, before doing these optimizations and compilation, Java has already run the code in interpreted mode and was able to collect statistics on branches, loops and function calls, making the optimizations tailored not just to that specific code, but also to the specific runtime or data.

However, before Java performs all these tricks, it needs to run in a slow interpreted mode, incurring some warm-up overheads. The OSDI’16 paper “Don’t Get Caught in the Cold, Warm-up Your JVM” goes into more detail about what the warm-up overheads are and how they impact data-parallel distributed systems, such as Hadoop, Spark and Hive.

Warm-up Costs

The paper breaks down warm-up overheads into two categories: class loading and bytecode interpretation. It investigates these overheads under different workloads on different distributed systems. Of course, warm-up is expected to impact a freshly started JVM, but how big is its cost? If we look at the HDFS client performance, we can see that warm-up can easily take a few seconds, depending on the task. In HDFS, writing is more complicated and involves more classes, so Java spends more time loading all of them. Warm-up while reading from HDFS also differs depending on whether we read in parallel or sequentially. The graph below shows warm-up costs by task and dataset size.

Figure 1
JVM warm-up overheads. “cl” stands for class loading. “int” is bytecode interpretation.

We can see that the size of the operation has no impact on the overheads, meaning that small operations will spend a much larger fraction of their time in warm-up, while big operations tend to amortize the warm-up costs.

Figure 2
Warm-up overhead as a fraction of total runtime.
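The amortization effect is easy to see with a bit of arithmetic. The sketch below assumes a fixed warm-up cost and a constant processing throughput; both numbers are made up for illustration and are not measurements from the paper.

```python
def warmup_fraction(warmup_s, size_mb, throughput_mb_s):
    """Fraction of total runtime spent in warm-up when the warm-up cost
    is fixed and the useful work grows linearly with the input size."""
    work_s = size_mb / throughput_mb_s
    return warmup_s / (warmup_s + work_s)


WARMUP_S = 2.0            # hypothetical fixed warm-up cost, in seconds
THROUGHPUT_MB_S = 100.0   # hypothetical processing rate, in MB/s

fractions = {
    size_mb: warmup_fraction(WARMUP_S, size_mb, THROUGHPUT_MB_S)
    for size_mb in (1, 100, 10_000)
}

for size_mb, fraction in fractions.items():
    print(f"{size_mb:>6} MB: {fraction:.1%} of runtime is warm-up")
```

With these assumed numbers, the 1 MB operation spends nearly all of its time warming up, while the 10,000 MB operation spends only a couple of percent, even though the absolute warm-up cost is the same in both cases.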

It is also interesting to see when the warm-up occurs in the execution cycle. Obviously, starting the client requires lots of class loading and bytecode interpretation; however, starting actual jobs (for the first time?) also incurs warm-up.

Figure 3
Warm-up at different execution stages.

Another question one may ask is how slow the actual class loading is. For an HDFS sequential read, the client had to load about 2000 classes, taking 1028 ms to complete. Spark was much heavier on classes, loading 19,066 classes on average and taking roughly 6.3 seconds in overheads. These are rather large numbers, especially if we aim at low-latency execution of our requests; however, not everything is so grim.
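Dividing the quoted totals by the class counts gives a rough per-class cost. This back-of-the-envelope sketch treats “about 2000” as exactly 2000 and “roughly 6.3 seconds” as 6300 ms, and assumes the whole quoted time goes to class loading; these are simplifications for illustration, not figures stated in this form by the paper.

```python
# (classes loaded, total overhead in milliseconds), rounded as described above.
measurements = {
    "HDFS sequential read": (2000, 1028.0),
    "Spark": (19066, 6300.0),
}

per_class_ms = {
    name: total_ms / classes
    for name, (classes, total_ms) in measurements.items()
}

for name, cost in per_class_ms.items():
    print(f"{name}: {cost:.3f} ms per class")
```

Under these assumptions both systems land at a fraction of a millisecond per class (about 0.5 ms for HDFS and about 0.33 ms for Spark), so the multi-second totals come from the sheer number of classes rather than from any single slow load.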

It is important to emphasize that the paper mainly uses clients to study the warm-up, while the actual distributed system is not studied in much detail. To be fair, the authors mention that the warm-up overheads are present on the server side as well, and in Spark the executor warm-up can add up to almost 50% of the overall warm-up time.

Figure 4
Spark warm-up overheads.

Dealing with Warm-up

The paper argues that these are very big overheads that must be dealt with. The authors even offer a prototype solution, a modified JVM called HotTub, which acts as a container for many other “normal” JVMs to be reused when needed. Reusing a JVM means we do not need to load classes and perform JIT compilation again. Such an approach works well for short-lived JVMs, i.e., when we have a client performing one operation and terminating. If such a terminated JVM ends up in the pool for reuse, we can save time on overheads the next time we need another short-lived JVM.
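The reuse idea can be illustrated with a toy pool. This is not HotTub's implementation: `Runtime` is a hypothetical stand-in for a JVM that pays a one-time warm-up cost on its first run, `RuntimePool` is a made-up name, and the costs are arbitrary numbers in milliseconds.

```python
class Runtime:
    """Stand-in for a JVM: the first run pays a one-time warm-up cost."""

    def __init__(self, warmup_ms):
        self.warmup_ms = warmup_ms
        self.warm = False

    def run(self, work_ms):
        cost = work_ms
        if not self.warm:
            cost += self.warmup_ms   # class loading + interpretation, modeled as one number
            self.warm = True
        return cost


class RuntimePool:
    """Hands out an idle, already-warm runtime when one is available."""

    def __init__(self, warmup_ms):
        self.warmup_ms = warmup_ms
        self.idle = []

    def execute(self, work_ms):
        # Reuse a pooled runtime if possible; otherwise start a cold one.
        runtime = self.idle.pop() if self.idle else Runtime(self.warmup_ms)
        cost = runtime.run(work_ms)
        self.idle.append(runtime)    # return it to the pool instead of discarding it
        return cost


pool = RuntimePool(warmup_ms=2000)
pooled_costs = [pool.execute(work_ms=100) for _ in range(3)]

# Without a pool, every short-lived client starts cold.
cold_costs = [Runtime(warmup_ms=2000).run(work_ms=100) for _ in range(3)]
```

In this model only the first pooled request pays for warm-up, while every request without the pool pays it again.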

I have to disagree, however, that these overheads are a big problem, and here is why. JVMs running the server side of a distributed system are warmed up if they have been running for at least some time. As such, these machines do not experience warm-up costs anymore. In this breakdown of the HDFS request, we do not see any warm-up losses occurring on the data-node side, and all of the overheads were due to the warm-up of a short-lived HDFS client. This means that keeping your JVM alive and designing your workloads and clients to stay up is the best solution to overcome these types of overheads.

Figure 5
HDFS warm-up.

There are a few lessons I have learned from this paper. They may sound like common sense, but these are nevertheless important points to keep in the back of your head to get the most out of your Java code.

  • Keep JVMs alive. Long-running JVMs do not incur as many new class loads and do not need to interpret as much code, allowing the JVM to be faster.
  • Simpler is better. Too many classes hurt performance during warm-up; however, do not go to the other extreme either. After all, these are warm-up costs and not a constant penalty to your performance.
  • Watch your external libraries. This goes together with the previous point. Bringing in a big library to perform one small task may not be too wise if similar-performing alternatives are available.