Retroscoping Zookeeper Staleness

ZooKeeper is a popular coordination service used as part of many large scale distributed systems. ZooKeeper provides a file-system inspired abstraction to the users on top of its replicated key-value store. Like other Paxos-inspired protocols, ZooKeeper is typically deployed on at least 3 nodes, and can tolerate F node failure for a cluster of size 2F+1. One characteristic of ZooKeeper is that it runs the consensus algorithm for update operations, however to speed things up reads may be served locally by the replica a client connects to. This means that a value read from a replica may be stale or outdated compared to the leader nodes or even other followers. This makes it a user’s problem to tolerate the stale data read from ZooKeeper, or perform sync operation to get up-to-date with the leader before reading.

But how stale can data become in a ZooKeeper cluster? This is a rather tricky question to answer, since each replica runs on a separate machine with different clock. We cannot just observe the time a value become available at each node, because these timestamp are not comparable due to clock skew.  Instead of trying to figure out the staleness time, I decided to look at how many versions behind can a replica be. I used my Retroscope tool to keep track of when the data becomes available to the client at each replica. I used Retroscope Query Language (RQL) to collect the data from nodes and look at the consistent cuts progressing through the states of ZooKeeper. Retroscoping ZooKeeper took only about 30 lines of code to be added to the project.

f1
Initial run. 1 version staleness is expected.

I deployed a small ZooKeeper cluster on 3 AWS t2.micro instances (I know, it is far from production setup, but it works well for a quick test). On a separate instance I deployed RQL server. To start, I simply created one znode and updated its value. I then proceeded with running the simplest possible RQL query: SELECT retro FROM setd; in this query, retro corresponds to /retro znode I’ve created, and setd is the name of the Retroscope log I put my monitoring data into. The result of the query was exactly what I have expected: at one of the consistent cuts one of the nodes had a value one version behind. This is entirely normal behavior, as the value needs to propagate from the leader to the followers once decided.

My next move was to give a bit more work to the ZooKeeper in a short burst, so I quickly wrote a small program that puts some incremental values to n znodes for a total of r writes. It starts with a value tst1, then goes to tst2 and so on for every znode.  At first I restricted n=1, as I felt that writing to just one znode to create a “hot-spot” was going to give me the best chance of getting stale values. But ZooKeeper handled the burst of 100 writes easily, with the results being identical to a single write: stale value was at most 1 version behind the current data.

f2
Two versions behind.

Seeing things work well was not interesting for me, so I decided to no longer play fair. I artificially made one replica to work slower and be a struggler node. ZooKeeper protocol tolerates this just fine, as it needs 2 out of 3 nodes in my cluster to form the majority quorum and make progress. For this crippled ZooKeeper setup I re-ran the workload and my simple query. Needless to say, I was able to spot one time a system had a node with a znode 2 versions behind, and it was my struggler machine. In the next step I increased the load on the cluster and made the workload write 1000 values to the same znode. I also changed the query so that I do not have to manually look through thousands of consistent cuts trying to spot the stale data. My new query emits consistent cuts with staleness of 2 or more versions:

SELECT r1 FROM setd WHEN Int(StrReplace(r1, “tst”, “”)) – Int(StrReplace(r1, “tst”, “”)) > 1;

The interesting thing about the query above is that it uses the same variable name twice in the expression, essentially telling RQL to output a cut when r1 – r1 > 1. However, there are are many r1‘s in the system (3 in fact, one at each node), so when a pair of r1‘s that satisfy WHEN condition is found (at different nodes), RQL will output the consistent cut.

f3
Very stale.

I was a bit surprised by the results. At first the struggler managed to keep-up with the rest of the cluster quite well, slipping 2 version behind on occasion, but after about 200 requests things went out of control for the crippled node, with the staleness growing to be 158 version behind towards the end of the run. Of course a struggler node will make things look worse, but it is not an unrealistic scenario to have underperforming machines. My test however is not fair either, as I was using a 100% write workload targeting just one znode. So In the next try I changed the workload to target 100 different znodes, while still measuring the staleness on just one znode. In that experiment, the  staleness was not that high, but the number of updates to a single znode was only a small fraction of the previous test. Nevertheless, struggler replica was as much as 6 version stale, making it roughly 1/3rd of updates behind the rest of the cluster.

For the last quick test, I tried doing a large burst of 2000 writes to the same znode on a healthy cluster with no struggler nodes. Despite all replicas working at their proper speed, I was able to observe staleness of 3 versions on some occasions.

I am not sure about the lessons learned from this quick experiment. I was amazed by how easy it is to get ZooKeeper to have data 2 or more versions behind, although the system seems fast to catch up. Struggler scenario, however, illustrated how quickly things can get out of control with just some performance degradation at one of the replicas.  Engineers using ZooKeeper must build their applications in such a way to tolerate the stale ZooKeeper values gracefully.

Why Government IT is Expensive and Archaic

Disclaimer: I do not work for the government, and my rant below is based on my very limited exposure to how IT works at the US government setting.

Why Government IT is Expensive and Archaic? I think, this can be a very long discussion, but I do have a quick answer:  standards imposed by government and used by the government regulated industries. I have a very little experience with these types of standards, but they make me cringe every time I have to deal with them. Bellow I briefly describe my encounter with them.

When I was just a college student, I joined a (very) small IT company sitting next to the University campus. I started just as an intern during my 3rd year of college, and I was working fulltime a year later. At the same time, our team was tasked with making a piece of software for a private company to be used for tracking medical services provided to students at public schools throughout the state. This piece of software was to replace an older one, and we had a very strict set of requirements: “Make it work and look the same but better”.  Such requirements, along with many decisions a college-student-turned-software-engineer had to make, shaped how the system is working right now. Of course, it was not just me making the product, but nevertheless, my “brilliant” ideas slipped in and became what the system is today.

One part if the system is responsible for billing the medical services tracked in the system to Medicaid. And this is where the interactions with government IT has started. Government agency cannot just provide a secure API for the software developers like myself to use. No, it has to hire a major business to implement a standard commonly used for Medical transaction. And here we are, in the 21st having to adhere to the Electronic Data Interchange (EDI) X12, a standard designed in the 1970s to transfer medical (among other EDI uses) data between computer systems. Don’t get me wrong, standards are good, they make different systems work together flawlessly… But that is until you start looking at the standard. For the system, we had to implement only one transaction type at that moment, so what can be difficult about it? The difficulty started with a 700 page manual just for constructing the transaction request. The manual is accompanied by an errata and an errata of errata. In addition, there is a 100 page companion guide that specifies procedures specific for the state Medicaid.

So how easy is the standard itself? Maybe it is not that complicated to work with and the manuals are just full of fluff? Well, EDI is a textual format, so in theory a person can read the file to see all the data. Manuals even provide the example of how pieces of it should look like: SV1HC:99211:2512.25UN111✽✽1:2:3✽✽Y~

Easy enough to read? Nope, so you are back at the manual studying what everything means. For example, SV1 is the header for section describing the professional (medical) service provided, HC code describes what type of codes to follow next, 99211 is the service code followed by code modifiers and modifiers of modifiers and so on. Somewhere in there is how much to charge Medicaid and how many units of service have been provided. But on the bright side, it has cool delimiters: stars, colons and tildes. To top it off, each field can be flexible in size or can be restricted to some number of characters or to a certain set of values, and to find this you consult the manual once again.

What if you make an error? Not a problem, the response to the request comes back as an EDI file, equally cryptic, that describes what went wrong. And we are back to the manuals, counting stars and tildes to check if you send all the data and whether it was in the right format and right order.

It is also worth mentioning that EDI format is used for HIPAA protected medical data, but it has no security built in, everything is plaintext and with the help of manual anyone can read it.  The transmission of the requests, however, is carried out over a secure channel. In my case, I am sending batched requests for processing and the only way to transmit those is by using an SFTP. Needless to say it also becomes my responsibility to pull the response files from the SFTP server once requests are processed, and since there is no strict guarantees on when the processing is completed for each batch, I just do it periodically and eventually collect the responses. How the EDI files are stored on the system side is all up to the engineers designing such system, and as users we can only hope it has passed HIPAA compliance checks and secures the data at rest.

Obviously the archaic standards, like EDI, work well in practice. After all, despite my “brilliant” ideas implemented in other parts of the system, the EDI layer was relatively problem-free. But the standards definitely can be better and new standards, if developed, can provide improved security,  they can be easier to work with, and they will reduce the costs of developing new software. However, I suspect the costs of switching to new standards are just too high for the existing infrastructure, forcing the industry to create a bigger and bigger gap between what government agencies require and what is modern, efficient and secure.

I no longer work for the company, but I still maintain the software. And I was dreading the time a client asks for changes to how the system interacts with Medicaid. It seems like this time is upon me now, and in the summer I will be looking at more manuals for EDI X12 requests needed to implement new features.

The First Datastore-driven Vehicle

vold-carIt is not a secret that procrastination is the favorite activity of most PhD students. I have been procrastinating today, even though my advisor probably wants me to keep writing.  In the midst of my procrastination, I thought: “Why are there self-driving vehicles, but no database-driven vehicles?” As absurd as it sounds, I gave it a try. And now I not so proudly present the prototype of the world’s first(?) datastore-driven vehicle. The implementation was quick and simple. I took the Voldemort code I have been tinkering with in my Voldemort traffic light post, and changed it to send the signals over Bluetooth connection to a Frankenstein-of-a-car I’ve built very quickly out a few motors, a gearbox, an Arduino, a bunch of wires, some tracks, and lots of thermal glue.

The car changes the direction of travel when the workload characteristics change: it will go straight when number of read operations is the same as number of write operation.  By changing the read-to-write ratio of the workload, the car will turn either left or right. The prototype vehicle is not very responsive (weak motors/dead batteries) and sometimes runs into things despite my best attempts to control it.

And now, once the world has a database-driven car, the procrastination can continue… Or should I start writing?

One Page Summary: “Musketeer: all for one, one for all in data processing systems”.

Many distributed computation platforms and programming frameworks exist today, and new ones constantly popping out from the industry and academia.  Some platforms are domain specific, such as TensorFlow for machine learning. Others, like Hadoop and Naiad are more general, and this generality allows for sophisticated and specialized programming abstractions to be built on top.

Coupled vs DecoupledSo we naturally ask a question, which distributed computation platform is faster and more scalable? And what programming model or framework is better? Authors of the Musketeer tried to find the answer and concluded that there is no such thing as the perfect computation platform or perfect programming front-end, as they all perform better under different circumstances and workloads. This discovery led to the Musketeer prototype, which decouples the programming front-ends from their target platforms and connects all the front-ends to all the computation back-ends.

Musketeer schematics.Musketeer accomplish this task through translating the code created in every supported programming front-end to an intermediate representation (IR). This intermediate representation can later be translated into the commands of the computation platforms. Think of Java or Scala code translated to Java bytecode before being JIT compiled to native code as the program runs. Each IR is a data-flow DAG representing the progression of operations in the computation.

back-end detection
♣ indicates Musketeer’s pick

Musketeer runs the IR on the platform it chooses the most suitable for the job (Figure on the right). For example, some platforms may be better for some operations and not every platform supports all types of computations. Musketeer detects the patterns of operations in the IR through idiom recognition and will not allow IR containing certain patterns to run on the platform that does not support these patterns or performs badly.

Combining back-endsMusketeer can also split the entire computation into smaller jobs, each running on different platform. The partitioning is done by picking a lower cost back-end to run the partition.  Of course finding the partitions in the DAG is a complicated problem (NP-hard).Musketeer uses exhaustive search to find best partitions for smaller IRs, and heuristic based dynamic programming approach for larger DAGs when the cost of brute-force approach becomes prohibitive. Heuristic approach does not examine all possible partitions. Such partitioning allows achieving better performance than a homogeneous platform.

The IR is not optimized when it is generated at first, however Musketeer performs the optimizations before starting the computation. In particular, certain operators in the IR DAG will be merged together to reduce the number of jobs executed. The merging affects the platform choice, as some platforms support more complex jobs, while others need more steps to achieve the same result. Musketeer also optimizes for the IO by trying eliminate the duplicate scanning of a dataset. All the optimizations allow the code generated from the IR to have similar performance as the manually optimized code.

One Page Summary: “Slicer: Auto-Sharding for Datacenter Applications”

One of the questions engineers of large distributed system must answer is “where to compute”. This is a big and important question, as we do not want to send a request originating in the US to some server in Australia. It simply makes no sense to incur the communication overhead if there are resources available closer. But physical affinity is not the only requirement to answer the question. Just as we would not make an Olympic swimming competition in a desert simply because a desert is close, we would not send a Russian language speech recognition request to the servers with English language model. In other words, when deciding on the placement of a computation, we need to find the closest place with enough resources of the right type.

Slicer is a Google’s answer to this “where” problem. It continuously tries to find the best possible place to perform the computations in the presence of dynamically changing workload. Many applications within Google, such as speech recognition, various caches and DNS service use Slicer to partition and balance their workload among a set of tasks, or application servers (containers). Computation placement is not on the application critical path, and it is somewhat static: once it has been made, we can use it for some time until the conditions change. Think of finding a swimming pool for our Olympics competition ahead of the actual races and sticking with the same pool until the games are over.

Slicer sticks with the same pool of resources as well, as it directs similar requests to the same set of tasks. Each application using Slicer needs to produce a key associated with the request and Slicer hashes it into a 63-bit internal key, which is then used to decide on the placement of the computation. This makes it possible for requests with the same key to be processed by the same set of tasks, catering to locality of reference.

Clerk library provides a client interface to pull and learn which keys are assigned to which tasks. With Clerk, clients can learn the assignment and communicate directly with tasks. Google’s RPC service Stuby is also integrated with Slicer and can provide Clerk’s functionality, so many applications do not even need to use Clerk. Application servers use a similar library, called Slicelet, to learn which keys that particular task is expected to process, allowing tasks to adapt for the change in the requests they serve.

Slicer service is divided in two major parts: Assigners and Distributors. Assigners map keys to tasks, while Distributors retrieve these maps from Assigners and deliver them to the Clerks and Slicelets upon request. More than one Assigner exists globally, and any assigner can generate the key assignment for any application, however, in most cases only one assigner is considered to be preferred. Sometimes more than one Assigner can act as preferred for short duration, but Assigners eventually converge to the same assignment through the optimistically-consistent storage. Slicer also has a mode with strongly consistent assignment, however it has not been tested on production at the time of paper publication. Slicer also packs a variety of backup mechanisms to make sure clients can still make the requests despite Assigner or Distributor failures. In case of miss-assignment (~0.02% of requests), for example due to failed tasks, client will need to try the request again with the new assignment obtained from Slicer.

Assigners perform load balancing for the requests by changing how many tasks are assigned to each key. Balancing is done taking into account a usage rate of different keys. As the load changes, Slicer updates it’s mapping by taking some servers off the lesser used keys and putting them for the use by “heavy” keys. Load balancing results in lesser load on the “hottest” tasks of an application (median job’s hottest task is 63% less loaded), compared to static request distribution. At the same time, Slicer reacts to shifting workload patterns in the matter of minutes.

Slicer hottest task load.
Hottest task load under Static sharding and Slicer. Slicer gets 63% lower hottest task load for a median application.

Slicer seems to be playing a big role in deciding where to process requests at Google, at least for certain applications. It provides load balancing and ensures the requests get to servers or tasks that are both prepared to accept them and have capacity to do so.

Monitoring with Retroscope: Detecting Invariant Violations

Earlier I briefly mentioned Retroscope, our distributed snapshot library that makes taking non-blocking, unplanned consistent global distributed snapshots possible. However, these snapshots are only good if we know how to use them well. Of course the most obvious use case is just a data backup, and despite it being an important application for snapshots, I feel it being a bit boring to my taste. What I am thinking right now is using snapshots for distributed monitoring and debugging.

Let’s consider an application that has a global invariant predicate P, and we want to check if a distributed system holds the invariant P at all times. This means that we should never see a consistent cut in which predicate P = false. So our problem is boiling down to looking for consistent cuts that violate P. Luckily, Retroscope can do exactly this, since we can take one snapshots and incrementally move forward in time as the application execution progresses, checking the predicates by looking at consistent cuts as the state advances.

With the basic Retroscope described in the earlier post, finding predicate violations is a rather cumbersome effort that requires writing new code for every invariant a user wishes to check. So in the past few months I have been working on Retroscope extension tailored specifically for debugging and monitoring use cases.  Improved Retroscope exposes the Retroscope Query Language (RQL), a SQL-like interface to allow users write queries to search for conditions happening in the consistent cuts.

Now let’s go back to our hypothetical system with global invariant P and for now assume P holds when all local predicates p0, p1, p2, …, pn hold on the nodes [0 … n]. As such, P = p0  p1  p2  …  pn, and if any of the local predicates fail, the global predicate fails as well. For the simplicity of the example, we can say that local predicate pi is following: pi = ai + bi > ci. This makes each node maintain all three variables, although the nodes may have different values. With Retroscope, we can expose these local variables to be stored in the local log named inv. The log will maintain both the current version of the variables and the history of variable changes.

How do we look for the violation of such invariant with RQL? Just a single query would suffice for us:

SELECT inv.a, inv.b, inv.c FROM inv WHEN (inv.a + inv.b <= inv.c) LINK SAME_NODE

Now we can dissect this query into bits and see what happens there. RQL queries are meant to retrieve consistent cuts that satisfy certain criteria. The list of parameters following the SELECT statement specifies what variables we want to see in the resultant consistent cuts. FROM keyword enumerate the logs we use in this particular query. The actual consistent cut criteria are specified after the WHEN keyword. In the particular case the condition for emitting cuts is (inv.a + inv.b <= inv.c) LINK SAME_NODE, which is equivalent to emitting cuts when the following holds:f1

By now a curios reader would have probably asked a question of why we even bother with consistent cut in this particular example. All predicates can be checked locally and their evaluation does not depend on other remote servers, so we can simply run local monitors and do not worry about consistent cuts and time synchronization at all: failure on one node designate the failure of the system globally no matter the time. Retroscope and RQL shines when we break away from this locality. What if our invariant involves messages being sent and received? Or what if in involves different parameters that exists on different machines at the same time? With the ability of looking at consistent cuts, RQL breaches the boundary of a single node. Below I list just a few variations of the original query that no longer deal with conjunction of local predicates and look at global state as a whole:

  1. SELECT inv.a, inv.b, anv.c FROM inv WHEN inv.a + inv.b <= inv.c
    • Omitting LINK SAME_NODE part changes the operation of the query drastically, as all three variables are no longer bound to co-exist on the same node:f2
    • rql_predicates1
  2. SELECT inv.a, inv.b, anv.c FROM inv WHEN (inv.a + inv.b <= inv.c) LINK EACH_NODE
    • Replacing LINK SAME_NODE with LINK EACH_NODE, changes the search condition to require every node satisfying it in the consistent cut:f3
    • rql_predicates2
  3. SELECT inv.a, inv.b, anv.c FROM inv WHEN (inv.a + inv.b) LINK SAME_NODE <= inv.c
    • Rewriting the condition to WHEN (inv.a + inv.b) LINK SAME_NODE <= inv.c will cause the inv.a and inv.b to be summed on the same nodes, and compared to inv.c values from other nodes as well, so the consistent cut is emitted when f4
    • rql_predicates3
  4. SELECT inv.a, inv.b, anv.c FROM inv WHEN (inv.a + inv.b <= inv.c) AND NODE($1) = NODE($3)
    • This query restricts inv.a and inv.c to be on the same node. $1 is the placeholder for the first variable encountered while parsing left to right, and $3 is the third variable. This emits the consistent cuts when f5
    • rql_predicates4

Above are just a few simple examples of what is possible with RQL, however there are limitations. The biggest limitation is the complexity of the conditions. Even though RQL does not limit how many operations are possible in the condition of the query, having large expressions can slow the system down drastically. For example, a simple WHEN inv.a > inv.b will examine all a’s that exist on the nodes of the system at the consistent cut and all b’s in every possible combination. For f6 . Comparison is then carried out on every element of product set E.

P.S. I illustrated some of the syntax as it operates at the time of this writing, however RQL is developing, and I am not sure I like syntax of conditions too much, so it is a subject to change.

One Page Summary: Incremental, Iterative Processing with Timely Dataflow

This paper describes Naiad distributed computation system. Naiad uses dataflow model to represent the computations, but it aims to be a general dataflow framework in contrast to other specialized approaches such as TensorFlow. Similarly to other dataflow systems, the computations are represented as graphs, where vertices represent data and operations and edges carry the data between nodes.

Naiad was designed as the generic framework to support iterative and incremental computations with the dataflow model. We can think of an iterative computation as some function Op is executed repeatedly. Such iteration function can be looped on its output until there are no changes between the input and the output and the function converges to a fixed point.

Incremental computations are a bit more general then the iterative. In incremental processing, we start with initial input A0 and produce some output B0. At some later point, we have a change δA1 to the original input A0, such that we can have new input A1 = A0 + δA1. Incremental model produces an incremental update to the output, so δB1 = Op(δA1) and B1 = δB1 + B0. Note that incremental model only needs to have previous state (i.e. A0 and B0) to compute the next state, however we can extend it to have all output differences:f1. Incremental computations can be adopted for iterative algorithms where each iteration produces the difference output and next iteration operates only on that difference and not the full input. However with basic incremental computation approach it becomes impossible to do iterative operations under the changing or streaming input, as now we need to keep track not only of the iteration number (and differences between iterations), but also on the version of the input (and differences of the input).

Differential computation model overcomes the limitation of basic Iterative and Incremental approaches, by keeping all the differences δB and δA, and not just the previous state. In addition, a two-component timestamp is now used, where one component keeps track of the input version and the second value is responsible for the iteration number. Such timestamping for differences complicates the computation, as the timestamps no longer have a total order. In other words, sometimes it is not possible to tell whether one timestamp happened before another. However, the new timestamp system has a partial order for which f2. And under this partial order it is still possible to sum the differences together:f3

With differential model, we can calculate not only the end result of an operation when the new input comes in, but also any intermediate δBt. A lot of the power of differential dataflow lies with the differential operator that must produces the differences that can be summed with the equation above.

The timestamp plays a crucial role in tracking execution progress. Naiad’s communication methods Send and OnRecv can be triggered multiple time for the same variable, making it necessary to have a mechanism capable of notifying other nodes when certain data has been sent in full.  This notification triggers when all messages at or before a particular timestamp have been sent. Upon receiving a notification on a node, OnNotify(t) method is called, allowing the algorithm to react. Dataflow model complicates the notification mechanism, since the timestamps no longer have a total order, however the partial-order we have established earlier along with some dataflow graph restrictions allow Naiad to keep the effective notification system that does not break its guarantees even under nested loops and continuous input updates.

Is Java Fast Enough for Distributed Applications?

Lots of modern distributed systems are built with Java programming language, and consequently use Java Virtual Machine (JVM) as their execution environment. The list of such systems is rather large: Hadoop, Spark, HBase, Cassandra, Voldemort, ZooKeeper, BookKeeper, Kafka, and the list goes on and on. But is JVM fast enough for these systems?

Anyone who has ever dealt with Java probably knows at least a little bit about how JVM works. To start with, Java programs are compiled into a machine independent, un-optimized byte code. The byte code is then being interpreted by the JVM and compiled into the native code with the just-in-time (JIT) compiler. JVM adds various optimizations at the JIT compilation and these optimizations can be more aggressive than the optimizations done by a native compilers. After all, before doing these optimizations and compilation, Java has already ran the code in the interpreted mode, and it was able to collect some statistics on the branch predictions, loops and function calls to make optimization tailored not just to that specific code, but also to the specific runtime or data.

However, before Java performs all the tricks, it needs to run in a slow interpreted mode, incurring some warm-up overheads. OSDI’16 paper “Don’t Get Caught in the Cold, Warm-up Your JVM” goes into more details about what are the warm-up overheads and how they impact data-parallel distributed systems, such as Hadoop, Spark and Hive.

Warm-up Costs

The paper breaks down warm-up overheads into the two categories: class loading and bytecode interpretation overheads. It investigates these overheads under different workloads on different distributed systems. Of course it is expected for warm-up to impact the freshly started JVM, but how big is the cost of warm-up? If we look at the HDFS client performance, we can see the warm-up can easily take a few seconds, depending on your task. In HDFS, writing is more complicated and involves more classes, thus Java spends more time loading all the classes.  Warm-up while reading from HDFS also differs depending on whether we read in parallel or sequentially. The graph below shows warm-up costs by the task and dataset size.

Figure 1
JVM warm-up overheads. “cl” stands for class loading. “int” is bytecode interpretation.

We can see that the size of the operation has no impact on the overheads, meaning that small operations will spend much larger fraction of their time in warm-up, while big operations tend to amortize the warm-up costs.

figure2
Warm-up overhead as a fraction of total runtime.

It is also interesting to see when the warm-up occurs in the execution cycle. Obviously starting the client requires lots of class loading and interrupting the byte code, however starting actual jobs (for the first time?) also incurs warm-ups.

Figure 3
Warm-up at different execution stages.

Another question one may ask is how slow actual class loading is? For HDFS sequential read, client had to load about 2000 classes, taking 1028 ms to complete.  Spark was much heavier on the classes it uses and needs to load with 19,066 classes on average taking roughly 6.3 seconds in overheads.  These are rather large numbers, especially if we aim at low-latency execution of our requests, however not everything is so grim.

It is important to emphasize that the paper mainly uses clients to study the warm-up, while the actual distributed system is not being studied in much of the details. To be fair, authors mention that the warm-up overheads are present on the server side as well, and in Spark the executor warm-up can add up to almost 50% of the overall warm-up time.

figure4
Spark warm-up overheads.

Dealing with Warm-up

The paper argue that these are very big overheads that must be dealt with. Authors even offer a prototype solution, a modified JVM, called HotTub, which acts as a container for many other “normal” JVMs to be reused when needed. Reusing JVM means we do not need to load classes and perform JIT. Such approach works well for short lived JVMs, i.e we have a client performing one operation and terminating. If such terminated JVM ends up in the pool for JVM reuse, we can save time on overheads next time we need another short-lived JVM.

I have to disagree, however, that these overheads are a big problem, and here is why. JVM running server side of the distributed system are warmed-up if they ran for at least some time. As such, these machines do not experience warm-up costs anymore. In this breakdown of the HDFS request, we do not see any warm-up losses occurring on the data-node side and all of the overheads were due to the warm-up of a short-lived HDFS client. This means that keeping you JVM alive and designing your workloads/client to stay up is the best solution to overcome these type of overheads.

Figure 5
HDFS warm-up.

There are few lessons I have learned from this paper. They may sound like a common sense, but nevertheless these are important points to keep in the back of your head to get the most out of your Java code.

  • Keep JVMs alive. Long running JVMs do not incur as many new class loads and do not need to interpret as much code, allowing the JVM to be faster.
  • Simpler is better. Too Many classes hurt performance on the warm-up, however do not go to extreme on the other side too. After all these are warm-up costs and not constant penalty to your performance.
  • Watch your external libraries. This goes together with previous point. Bringing a big library to perform one small task may not be too wise if similar-performing alternatives are available.

Globally Consistent Distributed Snapshots with Retroscope

Taking a consistent snapshot of a distributed system is no trivial task for the reasons of asynchrony between the nodes in the system. As the state of each machine changes in response to incoming external messages or internal events, each node may produce a log of such state changes. With the log abstraction, the problem of taking a snapshot transforms into the issue of aligning the logs together and finding a consistent cut among all these logs. However, time asynchrony between the servers makes collating all the system logs difficult using just physical clocks available at each machine, because clocks tend to drift, producing some time asynchrony or time uncertainty. Various time synchronization protocols, such as NTP and PTP, exist, but perfect synchronization is still unattainable.

Retroscope is the system we designed to take unplanned, non-blocking, consistent global snapshots. Unlike other systems, Retroscope does not need to block while taking snapshot, as it does not need to wait out time uncertainty caused by the clock skews at various machines thanks to the reliance on Hybrid Logical Clocks (HLC) instead of NTP-synchronized (NTP) time.

Figure 1
Using NTP time fails to take consistent snapshot without waiting out the time uncertainty

 

HLC introduces causality information into the clock as messages are being exchanged between servers and provides the same causal guarantees as Lamport’s Logical Clocks (LC). More information about HLC can be found here and here.

Taking a snapshot

Retroscope achieves snapshots by adding HLC into the network communication of the Retroscoped system. Internally, Retroscope keeps a sliding window-log of past state changes along with the associated HLC timestamps at each node. This window-logs are used to facilitate the unplanned nature of taking snapshots. Snapshots are triggered by a special client that maintains the common HLC with the rest of the system. Retroscope allows for instant unplanned snapshot to be started by the initiator, such instant snapshots are guaranteed to capture the states at the time Tnow of snapshot request being issued by the initiator. Obviously, once the snapshot request message reaches the nodes, the time has advanced to Tr > Tnow.

Upon receiving the snapshot request message at Tr, each node starts taking a local snapshot. Since the system does not halt processing requests, depending on the implementation, we may arrive to a local snapshot at some time Tf >= Tr. Because our local snapshot is at the state that happened after the requested time, we need to modify it to arrive to the state at time Tnow. We use the window-log of state changes to undo all operations that happened locally after Tnow, thus arriving to a desired local snapshot. Once all nodes compute local snapshots, Retroscope is done taking a global consistent snapshot at time Tnow.

fig2
Instant Snapshot is being taken. (1) Initiator sends the message to take a snapshot at Tnow, (2) Process n receives the message and start taking a local snapshot (3) All operations performed after Tnow are undone from the local snapshot.

Retroscope provides more flexibility in taking unplanned snapshots. Taking instant snapshot (i.e. snapshot initiated at Tnow) requires each node to maintain only a small log of recent changes. We can, however, expand the instant snapshots and offer retrospective snapshot flexibility at the expense of growing state change log larger. With retrospective snapshots, we can offer the ability to look at the state that has already happened in the past. This functionality is handy for application debugging when there is a need to investigate the root cause of the problem after it has already happened. A distributed reset is another application that can benefit from the retrospective snapshots, as the system can be reset into a correct state after the state has been corrupted.

We have Retroscoped Voldemort key-value database to take data-snapshots. Retroscoping Voldemort took less than a 1000 lines of code for adding HLC to the network protocol, recording changes in the Retroscope window-log, and performing snapshot on Voldemort’s storage. We did the experiments on the 10-node Voldemort cluster with databases of various sizes. We have learned that keeping the window-log of state changes has very little impact on the throughput and latency when no snapshots are being taken, as seen in figure below.

Figure 3
Impact of Retroscope on normal system operation.

Performing snapshot is non-blocking in Retroscope, because there is no need to wait out the time uncertainty. The non-blocking nature allows Voldemort to continue processing both read and write requests while the snapshot is being computed. The figure below shows throughput and latency for every second of execution while taking the snapshot on a 10,000,000 items database (each item is 100 bytes). Overall we have observed an 18% throughput and 25% latency degradation over the snapshot time, however these numbers can be improved by using a separate disk system for snapshot.

Figure 4
Impact of Retroscope while taking a snapshot

What can be done with Retroscope?

We used Retroscope to take snapshots of data on a key-value store, however the utility of snapshots can be very extensive. With powerful snapshot capabilities, such as retrospective snapshot, we can look into the past of our distributed system and search for anomalies. Retrospective snapshots can be used to restore system to the latest correct state after the state corruption. Finding such correct states is also possible with Retroscope; we can use it to take successive snapshots to check for global invariants; it can be powerful in monitoring various application level predicates. Retroscope can perform other monitoring tasks. Unlike other monitoring systems that tend to look at local state independently or isolate monitoring to a request level, Retroscope can look at global parameters of the system across every node in a consistent way. We can even use Retroscope to detect erroneous patterns in message exchange by observing what messages are sent and received and how they impact the state at each node as we go through time.

Gorilla – Facebook’s Cache for Time Series Data

Facebook operates a huge infrastructure that needs to be constantly monitored for performance and stability. Such monitoring collects huge amounts of data that must be easily accessible to various diagnosis and anomaly detection tools in order to quickly identify and react to possible issues. Many of such parameters can be represented as real-valued time series. For example, server CPU utilization can be thought of as one of such time series: it can be sampled at some time interval and represented as a numeric value. In order to accommodate all the time series data for various parameter produced by all the server, Facebook needs a scalable, robust and fast way to store and manage time series.

Gorilla: A Fast, Scalable, In-Memory Time Series Database paper describes Facebook’s approach to the problem of managing large amounts of time series. After reading the first page of this paper, I started to ask myself whether Gorilla is truly a time series database or if it is a monitoring data cache for Facebook. To understand what is Gorilla, we must look at what data Gorilla stores and how it was designed and implemented.

Facebook’s monitoring tasks set a strict set of requirements a time series database needs to meet, which I briefly summarize in the list below:

  • Store real-valued monitoring data
  • Have fast access to 26 hours of monitoring data
  • Be scalable on Facebook scale, as the amount of data increases all the time
  • Maintain millions of time series
  • Fast retrieval of time series with reads in under 1 ms
  • Support up to 40,000 queries per second
  • Low granularity time series with resolution of up to 4 data point per second.
  • Replicated design for disaster recovery.

Gorilla Data Model

Gorilla is said to store time series data in which every data point consists of a timestamp and a single 64-bit value. This places the limitation on what kind of time series can be stored. For one, timestamp requirement makes it more difficult to deal with ordinal time series in which only the order of events matters and not the duration between them (sure, we can assign always increasing integers for timestamp to represent the order). Another limitation is inability to store multi-dimensional time series, where a single data point consists of a vector rather than a single value.

Physical time of the event is important for Facebook’s usage case of Gorilla, as the engineers need to know the time when an event or anomaly happened. Facebook engineers also do not care about recording vectors of data for each point in time, as they can record multiple values into different time series, which allows them to improve memory utilization of the system at the expense of versatility. This makes Gorilla very specialized tool from the standpoint of data it can handle, as the data-model is dictated by the overall requirement for monitoring task on Facebook scale.

Gorilla Design and Implementation

Obviously, Gorilla was designed and built to meet the requirements outlined earlier. Similar to how data model was derived to achieve the requirements, the rest of the system also sacrifices the generalizability for the sake of meeting the monitoring goals. Below I try to look at various portions of the system and see how universal or flexible the design of Gorilla is.

Compressed In-Memory Storage

Requiring the time series retrieval in under 1 ms means that data needs to reside in memory of the machines composing the Gorilla deployment. This time requirement most likely also imposes the limitation of only 4 point/second, as the system needs to keep individual the time series small enough to be able to retrieve them in entirety quickly enough. The 26 hours of data is needed for monitoring tasks at Facebook, meaning that a single time series will not exceed the size of 6240 points.

Despite the small size of individual time series, Facebook generates millions of such time series, so the memory consumption of the entire cluster is very high. Some very clever algorithms are used to compact each time series individually. Gorilla compresses both the timestamps and values for each data point.

The timestamp compression uses a delta-of-delta technique. The compression starts by finding a difference Δtn-1,n between the new timestamp tn and a previous timestamp tn-1: Δtn-1,n = tn – tn-1. This difference can already be represented with less bits then the original timestamp. However, the time series used in the monitoring at Facebook tend to happen at regular intervals, allowing even greater compression by finding the difference D between delta’s instead of differences between events. In other words, system does not code the changes in timestamps of the events as it would do with a single delta, but it uses the differences between the intervals between the events: D = Δtn, n-1 – Δtn-1, n-2. If everything operates correctly, most events happen at regular, constant intervals, resulting in 0 difference between them. This 0 difference can be encoded with just one bit of ‘0’. However, when some irregularity happens, the difference between intervals can still be represented with just a few bits. Computing the delta-of-deltas is shown in the example below:

time_compression
Delta-of-deltas compression for timestamps.

The binary representation for D also plays a crucial role in compression and must be tweaked based on the application and the frequency of point in the time series. In Facebook’s monitoring case if delta-of-deltas D is between [-63, 64], it is encoded with ‘10’ followed by 7 bits of difference value for a total size of 9 bits; if D is between [-255, 256], value D is prepended with ‘110’ bits, resulting in a total size of 12 bits, this patterns continues to cover larger D values.

Data compression is achieved through comparing two values and finding a substring of bits that changed from one point to the next one. Since the system looks for only one such substring, it can encode the offset if such substring from the beginning of the 64-bit value, and the actual substring containing the changes, thus eliminating the need to store the prefix and suffix to the changed substring, as these bits can be taken from previous value.

This compression scheme favors the scenarios in which the value stays constant from one point to another, as constant value can be represented with just one bit. This is extremely useful for Facebook monitoring in which many of the values stay constant or show small changes that can be efficiently compressed. Combined with time stamp compression, Gorilla shows remarkable reduction in the average size of a data point:

compression_efficiency
Compression ration as a function of compression window size (picture from Facebook’s paper).

This performance is however not universal and will not scale well to other time series outside of the Facebook monitoring use case. In both timestamp and value compression, in order to read a data point, system needs to read its predecessor, this requires the compression to run in non-overlapping windows and reset itself after some time interval. Having windows too large will require more compute resources to read the data point.

The timestamp compression struggles from a number of shortcoming. It operates on a second level resolution which allows to ignore small millisecond level variations and still encode the points with such variations as having no difference in the intervals. This works well for when we can record a maximum of 4 points in one minute, however many application, such as music, audio, or sensors produce data at much faster rate, requiring a more precise timestamp resolution on a millisecond or even nanosecond scale and not on a second scale.  Requiring more precision from timestamps will undermining the benefits that can be achieved with timestamp compression. Additionally, if the intervals between the time series are not constant or nearly constant and tend to change frequently, the efficiency of compression will also dramatically degrade.

The value compression cannot handle vector time series without making it more complicated and requiring constant size vectors. It also works best when the values do not change by much. Great changes on values may lead to no compression between two points at all.

Storage Model

Gorilla is a sharded system that assigns each server to handle a subset of the time series currently stored in the system. Since two shards share no data in common, growing the system to accommodate more time series is as easy as simply adding more servers and updating shard mapping to make these servers available for new time series. Gorilla tolerate machine failures by writing the data to a replicated network storage, although it does not attempt to make storage and memory consistent and may lose the information buffered for writing upon a node failure. Gorilla can handle more disastrous events as well, since it was designed to tolerate an entire region failure by streaming every data point to two different data centers. Similar to disk persistence, the system does not try to ensure consistency between the two regions. Such possible inconsistencies may lead to data loss, and dealing with incomplete data is left to the client.

Query Model

Gorilla query model appears to be very simplistic, as it simply allows to retrieve time series, given the time series unique name and the time range. The rest of the processing is left to the client systems. Such retrieval approach is very fast, as it simply requires locating the server responsible for the time series, uncompressing it and returning to the client. Such approach is most likely tuned for specific monitoring needs at Facebook and it provides very good latency as show below:

query_latency
Query latency (picture from Facebook’s paper).

In my master’s thesis I have explored more complicated querying patterns for time series. In particular, one common query pattern is similarity search across many time series or across different parts of the same time series. Many approaches exist to answer the k Nearest Neighbors (k-NN) types of queries that search for k most similar fragments to the input query. Some of these approaches, such as Dynamic Time Warping are very difficult to index and are not suitable for database application, but there are methods that can be used for indexing and database application. I adapted and modified R*-tree index for being stored in the HBase database along with the actual time series data, and as such my system prototype was able to perform k-NN queries (with Euclidean distance similarity) on a disk backed system. Of course there were a number of limitations, such bad scalability when searching for large patterns without the use of dimensionality reduction and overall low latency of searches due to relying on HBase for index. However, in-memory approaches can have fast indexes, and can provide more sophisticated querying patterns to answer the similarity or anomaly detection type of queries. Finding similar sub sequences with Gorilla will require fetching the time series we are interested and exhaustively searching for patterns in them.

There are many motivations for k-NN search in time series, ranging from medical to engineering to entertainment. For example, a system records person’s ECG data and performs a search on new patterns it receives. If it finds a similar pattern to some known cases that lead to heart failure, the system can notify the doctor before the problem can develop further. And of course there are other types of interesting queries that can be done on time series, as such I strongly believe that Gorilla’s query model may be inadequate for some uses.

A Few Concluding Words

Facebook’s Gorilla is a fascinating piece of engineering, it achieves very good compression of time series data, high scalability and fast retrieval time. However, I am not sure one can call it a time series database, as all of its achievements result from making the system more and more specialized for a specific application – monitoring server/system parameters at Facebook. It high compression is the result of low time resolution and low update data update frequency in the time series used for monitoring. It’s query model is designed for fast retrieval, but it is very simplistic and offloads the time series processing, pattern matching and anomaly detection to the client applications. Gorilla’s data is also very specialized: it is single-dimensional, with infrequent updates, small changes to the value from one point to another, and constant update rate. This data is very specialized in nature and can hardly be thought of as a good representation for all other time series data.  All of the limitations and compromises made to achieve Facebook’s requirements for a specific use case make me think that Gorilla is not a TSDB by any means. It is rather just a cache for infrequently changing monitoring data.