Sonification of Distributed Systems with RQL

In the past, I have discussed sonification as a means of representing monitoring data. Aside from some silly and toy examples, sonification can be used for serious applications. In many monitoring cases, the presence of some phenomenon is more important than the details about it. In such situations, simple sonification is a perfect way to alert users about the occurrence of such phenomena. For example, a Geiger counter alerts users of radioactive decay without providing any further details.

Our recent work on Retroscope and RQL allows us to bring sonification to the same "phenomena-awareness" plane for distributed systems. With RQL we can search for interesting conditions happening globally in the system, and we can use sounds to alert engineers when certain predicates hold in the global state. Of course, for such a system to be useful, it needs to be real-time, as the recency of events is the most important attribute of this type of sonification. For instance, users of a Geiger counter are not interested in hearing ticks for decay events that happened a minute ago, as that rather defeats the purpose of the counter. Similar requirements apply to "phenomena-awareness" tools in the distributed systems domain, as engineers need to know what happens in the system as it happens.

RQL can easily inspect past states, however it falls short when it comes to inspecting the current state. What we need is a streaming query that can continuously receive individual logs from Retroscope log-servers and perform the search as soon as sufficient data becomes available to form a consistent cut. Of course, streaming queries will still lag behind the current time, however we can make this lag small enough to be virtually unobservable by a person.

With streaming queries we can not only receive the cuts meeting some search criteria in near real time, but we can also sonify the mere fact that such cuts have been found. If we look at the previous example of ZooKeeper staleness, we can run sonified streaming queries to have the system alert us when the data staleness reaches some threshold, such as two or more versions stale. In the audio clip below, we have used MIDI to sonify the stream of staleness events occurring in a ZooKeeper cluster. Multiple queries were sonified to produce different sounds for different degrees of staleness of ZooKeeper data.

We can hear periods of silence when the staleness is below the threshold of 2 versions, but we can also observe some variations in cluster performance. For instance, it is very easy to identify when the cluster was experiencing more problems than normal. It is also easy to hear it recovering from the spike in staleness soon after.

Retroscoping ZooKeeper Staleness

ZooKeeper is a popular coordination service used as part of many large-scale distributed systems. ZooKeeper provides a file-system-inspired abstraction to its users on top of a replicated key-value store. Like other Paxos-inspired protocols, ZooKeeper is typically deployed on at least 3 nodes and can tolerate F node failures in a cluster of size 2F+1. One characteristic of ZooKeeper is that it runs the consensus algorithm only for update operations; to speed things up, reads may be served locally by the replica a client connects to. This means that a value read from a replica may be stale or outdated compared to the leader node or even other followers. It becomes the user's problem to tolerate stale data read from ZooKeeper, or to perform a sync operation to get up to date with the leader before reading.

But how stale can data become in a ZooKeeper cluster? This is a rather tricky question to answer, since each replica runs on a separate machine with a different clock. We cannot just observe the time a value becomes available at each node, because these timestamps are not comparable due to clock skew. Instead of trying to figure out the staleness time, I decided to look at how many versions behind a replica can be. I used my Retroscope tool to keep track of when the data becomes available to the client at each replica, and I used the Retroscope Query Language (RQL) to collect the data from the nodes and look at the consistent cuts progressing through the states of ZooKeeper. Retroscoping ZooKeeper took only about 30 lines of code added to the project.
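To contrast with the naive approach, here is roughly what a client-side probe of per-replica versions could look like: it opens a separate connection to each replica and compares the znode version numbers. Unlike wall-clock timestamps, version numbers are comparable across replicas, but this probe gives no consistent cut and its reads are not simultaneous, which is exactly the gap Retroscope closes. The hostnames and the znode path below are made up.

    import java.util.concurrent.CountDownLatch;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    public class StalenessProbe {
        public static void main(String[] args) throws Exception {
            // One client per replica, so each read is served locally by that replica.
            String[] replicas = {"zk1:2181", "zk2:2181", "zk3:2181"};   // hypothetical hosts
            int minVersion = Integer.MAX_VALUE, maxVersion = Integer.MIN_VALUE;
            for (String server : replicas) {
                CountDownLatch connected = new CountDownLatch(1);
                ZooKeeper zk = new ZooKeeper(server, 5000,
                        e -> { if (e.getState() == Watcher.Event.KeeperState.SyncConnected) connected.countDown(); });
                connected.await();
                Stat stat = new Stat();
                zk.getData("/retro", false, stat);   // version numbers are comparable across replicas
                minVersion = Math.min(minVersion, stat.getVersion());
                maxVersion = Math.max(maxVersion, stat.getVersion());
                zk.close();
            }
            System.out.println("Version spread across replicas: " + (maxVersion - minVersion));
        }
    }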

Figure: Initial run; 1-version staleness is expected.

I deployed a small ZooKeeper cluster on 3 AWS t2.micro instances (I know, it is far from a production setup, but it works well for a quick test). On a separate instance I deployed the RQL server. To start, I simply created one znode and updated its value. I then proceeded with running the simplest possible RQL query: SELECT retro FROM setd; in this query, retro corresponds to the /retro znode I created, and setd is the name of the Retroscope log I put my monitoring data into. The result of the query was exactly what I expected: at one of the consistent cuts, one of the nodes had a value one version behind. This is entirely normal behavior, as the value needs to propagate from the leader to the followers once it is decided.

My next move was to give ZooKeeper a bit more work in a short burst, so I quickly wrote a small program that puts incremental values into n znodes for a total of r writes. It starts with a value tst1, then goes to tst2 and so on for every znode. At first I restricted n=1, as I felt that writing to just one znode to create a "hot spot" was going to give me the best chance of getting stale values. But ZooKeeper handled the burst of 100 writes easily, with the results being identical to a single write: the stale value was at most 1 version behind the current data.
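A minimal version of such a burst writer, as I read the description above, might look like the sketch below; the connection string and znode paths are placeholders.

    import java.util.concurrent.CountDownLatch;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class BurstWriter {
        public static void main(String[] args) throws Exception {
            int n = 1;     // number of znodes to target
            int r = 100;   // total number of writes across all znodes
            CountDownLatch connected = new CountDownLatch(1);
            ZooKeeper zk = new ZooKeeper("zk1:2181,zk2:2181,zk3:2181", 5000,
                    e -> { if (e.getState() == Watcher.Event.KeeperState.SyncConnected) connected.countDown(); });
            connected.await();
            // Make sure the target znodes exist before the burst.
            for (int i = 0; i < n; i++) {
                if (zk.exists("/retro" + i, false) == null) {
                    zk.create("/retro" + i, "tst0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            }
            // Burst of incremental values: every znode gets tst1, tst2, ... in order.
            for (int v = 1; v <= r / n; v++) {
                for (int i = 0; i < n; i++) {
                    zk.setData("/retro" + i, ("tst" + v).getBytes(), -1);   // -1 matches any version
                }
            }
            zk.close();
        }
    }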

Figure: Two versions behind.

Seeing things work well was not interesting to me, so I decided to no longer play fair. I artificially made one replica work slower and act as a straggler node. The ZooKeeper protocol tolerates this just fine, as it needs 2 out of 3 nodes in my cluster to form a majority quorum and make progress. For this crippled ZooKeeper setup I re-ran the workload and my simple query. Needless to say, I was able to spot a moment when the system had a node with a znode 2 versions behind, and it was my straggler machine. In the next step I increased the load on the cluster and made the workload write 1000 values to the same znode. I also changed the query so that I do not have to manually look through thousands of consistent cuts trying to spot the stale data. My new query emits consistent cuts with staleness of 2 or more versions:

SELECT r1 FROM setd WHEN Int(StrReplace(r1, "tst", "")) - Int(StrReplace(r1, "tst", "")) > 1;

The interesting thing about the query above is that it uses the same variable name twice in the expression, essentially telling RQL to output a cut when r1 - r1 > 1. However, there are many r1's in the system (3 in fact, one at each node), so when a pair of r1's that satisfies the WHEN condition is found (at different nodes), RQL will output the consistent cut.

Figure: Very stale.

I was a bit surprised by the results. At first the straggler managed to keep up with the rest of the cluster quite well, slipping 2 versions behind on occasion, but after about 200 requests things went out of control for the crippled node, with the staleness growing to 158 versions behind towards the end of the run. Of course a straggler node makes things look worse, but it is not an unrealistic scenario to have underperforming machines. My test, however, is not fair either, as I was using a 100% write workload targeting just one znode. So in the next try I changed the workload to target 100 different znodes, while still measuring the staleness on just one znode. In that experiment the staleness was not as high, but the number of updates to a single znode was only a small fraction of the previous test. Nevertheless, the straggler replica was as much as 6 versions stale, making it roughly a third of the updates behind the rest of the cluster.

For the last quick test, I tried a large burst of 2000 writes to the same znode on a healthy cluster with no straggler nodes. Despite all replicas working at their proper speed, I was able to observe staleness of 3 versions on some occasions.

I am not sure about the lessons learned from this quick experiment. I was amazed by how easy it is to get ZooKeeper to fall 2 or more versions behind, although the system seems fast to catch up. The straggler scenario, however, illustrated how quickly things can get out of control with just some performance degradation at one of the replicas. Engineers using ZooKeeper must build their applications in such a way as to tolerate stale ZooKeeper values gracefully.

Why Government IT is Expensive and Archaic

Disclaimer: I do not work for the government, and my rant below is based on my very limited exposure to how IT works in the US government setting.

Why is government IT expensive and archaic? I think this could be a very long discussion, but I do have a quick answer: standards imposed by the government and used by government-regulated industries. I have very little experience with these types of standards, but they make me cringe every time I have to deal with them. Below I briefly describe my encounter with them.

When I was just a college student, I joined a (very) small IT company sitting next to the university campus. I started as an intern during my 3rd year of college and was working full-time a year later. Around the same time, our team was tasked with making a piece of software for a private company, to be used for tracking medical services provided to students at public schools throughout the state. This piece of software was to replace an older one, and we had a very strict set of requirements: "Make it work and look the same, but better." Such requirements, along with the many decisions a college-student-turned-software-engineer had to make, shaped how the system works right now. Of course, it was not just me making the product, but nevertheless, my "brilliant" ideas slipped in and became what the system is today.

One part of the system is responsible for billing the medical services tracked in the system to Medicaid. And this is where the interactions with government IT started. A government agency cannot just provide a secure API for software developers like myself to use. No, it has to hire a major business to implement a standard commonly used for medical transactions. And here we are, in the 21st century, having to adhere to Electronic Data Interchange (EDI) X12, a standard designed in the 1970s to transfer medical (among other EDI uses) data between computer systems. Don't get me wrong, standards are good, they make different systems work together flawlessly… But that is until you start looking at the standard. For our system, we had to implement only one transaction type at that moment, so how difficult could it be? The difficulty started with a 700-page manual just for constructing the transaction request. The manual is accompanied by an errata and an errata of the errata. In addition, there is a 100-page companion guide that specifies procedures specific to the state Medicaid.

So how easy is the standard itself? Maybe it is not that complicated to work with and the manuals are just full of fluff? Well, EDI is a textual format, so in theory a person can read the file to see all the data. The manuals even provide examples of how pieces of it should look: SV1HC:99211:2512.25UN111✽✽1:2:3✽✽Y~

Easy enough to read? Nope, so you are back at the manual studying what everything means. For example, SV1 is the header for the section describing the professional (medical) service provided, the HC code describes what type of codes follow next, 99211 is the service code, followed by code modifiers and modifiers of modifiers and so on. Somewhere in there is how much to charge Medicaid and how many units of service have been provided. But on the bright side, it has cool delimiters: stars, colons and tildes. To top it off, each field can be flexible in size or restricted to some number of characters or to a certain set of values, and to find out which, you consult the manual once again.
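To give a feel for the format, here is a toy segment splitter. I am assuming the star separates elements, ':' separates sub-elements, and '~' terminates the segment, which is how the sample reads (a real X12 interchange declares its own delimiters in the header), and the segment in the code is made up in the spirit of the manual's example.

    public class EdiToyParser {
        public static void main(String[] args) {
            // A made-up segment in the spirit of the manual's example; '*' separates
            // elements, ':' separates sub-elements, '~' terminates the segment.
            String segment = "SV1*HC:99211:25*12.25*UN*1**1:2:3**Y~";
            String[] elements = segment.replace("~", "").split("\\*");
            System.out.println("Segment id: " + elements[0]);   // SV1 - professional service
            for (int i = 1; i < elements.length; i++) {
                String[] subElements = elements[i].split(":");  // code qualifiers, modifiers, ...
                System.out.println("Element " + i + ": " + String.join(" | ", subElements));
            }
        }
    }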

What if you make an error? Not a problem, the response to the request comes back as an EDI file, equally cryptic, that describes what went wrong. And we are back to the manuals, counting stars and tildes to check whether we sent all the data and whether it was in the right format and in the right order.

It is also worth mentioning that the EDI format is used for HIPAA-protected medical data, but it has no security built in; everything is plaintext, and with the help of the manual anyone can read it. The transmission of the requests, however, is carried out over a secure channel. In my case, I am sending batched requests for processing, and the only way to transmit those is over SFTP. Needless to say, it also becomes my responsibility to pull the response files from the SFTP server once the requests are processed, and since there are no strict guarantees on when the processing completes for each batch, I just do it periodically and eventually collect the responses. How the EDI files are stored on the other side is all up to the engineers designing that system, and as users we can only hope it has passed HIPAA compliance checks and secures the data at rest.

Obviously, archaic standards like EDI work well in practice. After all, despite my "brilliant" ideas implemented in other parts of the system, the EDI layer was relatively problem-free. But the standards definitely can be better: new standards, if developed, could provide improved security, be easier to work with, and reduce the cost of developing new software. However, I suspect the cost of switching to new standards is just too high for the existing infrastructure, forcing the industry to widen the gap between what government agencies require and what is modern, efficient and secure.

I no longer work for the company, but I still maintain the software, and I was dreading the moment a client would ask for changes to how the system interacts with Medicaid. It seems that time is upon me now, and in the summer I will be looking at more manuals for the EDI X12 requests needed to implement new features.

The First Datastore-driven Vehicle

It is not a secret that procrastination is the favorite activity of most PhD students. I have been procrastinating today, even though my advisor probably wants me to keep writing. In the midst of my procrastination, I thought: "Why are there self-driving vehicles, but no database-driven vehicles?" As absurd as it sounds, I gave it a try. And now I not-so-proudly present the prototype of the world's first(?) datastore-driven vehicle. The implementation was quick and simple. I took the Voldemort code I have been tinkering with in my Voldemort traffic light post and changed it to send signals over a Bluetooth connection to a Frankenstein-of-a-car I built very quickly out of a few motors, a gearbox, an Arduino, a bunch of wires, some tracks, and lots of thermal glue.

The car changes the direction of travel when the workload characteristics change: it goes straight when the number of read operations is the same as the number of write operations. By changing the read-to-write ratio of the workload, the car can be made to turn either left or right. The prototype vehicle is not very responsive (weak motors/dead batteries) and sometimes runs into things despite my best attempts to control it.
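The steering logic itself is trivial; a sketch of the decision (the thresholds and command names are mine, not what actually runs on the car) could be:

    // Decide which command to send to the Arduino over Bluetooth based on the
    // observed workload mix. Thresholds and command names are illustrative only.
    public final class WorkloadSteering {
        enum Command { STRAIGHT, LEFT, RIGHT }

        static Command steer(long reads, long writes) {
            if (reads + writes == 0) return Command.STRAIGHT;   // no workload, keep going
            double readRatio = (double) reads / (reads + writes);
            if (readRatio > 0.55) return Command.LEFT;          // read-heavy workload turns left
            if (readRatio < 0.45) return Command.RIGHT;         // write-heavy workload turns right
            return Command.STRAIGHT;                            // balanced reads and writes
        }
    }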

And now, once the world has a database-driven car, the procrastination can continue… Or should I start writing?

One Page Summary: “Musketeer: all for one, one for all in data processing systems”.

Many distributed computation platforms and programming frameworks exist today, and new ones constantly pop up from industry and academia. Some platforms are domain-specific, such as TensorFlow for machine learning. Others, like Hadoop and Naiad, are more general, and this generality allows sophisticated and specialized programming abstractions to be built on top.

Figure: Coupled vs. decoupled designs.

So we naturally ask: which distributed computation platform is faster and more scalable? And what programming model or framework is better? The authors of Musketeer tried to find the answer and concluded that there is no such thing as a perfect computation platform or a perfect programming front-end, as they all perform better under different circumstances and workloads. This observation led to the Musketeer prototype, which decouples the programming front-ends from their target platforms and connects all the front-ends to all the computation back-ends.

Figure: Musketeer schematics.

Musketeer accomplishes this by translating the code written in every supported programming front-end into an intermediate representation (IR). This intermediate representation can later be translated into the commands of the computation platforms. Think of Java or Scala code translated to Java bytecode before being JIT-compiled to native code as the program runs. Each IR is a data-flow DAG representing the progression of operations in the computation.
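Conceptually, such an IR is just a graph of operators with input dependencies; a bare-bones sketch (the operator names are mine, not Musketeer's) could look like this:

    import java.util.ArrayList;
    import java.util.List;

    // A minimal data-flow IR: vertices are relational-style operators,
    // edges point to the operators whose output each vertex consumes.
    final class IrNode {
        enum Op { SCAN, MAP, FILTER, JOIN, GROUP_BY, SINK }

        final Op op;
        final List<IrNode> inputs = new ArrayList<>();

        IrNode(Op op, IrNode... inputs) {
            this.op = op;
            for (IrNode in : inputs) this.inputs.add(in);
        }
    }

    // Example DAG: join two scanned datasets, aggregate, and write the result.
    class IrExample {
        static IrNode build() {
            IrNode left = new IrNode(IrNode.Op.SCAN);
            IrNode right = new IrNode(IrNode.Op.SCAN);
            IrNode joined = new IrNode(IrNode.Op.JOIN, left, right);
            IrNode grouped = new IrNode(IrNode.Op.GROUP_BY, joined);
            return new IrNode(IrNode.Op.SINK, grouped);
        }
    }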

Figure: Back-end detection; ♣ indicates Musketeer's pick.

Musketeer runs the IR on the platform it deems most suitable for the job (figure on the right). For example, some platforms may be better for certain operations, and not every platform supports all types of computations. Musketeer detects the patterns of operations in the IR through idiom recognition and will not allow an IR containing certain patterns to run on a platform that does not support those patterns or performs poorly on them.

Figure: Combining back-ends.

Musketeer can also split the entire computation into smaller jobs, each running on a different platform. The partitioning is done by picking a lower-cost back-end for each partition. Of course, finding the partitions in the DAG is a complicated (NP-hard) problem. Musketeer uses exhaustive search to find the best partitions for smaller IRs, and a heuristic-based dynamic programming approach for larger DAGs, when the cost of the brute-force approach becomes prohibitive. The heuristic approach does not examine all possible partitions. Such partitioning allows achieving better performance than running on a single, homogeneous platform.
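A much simplified sketch of the back-end choice for a single job is below; the cost numbers, back-end names, and the capability check are made-up stand-ins for Musketeer's cost model and idiom recognition.

    import java.util.EnumSet;
    import java.util.List;
    import java.util.Set;

    // Pick the cheapest back-end that supports every operator in the job.
    final class BackendChooser {
        enum Op { MAP, JOIN, ITERATE }

        static final class Backend {
            final String name;
            final Set<Op> supported;
            final double estimatedCost;   // pretend output of a cost model
            Backend(String name, Set<Op> supported, double estimatedCost) {
                this.name = name; this.supported = supported; this.estimatedCost = estimatedCost;
            }
        }

        static Backend choose(List<Backend> backends, Set<Op> jobOps) {
            Backend best = null;
            for (Backend b : backends) {
                if (!b.supported.containsAll(jobOps)) continue;   // idiom/capability filter
                if (best == null || b.estimatedCost < best.estimatedCost) best = b;
            }
            return best;   // null if no back-end can run the job as a whole
        }

        public static void main(String[] args) {
            List<Backend> backends = List.of(
                new Backend("hadoop-like", EnumSet.of(Op.MAP, Op.JOIN), 10.0),
                new Backend("naiad-like", EnumSet.allOf(Op.class), 12.0));
            System.out.println(choose(backends, EnumSet.of(Op.MAP, Op.ITERATE)).name);
        }
    }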

The IR is not optimized when it is first generated; instead, Musketeer performs the optimizations before starting the computation. In particular, certain operators in the IR DAG are merged together to reduce the number of jobs executed. The merging affects the platform choice, as some platforms support more complex jobs, while others need more steps to achieve the same result. Musketeer also optimizes for I/O by trying to eliminate duplicate scanning of a dataset. All these optimizations allow the code generated from the IR to perform comparably to manually optimized code.

One Page Summary: “Slicer: Auto-Sharding for Datacenter Applications”

One of the questions engineers of large distributed systems must answer is "where to compute". This is a big and important question, as we do not want to send a request originating in the US to some server in Australia. It simply makes no sense to incur the communication overhead if there are resources available closer. But physical affinity is not the only requirement for answering the question. Just as we would not hold an Olympic swimming competition in a desert simply because the desert is close, we would not send a Russian-language speech recognition request to servers holding an English language model. In other words, when deciding on the placement of a computation, we need to find the closest place with enough resources of the right type.

Slicer is Google's answer to this "where" problem. It continuously tries to find the best possible place to perform the computations in the presence of a dynamically changing workload. Many applications within Google, such as speech recognition, various caches, and the DNS service, use Slicer to partition and balance their workload among a set of tasks, or application servers (containers). Computation placement is not on the application's critical path, and it is somewhat static: once a placement has been made, we can use it for some time, until the conditions change. Think of finding a swimming pool for our Olympic competition ahead of the actual races and sticking with the same pool until the games are over.

Slicer sticks with the same pool of resources as well, as it directs similar requests to the same set of tasks. Each application using Slicer needs to produce a key associated with the request; Slicer hashes it into a 63-bit internal key, which is then used to decide on the placement of the computation. This makes it possible for requests with the same key to be processed by the same set of tasks, catering to locality of reference.
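In spirit, the client-side lookup is simple: hash the application key into the 63-bit key space and find the task that owns the range the hash falls into. The sketch below is only an approximation; the hash function and the range map are placeholders, not Google's implementation.

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    final class SlicerLikeLookup {
        // Hash an application-provided key into a non-negative 63-bit value.
        static long hash63(String applicationKey) throws Exception {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(applicationKey.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) h = (h << 8) | (digest[i] & 0xFF);
            return h & Long.MAX_VALUE;   // clear the sign bit: 63 usable bits
        }

        // Assignment cached by a Clerk-like library: start of a key range -> task address.
        // Assumes the ranges cover the whole key space starting at 0.
        static final NavigableMap<Long, String> assignment = new TreeMap<>();

        static String taskFor(String applicationKey) throws Exception {
            long key = hash63(applicationKey);
            return assignment.floorEntry(key).getValue();   // owner of the containing range
        }
    }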

The Clerk library provides a client interface to pull and learn which keys are assigned to which tasks. With Clerk, clients can learn the assignment and communicate directly with tasks. Google's RPC service Stubby is also integrated with Slicer and can provide Clerk's functionality, so many applications do not even need to use Clerk. Application servers use a similar library, called Slicelet, to learn which keys a particular task is expected to process, allowing tasks to adapt to changes in the requests they serve.

The Slicer service is divided into two major parts: Assigners and Distributors. Assigners map keys to tasks, while Distributors retrieve these maps from Assigners and deliver them to the Clerks and Slicelets upon request. More than one Assigner exists globally, and any Assigner can generate the key assignment for any application; however, in most cases only one Assigner is considered preferred. Sometimes more than one Assigner can act as preferred for a short duration, but Assigners eventually converge to the same assignment through optimistically-consistent storage. Slicer also has a mode with strongly consistent assignment, however it had not been tested in production at the time of the paper's publication. Slicer also packs a variety of backup mechanisms to make sure clients can still make requests despite Assigner or Distributor failures. In case of a mis-assignment (~0.02% of requests), for example due to failed tasks, the client needs to retry the request with a new assignment obtained from Slicer.

Assigners perform load balancing for the requests by changing how many tasks are assigned to each key. Balancing takes into account the usage rate of different keys. As the load changes, Slicer updates its mapping by taking some servers away from the lesser-used keys and giving them to the "heavy" keys. Load balancing results in a lower load on the "hottest" tasks of an application (the median application's hottest task is 63% less loaded) compared to static request distribution. At the same time, Slicer reacts to shifting workload patterns in a matter of minutes.

Figure: Hottest task load under static sharding and Slicer; Slicer achieves 63% lower hottest-task load for a median application.

Slicer seems to play a big role in deciding where to process requests at Google, at least for certain applications. It provides load balancing and ensures that requests get to servers or tasks that are both prepared to accept them and have the capacity to do so.

Monitoring with Retroscope: Detecting Invariant Violations

Earlier I briefly mentioned Retroscope, our distributed snapshot library that makes taking non-blocking, unplanned, consistent global distributed snapshots possible. However, these snapshots are only good if we know how to use them well. Of course, the most obvious use case is simply data backup, and despite it being an important application for snapshots, I find it a bit boring for my taste. What I am thinking about right now is using snapshots for distributed monitoring and debugging.

Let's consider an application that has a global invariant predicate P, and we want to check that the distributed system holds the invariant P at all times. This means that we should never see a consistent cut in which predicate P = false. So our problem boils down to looking for consistent cuts that violate P. Luckily, Retroscope can do exactly this, since we can take one snapshot and incrementally move forward in time as the application execution progresses, checking the predicate by looking at consistent cuts as the state advances.

With the basic Retroscope described in the earlier post, finding predicate violations is a rather cumbersome effort that requires writing new code for every invariant a user wishes to check. So in the past few months I have been working on a Retroscope extension tailored specifically to debugging and monitoring use cases. The improved Retroscope exposes the Retroscope Query Language (RQL), a SQL-like interface that allows users to write queries searching for conditions in the consistent cuts.

Now let's go back to our hypothetical system with global invariant P, and for now assume P holds when all local predicates p0, p1, p2, …, pn hold on the nodes [0 … n]. As such, P = p0 ∧ p1 ∧ p2 ∧ … ∧ pn, and if any of the local predicates fails, the global predicate fails as well. For the simplicity of the example, we can say that local predicate pi is the following: pi = ai + bi > ci. Each node thus maintains all three variables, although the nodes may have different values for them. With Retroscope, we can expose these local variables to be stored in a local log named inv. The log maintains both the current version of the variables and the history of variable changes.

How do we look for a violation of such an invariant with RQL? Just a single query suffices:

SELECT inv.a, inv.b, inv.c FROM inv WHEN (inv.a + inv.b <= inv.c) LINK SAME_NODE

Now we can dissect this query into bits and see what happens there. RQL queries are meant to retrieve consistent cuts that satisfy certain criteria. The list of parameters following the SELECT statement specifies what variables we want to see in the resulting consistent cuts. The FROM keyword enumerates the logs we use in this particular query. The actual consistent-cut criteria are specified after the WHEN keyword. In this particular case the condition for emitting cuts is (inv.a + inv.b <= inv.c) LINK SAME_NODE, which is equivalent to emitting a cut when the following holds: there exists a node i such that ai + bi <= ci.

By now a curious reader has probably asked why we even bother with consistent cuts in this particular example. All predicates can be checked locally, and their evaluation does not depend on other remote servers, so we can simply run local monitors and not worry about consistent cuts and time synchronization at all: a failure on one node designates a failure of the system globally, no matter the time. Retroscope and RQL shine when we break away from this locality. What if our invariant involves messages being sent and received? Or what if it involves different parameters that exist on different machines at the same time? With the ability to look at consistent cuts, RQL breaches the boundary of a single node. Below I list just a few variations of the original query that no longer deal with a conjunction of local predicates and instead look at the global state as a whole:

  1. SELECT inv.a, inv.b, inv.c FROM inv WHEN inv.a + inv.b <= inv.c
    • Omitting the LINK SAME_NODE part changes the operation of the query drastically, as the three variables are no longer bound to co-exist on the same node: the cut is emitted when there exist nodes i, j, k such that ai + bj <= ck.
  2. SELECT inv.a, inv.b, inv.c FROM inv WHEN (inv.a + inv.b <= inv.c) LINK EACH_NODE
    • Replacing LINK SAME_NODE with LINK EACH_NODE changes the search condition to require every node to satisfy it in the consistent cut: the cut is emitted when, for every node i, ai + bi <= ci.
  3. SELECT inv.a, inv.b, inv.c FROM inv WHEN (inv.a + inv.b) LINK SAME_NODE <= inv.c
    • Rewriting the condition as WHEN (inv.a + inv.b) LINK SAME_NODE <= inv.c causes inv.a and inv.b to be summed on the same node and compared to inv.c values from other nodes as well, so the consistent cut is emitted when there exist nodes i, j such that ai + bi <= cj.
  4. SELECT inv.a, inv.b, inv.c FROM inv WHEN (inv.a + inv.b <= inv.c) AND NODE($1) = NODE($3)
    • This query restricts inv.a and inv.c to be on the same node. $1 is the placeholder for the first variable encountered while parsing left to right, and $3 is the third variable. The consistent cut is emitted when there exist nodes i, j such that ai + bj <= ci.

The above are just a few simple examples of what is possible with RQL; however, there are limitations. The biggest limitation is the complexity of the conditions. Even though RQL does not limit how many operations are possible in the condition of a query, having large expressions can slow the system down drastically. For example, a simple WHEN inv.a > inv.b will examine all a's that exist on the nodes of the system at the consistent cut and all b's, in every possible combination. For a system of n nodes this means forming the product set E = {a1, …, an} × {b1, …, bn} of n^2 pairs, and the comparison is then carried out on every element of the product set E.
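To see where the cost comes from, here is a toy version of what evaluating such an unrestricted condition on one consistent cut entails (this is an illustration, not RQL's actual evaluator):

    import java.util.List;

    // Toy illustration of the evaluation cost of WHEN inv.a > inv.b with no LINK
    // restriction: every a from every node is compared with every b from every node,
    // so one consistent cut costs n * n comparisons for an n-node system.
    final class CrossProductCheck {
        static boolean cutSatisfies(List<Long> aValuesPerNode, List<Long> bValuesPerNode) {
            for (long a : aValuesPerNode) {        // one a value per node in the cut
                for (long b : bValuesPerNode) {    // one b value per node in the cut
                    if (a > b) return true;        // emit the cut on the first satisfying pair
                }
            }
            return false;
        }
    }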

P.S. I have illustrated the syntax as it operates at the time of this writing; however, RQL is still developing, and I am not sure I like the syntax of the conditions too much, so it is subject to change.

One Page Summary: Incremental, Iterative Processing with Timely Dataflow

This paper describes the Naiad distributed computation system. Naiad uses a dataflow model to represent the computations, but it aims to be a general dataflow framework, in contrast to more specialized approaches such as TensorFlow. Similarly to other dataflow systems, the computations are represented as graphs, where vertices represent data and operations and edges carry the data between vertices.

Naiad was designed as a generic framework to support iterative and incremental computations in the dataflow model. We can think of an iterative computation as some function Op executed repeatedly. Such an iteration function can be looped on its own output until there are no changes between the input and the output, i.e. until the function converges to a fixed point.

Incremental computations are a bit more general than iterative ones. In incremental processing, we start with an initial input A0 and produce some output B0. At some later point, we have a change δA1 to the original input A0, such that we get a new input A1 = A0 + δA1. The incremental model produces an incremental update to the output, so δB1 = Op(δA1) and B1 = δB1 + B0. Note that the incremental model only needs the previous state (i.e. A0 and B0) to compute the next state, however we can extend it to keep all output differences: Bt = δB0 + δB1 + … + δBt (with δB0 = B0). Incremental computations can be adapted for iterative algorithms, where each iteration produces a difference output and the next iteration operates only on that difference and not the full input. However, with the basic incremental computation approach it becomes impossible to do iterative operations under changing or streaming input, as now we need to keep track not only of the iteration number (and the differences between iterations), but also of the version of the input (and the differences of the input).
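As a tiny concrete example, let Op be a count of records per key; since counting distributes over input additions, the update for δA1 can be computed from the delta alone and merged into B0 (my illustration, not Naiad code):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Incremental record count per key: B1 = B0 + Op(dA1), without reprocessing A0.
    final class IncrementalCount {
        // Op applied to the input delta only.
        static Map<String, Long> op(List<String> deltaA) {
            Map<String, Long> deltaB = new HashMap<>();
            for (String key : deltaA) deltaB.merge(key, 1L, Long::sum);
            return deltaB;
        }

        // Merge the output delta into the previous output: B1 = dB1 + B0.
        static void mergeInto(Map<String, Long> b0, Map<String, Long> deltaB) {
            deltaB.forEach((key, count) -> b0.merge(key, count, Long::sum));
        }
    }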

The differential computation model overcomes the limitations of the basic iterative and incremental approaches by keeping all the differences δB and δA, not just the previous state. In addition, a two-component timestamp is now used, where one component keeps track of the input version and the second one is responsible for the iteration number. Such timestamping of differences complicates the computation, as the timestamps no longer have a total order. In other words, sometimes it is not possible to tell whether one timestamp happened before another. However, the new timestamps have a partial order, with (e1, i1) <= (e2, i2) if and only if e1 <= e2 and i1 <= i2. And under this partial order it is still possible to sum the differences together: Bt is the sum of all δBs with s <= t.
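The partial order on these two-component timestamps is the product order, which a few lines of code make concrete (a sketch, not Naiad's actual timestamp type):

    // Two-component (input epoch, iteration) timestamps under the product partial order:
    // t1 <= t2 iff t1.epoch <= t2.epoch and t1.iteration <= t2.iteration.
    // (1, 2) and (2, 1) are incomparable, which is why the order is only partial.
    record DiffTimestamp(long epoch, long iteration) {
        boolean lessOrEqual(DiffTimestamp other) {
            return epoch <= other.epoch() && iteration <= other.iteration();
        }
    }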

With the differential model, we can calculate not only the end result of an operation when new input comes in, but also any intermediate δBt. A lot of the power of differential dataflow lies with the differential operator, which must produce differences that can be summed with the equation above.

The timestamps play a crucial role in tracking execution progress. Naiad's communication methods Send and OnRecv can be triggered multiple times for the same variable, making it necessary to have a mechanism capable of notifying other nodes when certain data has been sent in full. This notification triggers when all messages at or before a particular timestamp have been sent. Upon receiving a notification, a node's OnNotify(t) method is called, allowing the algorithm to react. The differential dataflow model complicates the notification mechanism, since the timestamps no longer have a total order; however, the partial order we have established earlier, along with some restrictions on the dataflow graph, allows Naiad to keep an effective notification system that does not break its guarantees even under nested loops and continuous input updates.

Is Java Fast Enough for Distributed Applications?

Lots of modern distributed systems are built with the Java programming language and consequently use the Java Virtual Machine (JVM) as their execution environment. The list of such systems is rather long: Hadoop, Spark, HBase, Cassandra, Voldemort, ZooKeeper, BookKeeper, Kafka, and the list goes on and on. But is the JVM fast enough for these systems?

Anyone who has ever dealt with Java probably knows at least a little bit about how the JVM works. To start with, Java programs are compiled into machine-independent, unoptimized bytecode. The bytecode is then interpreted by the JVM and compiled into native code by the just-in-time (JIT) compiler. The JVM applies various optimizations during JIT compilation, and these optimizations can be more aggressive than the optimizations done by native compilers. After all, before doing these optimizations and compilation, Java has already run the code in interpreted mode, and it was able to collect some statistics on branch predictions, loops and function calls to make optimizations tailored not just to that specific code, but also to the specific runtime or data.
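A crude way to see the warm-up effect for yourself is to time the same method early and late in the life of a process: early calls run interpreted, later calls run JIT-compiled code. This micro-benchmark sketch is mine, not the paper's methodology, and the numbers it prints will vary by JVM and machine.

    public class WarmupDemo {
        // Some small amount of work for the JIT to eventually compile and optimize.
        static long work(int n) {
            long sum = 0;
            for (int i = 0; i < n; i++) sum += (i % 7) * 31L;
            return sum;
        }

        public static void main(String[] args) {
            long sink = 0;
            for (int round = 0; round < 50_000; round++) {
                long start = System.nanoTime();
                sink += work(10_000);
                long elapsed = System.nanoTime() - start;
                // Print a few early (interpreted) and a few late (JIT-compiled) samples.
                if (round < 3 || round >= 49_997) {
                    System.out.println("round " + round + ": " + elapsed + " ns");
                }
            }
            System.out.println(sink);   // keep the result alive so the work is not optimized away
        }
    }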

However, before Java performs all these tricks, it needs to run in the slow interpreted mode, incurring warm-up overheads. The OSDI'16 paper "Don't Get Caught in the Cold, Warm-up Your JVM" goes into more detail about what the warm-up overheads are and how they impact data-parallel distributed systems, such as Hadoop, Spark and Hive.

Warm-up Costs

The paper breaks down warm-up overheads into two categories: class loading and bytecode interpretation. It investigates these overheads under different workloads on different distributed systems. Of course it is expected that warm-up impacts a freshly started JVM, but how big is the cost of warm-up? If we look at HDFS client performance, we can see that warm-up can easily take a few seconds, depending on the task. In HDFS, writing is more complicated and involves more classes, thus Java spends more time loading all the classes. Warm-up while reading from HDFS also differs depending on whether we read in parallel or sequentially. The graph below shows warm-up costs by task and dataset size.

Figure 1: JVM warm-up overheads ("cl" stands for class loading, "int" for bytecode interpretation).

We can see that the size of the operation has no impact on the warm-up overhead, meaning that small operations spend a much larger fraction of their time in warm-up, while big operations tend to amortize the warm-up costs.

Figure 2: Warm-up overhead as a fraction of total runtime.

It is also interesting to see when the warm-up occurs in the execution cycle. Obviously, starting the client requires lots of class loading and bytecode interpretation, however starting the actual jobs (for the first time?) also incurs warm-up.

Figure 3: Warm-up at different execution stages.

Another question one may ask is how slow the actual class loading is. For an HDFS sequential read, the client had to load about 2000 classes, taking 1028 ms to complete. Spark was much heavier on the classes it uses and needs to load, with 19,066 classes on average, taking roughly 6.3 seconds in overheads. These are rather large numbers, especially if we aim at low-latency execution of our requests; however, not everything is so grim.

It is important to emphasize that the paper mainly uses clients to study the warm-up, while the actual distributed system is not studied in much detail. To be fair, the authors mention that the warm-up overheads are present on the server side as well, and in Spark the executor warm-up can add up to almost 50% of the overall warm-up time.

Figure 4: Spark warm-up overheads.

Dealing with Warm-up

The paper argues that these are very big overheads that must be dealt with. The authors even offer a prototype solution, a modified JVM called HotTub, which acts as a container for many other "normal" JVMs to be reused when needed. Reusing a JVM means we do not need to load classes and perform JIT compilation again. Such an approach works well for short-lived JVMs, i.e. a client performing one operation and terminating. If such a terminated JVM ends up in the pool for JVM reuse, we can save on the warm-up overheads the next time we need another short-lived JVM.

I have to disagree, however, that these overheads are a big problem, and here is why. JVMs running the server side of a distributed system are warmed up if they have been running for at least some time. As such, these machines do not experience warm-up costs anymore. In the breakdown of the HDFS request below, we do not see any warm-up losses occurring on the data-node side; all of the overheads are due to the warm-up of a short-lived HDFS client. This means that keeping your JVMs alive and designing your workloads/clients to stay up is the best way to overcome this type of overhead.

Figure 5: HDFS warm-up.

There are a few lessons I have learned from this paper. They may sound like common sense, but nevertheless these are important points to keep in the back of your head to get the most out of your Java code.

  • Keep JVMs alive. Long-running JVMs do not incur as many new class loads and do not need to interpret as much code, allowing the JVM to be faster.
  • Simpler is better. Too many classes hurt warm-up performance, however do not go to the extreme in the other direction either. After all, these are warm-up costs and not a constant penalty to your performance.
  • Watch your external libraries. This goes together with the previous point. Bringing in a big library to perform one small task may not be too wise if similarly performing alternatives are available.

Globally Consistent Distributed Snapshots with Retroscope

Taking a consistent snapshot of a distributed system is no trivial task because of the asynchrony between the nodes in the system. As the state of each machine changes in response to incoming external messages or internal events, each node can produce a log of such state changes. With the log abstraction, the problem of taking a snapshot turns into the problem of aligning the logs together and finding a consistent cut among them. However, collating all the system logs is difficult using just the physical clocks available at each machine, because clocks tend to drift, producing time uncertainty. Various time synchronization protocols, such as NTP and PTP, exist, but perfect synchronization is still unattainable.

Retroscope is the system we designed to take unplanned, non-blocking, consistent global snapshots. Unlike other systems, Retroscope does not need to block while taking a snapshot, as it does not need to wait out the time uncertainty caused by clock skew at the various machines, thanks to its reliance on Hybrid Logical Clocks (HLC) instead of NTP-synchronized physical time.

Figure 1: Using NTP time fails to take a consistent snapshot without waiting out the time uncertainty.

 

HLC introduces causality information into the clock as messages are being exchanged between servers and provides the same causal guarantees as Lamport’s Logical Clocks (LC). More information about HLC can be found here and here.
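For reference, the HLC update rules from the HLC paper fit in a few lines; in the sketch below l is the logical component, c the counter, and pt() the node's physical clock (this is my condensed rendering, not Retroscope's code):

    // Hybrid Logical Clock state: l tracks the max physical time seen, c breaks ties.
    // Update rules follow the HLC paper; pt() is the local physical clock.
    final class HybridLogicalClock {
        private long l = 0;   // logical (physical-time-derived) component
        private long c = 0;   // causality counter

        private long pt() { return System.currentTimeMillis(); }

        synchronized long[] sendOrLocalEvent() {
            long lOld = l;
            l = Math.max(lOld, pt());
            c = (l == lOld) ? c + 1 : 0;
            return new long[] {l, c};   // timestamp attached to the outgoing message
        }

        synchronized void receive(long lMsg, long cMsg) {
            long lOld = l;
            l = Math.max(Math.max(lOld, lMsg), pt());
            if (l == lOld && l == lMsg)      c = Math.max(c, cMsg) + 1;
            else if (l == lOld)              c = c + 1;
            else if (l == lMsg)              c = cMsg + 1;
            else                             c = 0;
        }
    }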

Taking a snapshot

Retroscope achieves snapshots by adding HLC into the network communication of the Retroscoped system. Internally, Retroscope keeps a sliding window-log of past state changes along with the associated HLC timestamps at each node. These window-logs are used to facilitate the unplanned nature of taking snapshots. Snapshots are triggered by a special client that maintains a common HLC with the rest of the system. Retroscope allows an instant unplanned snapshot to be started by the initiator; such instant snapshots are guaranteed to capture the states at the time Tnow at which the snapshot request was issued by the initiator. Obviously, once the snapshot request message reaches the nodes, the time has advanced to Tr > Tnow.

Upon receiving the snapshot request message at Tr, each node starts taking a local snapshot. Since the system does not halt processing requests, depending on the implementation we may arrive at a local snapshot at some time Tf >= Tr. Because our local snapshot captures a state that happened after the requested time, we need to modify it to arrive at the state at time Tnow. We use the window-log of state changes to undo all operations that happened locally after Tnow, thus arriving at the desired local snapshot. Once all nodes compute their local snapshots, Retroscope is done taking a global consistent snapshot at time Tnow.
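The per-node part of this can be sketched as follows: copy the current local state and then use the window-log to undo every change whose HLC timestamp falls after Tnow. The data structures below are my simplification of what Retroscope keeps, not its actual implementation.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;

    // Simplified per-node snapshot: undo window-log entries newer than the requested time.
    final class LocalSnapshotter {
        // One recorded state change: which key changed, what it was before, and when (HLC).
        record Change(String key, byte[] previousValue, long hlcTimestamp) {}

        private final Map<String, byte[]> currentState = new HashMap<>();
        private final Deque<Change> windowLog = new ArrayDeque<>();   // newest change at the head

        void recordChange(String key, byte[] previousValue, long hlcTimestamp) {
            windowLog.addFirst(new Change(key, previousValue, hlcTimestamp));
        }

        Map<String, byte[]> snapshotAt(long tNow) {
            Map<String, byte[]> snapshot = new HashMap<>(currentState);   // copy taken at Tf >= Tr
            for (Change change : windowLog) {                             // iterate newest to oldest
                if (change.hlcTimestamp() <= tNow) break;        // older changes already belong in the cut
                snapshot.put(change.key(), change.previousValue());   // undo a change made after Tnow
            }
            return snapshot;
        }
    }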

Figure 2: An instant snapshot being taken. (1) The initiator sends the message to take a snapshot at Tnow; (2) process n receives the message and starts taking a local snapshot; (3) all operations performed after Tnow are undone from the local snapshot.

Retroscope provides more flexibility in taking unplanned snapshots. Taking an instant snapshot (i.e. a snapshot initiated at Tnow) requires each node to maintain only a small log of recent changes. We can, however, expand the instant snapshots and offer retrospective snapshot flexibility at the expense of a larger state-change log. With retrospective snapshots, we can offer the ability to look at a state that has already happened in the past. This functionality is handy for application debugging when there is a need to investigate the root cause of a problem after it has already happened. A distributed reset is another application that can benefit from retrospective snapshots, as the system can be reset into a correct state after the state has been corrupted.

We have Retroscoped the Voldemort key-value database to take data snapshots. Retroscoping Voldemort took less than 1000 lines of code for adding HLC to the network protocol, recording changes in the Retroscope window-log, and performing snapshots of Voldemort's storage. We ran the experiments on a 10-node Voldemort cluster with databases of various sizes. We have learned that keeping the window-log of state changes has very little impact on throughput and latency when no snapshots are being taken, as seen in the figure below.

Figure 3: Impact of Retroscope on normal system operation.

Performing a snapshot is non-blocking in Retroscope, because there is no need to wait out the time uncertainty. The non-blocking nature allows Voldemort to continue processing both read and write requests while the snapshot is being computed. The figure below shows throughput and latency for every second of execution while taking a snapshot of a 10,000,000-item database (each item is 100 bytes). Overall we have observed an 18% throughput and 25% latency degradation over the snapshot time, however these numbers can be improved by using a separate disk system for the snapshot.

Figure 4: Impact of Retroscope while taking a snapshot.

What can be done with Retroscope?

We used Retroscope to take snapshots of data in a key-value store, however the utility of snapshots can be much broader. With powerful snapshot capabilities, such as retrospective snapshots, we can look into the past of our distributed system and search for anomalies. Retrospective snapshots can be used to restore the system to the latest correct state after a state corruption. Finding such correct states is also possible with Retroscope: we can use it to take successive snapshots to check for global invariants, and it can be powerful in monitoring various application-level predicates. Retroscope can perform other monitoring tasks as well. Unlike other monitoring systems that tend to look at local state independently or isolate monitoring to the request level, Retroscope can look at global parameters of the system across every node in a consistent way. We can even use Retroscope to detect erroneous patterns in message exchange by observing what messages are sent and received and how they impact the state at each node as we go through time.