Tag Archives: monitoring

Trace Synchronization with HLC

Event logging or tracing is one of the most common techniques for collecting data about software execution. For a simple application running on a single machine, a trace of events timestamped with the machine’s hardware clock is typically sufficient. When the system grows and becomes distributed over multiple nodes, each node produces its own independent logs or traces. Unfortunately, nodes running on different physical machines do not have access to a single global master clock to timestamp and order the events, which leaves the individual logs unaligned in time.

For some purposes, such as debugging, engineers often need to look at these independent logs as one whole. For instance, if Alice the engineer needs to see how some event on one node influenced the rest of the system, she will need to examine all logs and pay attention to the causal relationships between events. The time skew between nodes and the trace misalignment it causes prevent Alice from safely relying on such independently produced traces, as shown in the figure below. Instead, she needs to collate the logs and reduce the misalignment as much as possible to produce a more coherent picture of the distributed system’s execution.

Misaligned logs with PT
Physical time makes logs misaligned and miss the causality between events.

Of course, if only causality were important, Alice could have used Lamport’s logical clocks (LC) to help her identify some causal relationships between the events on different nodes. Alternatively, a logging system can use vector clocks (VC) to capture all of the causal relationships in the system. However, both LC and VC are disjoint from the physical time of the events in the trace, making it hard for Alice to navigate such a logging system.

Using time synchronization protocols, such as NTP and PTP, will help make the traces better aligned. These protocols are not perfect, however, and still leave some synchronization error or uncertainty, which can break causality when logs are collated purely by their timestamps.

HLC sync

Instead of using either NTP or logical time to synchronize the event logs, I wondered whether it is possible to use both at the same time with the help of Hybrid Logical Clocks (HLC). HLC combines physical time, such as NTP time, with a logical clock to keep track of causality during the period of synchronization uncertainty. Since HLC acts as a single always-increasing time in the entire distributed system, it can be used to timestamp the events in the log traces of every node. You can learn more about HLC here.

Similar to logical time, HLC does not capture full causality between the events. However, HLC conforms to the LC implication: if event A happened before event B, then the HLC timestamp of A is smaller than the HLC timestamp of B. This can be written as A hb B ⇒ hlc.A < hlc.B. Obviously, we cannot use HLC timestamps alone to order any two events. Despite this limitation, we can still use the LC implication to obtain some partial information about the order of events. If an event A has an HLC timestamp smaller than that of event B, we can at least say that event B did not happen before A, thus either A happened before B or A is concurrent with B: hlc.A < hlc.B ⇒ A hb B ∨ A co B.
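To make the implication concrete, here is a minimal Python sketch of an HLC, following the update rules of the published HLC algorithm as I understand them. The class and function names (`HLC`, `tick`, `recv`, `may_precede`) are illustrative choices, and the physical clock is passed in as a plain function so that skewed nodes are easy to model.

```python
from dataclasses import dataclass
from typing import Callable, Tuple

Timestamp = Tuple[int, int]  # (l, c): physical component, logical counter


@dataclass
class HLC:
    """Hybrid logical clock for one node (illustrative sketch)."""
    now: Callable[[], int]  # reads the node's physical clock (PT), in ticks
    l: int = 0              # largest physical time seen so far
    c: int = 0              # logical counter that breaks ties within one l

    def tick(self) -> Timestamp:
        """Timestamp a local or send event."""
        prev_l = self.l
        self.l = max(prev_l, self.now())
        self.c = self.c + 1 if self.l == prev_l else 0
        return (self.l, self.c)

    def recv(self, msg: Timestamp) -> Timestamp:
        """Timestamp a receive event, merging the sender's timestamp."""
        m_l, m_c = msg
        prev_l = self.l
        self.l = max(prev_l, m_l, self.now())
        if self.l == prev_l and self.l == m_l:
            self.c = max(self.c, m_c) + 1
        elif self.l == prev_l:
            self.c += 1
        elif self.l == m_l:
            self.c = m_c + 1
        else:
            self.c = 0
        return (self.l, self.c)


def may_precede(ts_a: Timestamp, ts_b: Timestamp) -> bool:
    """True when hlc.A < hlc.B, i.e. B cannot have happened before A."""
    return ts_a < ts_b


# Two nodes with a 40-tick skew: the slow node learns the fast node's time.
fast = HLC(now=lambda: 50)
slow = HLC(now=lambda: 10)
a1 = fast.tick()      # event on the fast node
c2 = slow.recv(a1)    # receive event on the slow node
```

Timestamps compare as tuples, so the physical component dominates and the counter only breaks ties. In this example `c2` carries the fast node's physical component even though the slow node's own clock reads 10, which is exactly the information the log alignment exploits.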

We can use this property to adjust the synchronization between the traces produced at different nodes. Let’s assume we have two nodes with some clock skew. These nodes produce logs that are not fully synchronized in time (we also assume the knowledge of a global, “ideal” time for now). The events in the log happen instantaneously; however, we can rely on the machine’s clock to measure the time between events on the same node, which gives the logs their “rigidity”. Each node timestamps the log events with its machine’s physical time (PT).

Unaligned logs
Logs aligned based on PT time. Ideal global time is shown in red.

In the figure above, the two logs are not synchronized in the “ideal” time, even though they appear to be in sync based on the PT of each node. Without any additional information we cannot improve on the synchrony of these logs. However, if we replace PT time with HLC, we can achieve better trace synchronization.

unaligned logs with HLC
HLC can be used instead of PT to synchronize the traces

With the addition of HLC time, we can see that when the logs are aligned by PT only, some HLC timestamps (highlighted in yellow) appear to be out of place. In particular, this alignment does not satisfy the LC condition (A hb B ⇒ hlc.A < hlc.B), since the alignment makes event a1 appear to happen before c2; however, hlc.c2 > hlc.a1. In order to satisfy the condition, we need to re-sync the logs such that c2 appears concurrent with a1.

HLC is used to improve log synchronization.

After the adjustment, our synchronization error between the two nodes is reduced. Note that we cannot synchronize the logs better and put a1 to happen before c2, since the LC implication simply does not allow us to do so.

The two-node synchronization works nice and easy because the LC/HLC implication provides some guarantees about two events, and we pick these two events from two separate nodes. Aligning more than two logs is more challenging, as we need to perform many comparisons, each involving just two events from some two nodes. The number of possible comparisons we need to make grows drastically as we increase the number of traces to sync.

However, with HLC we can reduce the problem to performing just n-1 two-node log alignments when we need to sync logs from n nodes. HLC operates by learning of higher timestamps through communication, thus HLC time at all nodes of the cluster tends to follow the PT of the one node with the highest PT clock. Once again, to see this you need to understand how HLC ticks, which is explained here. Having one node that drives the entire HLC in the cluster allows us to synchronize the log of every other node independently with the log of that HLC “driver node”.
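One way to sketch the per-log adjustment in Python is shown below. It is not necessarily the exact procedure I used. It relies on one HLC property: the physical component l of an event's timestamp is the largest physical clock reading in that event's causal past, so an event logged at local time pt with l > pt must be placed no earlier than l on the driver's timeline. The names `Entry`, `min_shift` and `align_to_driver` are illustrative.

```python
from typing import Dict, List, NamedTuple, Tuple


class Entry(NamedTuple):
    name: str  # event label
    pt: int    # local physical time of the event, in ticks
    l: int     # physical component of the event's HLC timestamp


def min_shift(log: List[Entry]) -> int:
    """Smallest forward shift of a whole log that the HLC evidence justifies."""
    return max((e.l - e.pt for e in log), default=0)


def align_to_driver(logs: Dict[str, List[Entry]]) -> Dict[str, List[Tuple[int, str]]]:
    """Shift every log independently; the driver's own shift comes out as 0."""
    aligned = {}
    for node, log in logs.items():
        shift = min_shift(log)  # one rigid shift per log keeps local spacing intact
        aligned[node] = [(e.pt + shift, e.name) for e in log]
    return aligned


logs = {
    "A": [Entry("a1", pt=50, l=50)],                           # driver node
    "C": [Entry("c1", pt=10, l=10), Entry("c2", pt=20, l=50)],  # lagging node
}
aligned = align_to_driver(logs)
```

In this example c2 learned of timestamp 50 while its local clock read 20, so log C moves forward by 30 ticks and c2 lands concurrent with a1. The true skew may be larger, since the message also spent time in flight, but the HLC evidence alone does not justify moving the log any further.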

Some Testing and Limitation

I set up some synthetic tests to see if HLC can help us achieve any improvement in log synchronization on an already loosely synchronized system. The benchmark generates the logs with a number of parameters I can control: time skew, communication latency and event probabilities. Maximum time skew controls the time desynchronization in the simulated cluster, measured in time ticks. There will be two nodes in the simulation with the maximum time-skew difference between their clocks. Communication latency parameters control the latency of simulated communications in time ticks. The probability of an event controls the chance of an event happening at every time tick; similarly, the probability of communication determines the chance of an outgoing message being sent at a given time tick.

Since the logs are generated synthetically, I have access to the ideal time synchronization between these logs, allowing me to quantify the alignment error. I calculated the alignment error as 1 – adjustment / skew for every pair of logs.

The figure below shows the error in synchronizing 5 logs as a function of the log length. The idea is that a longer trace allows for more opportunities to find violations of the LC/HLC condition and adjust the logs accordingly. All other parameters were kept constant, with a skew of 50 ticks, a communication delay of 3 to 10 ticks, and a 40% chance of an event per tick. I made different plots for different probabilities of inter-node communication. I repeated the simulation 100,000 times and computed the average error.
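The following Python sketch shows a much reduced version of such a benchmark. It is not the code behind the plots: it simulates only two nodes instead of five, tracks only the physical component l of HLC (the logical counter does not affect the shift), and takes the adjustment to be the largest l - pt observed on the lagging node. The function name and the default communication probability are illustrative assumptions.

```python
import random
from typing import List, Tuple


def simulate_error(length: int, skew: int = 50, latency: Tuple[int, int] = (3, 10),
                   p_event: float = 0.4, p_comm: float = 0.1, seed: int = 0) -> float:
    """Return the alignment error 1 - adjustment / skew for one simulated run."""
    rng = random.Random(seed)
    ahead = [skew, 0]   # node 0 runs `skew` ticks ahead of node 1
    l = [0, 0]          # physical component of each node's HLC
    in_flight: List[Tuple[int, int, int]] = []  # (arrival tick, destination, carried l)
    adjustment = 0      # largest l - pt logged on the lagging node
    for t in range(length):
        for node in (0, 1):
            pt = t + ahead[node]
            arrived = [m for m in in_flight if m[0] == t and m[1] == node]
            for _, _, m_l in arrived:          # receive events
                l[node] = max(l[node], m_l, pt)
            logged = bool(arrived)
            if rng.random() < p_event:         # local event
                l[node] = max(l[node], pt)
                logged = True
            if rng.random() < p_comm:          # send event
                l[node] = max(l[node], pt)
                in_flight.append((t + rng.randint(*latency), 1 - node, l[node]))
                logged = True
            if logged and node == 1:
                adjustment = max(adjustment, l[1] - pt)
        in_flight = [m for m in in_flight if m[0] > t]  # drop delivered messages
    return 1 - adjustment / skew


errors = [simulate_error(200, seed=s) for s in range(100)]
average_error = sum(errors) / len(errors)
```

With `p_comm=0.0` no causality crosses the nodes and the error stays at 1, while longer traces with the same seed can only keep or lower the error, because the adjustment never shrinks as more events are observed.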

results 1
Log alignment error as a function of log length

We can see that in this setup the synchronization between logs improved significantly, and the improvement happens faster when communication is more frequent. This is because communication introduces causality between nodes, which I can exploit for synchronization. As the logs grow longer, the improvements diminish. This is likely because the remaining synchronization error approaches the communication delay, at which point HLC synchronization can no longer make improvements.

The most significant limitation of HLC synchronization is its inability to achieve any improvement when the communication latency is equal to or greater than the maximum time skew in the system. The following graph illustrates this:

HLC synchronization limitations

Here I ran the simulation with 3 different skew levels: 10, 20 and 50 ticks. As I increase the communication latency, the error grows as well, reaching the level of no improvement as the latency reaches the time skew.
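A back-of-the-envelope model, under my own simplifying assumptions, suggests why the curves flatten out where they do. Suppose a message leaves the fast node when its clock reads T and arrives after d ticks at a node whose clock lags by s ticks. The receiver's clock then reads T - s + d, so the HLC evidence justifies a shift of at most s - d, and none at all once d ≥ s. The function name below is illustrative.

```python
def best_case_error(skew: int, latency: int) -> float:
    """Residual error 1 - adjustment / skew when the best possible message is observed."""
    adjustment = max(0, skew - latency)  # no HLC evidence once latency >= skew
    return 1 - adjustment / skew


# Sweep the latency for the three skew levels used in the experiment.
for skew in (10, 20, 50):
    sweep = [round(best_case_error(skew, d), 2) for d in range(0, 60, 10)]
    print(skew, sweep)
```

In this model the error grows linearly as latency / skew and saturates at 1, which agrees in shape with the simulated results.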

Some Concluding Words

I think this is a rather naïve and simple way to achieve better synchronization for distributed logging software. One can argue that NTP and PTP time synchronization methods are rather good nowadays and that it should be enough for most cases. However, computers are fast now and even in 1 ms of desynchronization many computations can be made. Even a few full round trips of network message exchange can happen in that 1 ms in good Local Area Networks.

HLC synchronization’s simplicity allows it to be implemented purely in user-level application code. There is no need to constantly run the NTP protocol to keep tight time synchrony, no need to access or modify any of the underlying system-level code beyond just reading the clock, and no need to access a high-precision clock that may slow the application down.

HLC can also be used within a single machine to synchronize traces from different threads. Despite all threads and processes having access to the same clock, the clock granularity may still be coarse enough to allow many computations within a single tick.

HLC sync is not without limits. Its usefulness degrades as the synchronization error gets closer to the communication latency, but it can still be used as a fail-safe mechanism in case NTP fails.

Alternatively to synchronizing logs, HLC can be used to find consistent global states and search through the distributed application’s past execution.

One Page Summary: “milliScope: a Fine-Grained Monitoring Framework for Performance Debugging of n-Tier Web Services”

The authors of the ICDCS 2017 milliScope paper attack an interesting monitoring problem for distributed systems: detecting and determining the cause of short-lived events in the system. In particular, they address the issue of identifying very short bottlenecks (VSBs) in distributed web services. VSBs manifest themselves as performance degradation of a small number of requests; however, they are intermittent, short-lived and hard to detect with tools that aggregate or sample performance data.

MilliScope is a monitoring solution aimed exactly at detecting such short bottlenecks and finding their root cause. For the detection part, milliScope relies on the Event mScopeMonitors. These monitors are present at every component of the web service and record the timestamps associated with each request entering and leaving a particular component. In total, 4 local timestamps are recorded per component, 2 on the forward pass of the request and 2 on the return. This classical request tracing approach captures causal information about request propagation through the components, and it is sufficient to pinpoint the exact request and the exact component experiencing a slowdown.

In order to provide more information about the cause of the bottleneck, milliScope uses Resource mScopeMonitors. Resource monitors use existing performance monitoring tools to log metrics such as CPU utilization, memory usage, I/O usage, etc. MilliScope then transforms the performance logs into a common, structured format, adding some extra support information. The data from both the event and resource mScopeMonitors eventually ends up in a warehouse and can be queried by the users. Users can overlay the data from different monitors onto each other, making it easier to identify what component or components were causing the bottleneck and what the root cause behind the slowdown was. An example of such overlaying is shown in the figure on the left, where the queue lengths of different components are shown on the same graph.

Some Critical Questions

The paper claims that existing monitors have too much overhead and thus cannot capture all requests and find the bottlenecks, however event mScopeMonitor is a simple request tracer that is no different from many others, so it is not clear why it should perform better.

MilliScope collects lots of performance monitoring data in different logs and then brings the data to a centralized location for storage and processing. The authors never mention in the paper how they address clock uncertainty between all these different logs. Do they require clock sync, such as NTP? Even though the event monitors capture causal relationships within the same request, resource monitors seem to rely on physical time. The authors claim that milliScope can detect and help explain bottlenecks as short as 10 ms, but what will happen if the resource monitors are skewed by more than 10 ms? How accurate is milliScope going to be? In fact, both case studies in the paper worked with bottlenecks of hundreds of milliseconds or more.

And finally, what can we do once we detected a very short bottleneck? Since VSBs are so short-lived, there may not be any time to react to their presence. Maybe a next step would be to look for precursors to the bottlenecks, so we can rebalance the system or prepare it in some other way for an incoming performance hiccup?

Sonification of Distributed Systems with RQL

In the past, I have discussed sonification as a means of representing monitoring data. Aside from some silly and toy examples, sonification can be used for serious applications. In many monitoring cases, the presence of some phenomenon is more important than the details about it. In such situations, simple sonification is a perfect way to alert users about the occurrence of such a phenomenon. For example, a Geiger counter alerts users to radioactive decay without providing any further details.

Our recent work on Retroscope and RQL allows us to bring sonification to the same “phenomena-awareness” plane for distributed systems. With RQL we can search for interesting conditions happening globally in the system, and we can use sounds to alert the engineers when certain predicates occur in the global state. Of course, for such a system to be useful, it needs to be real-time, as the recency of events is the most important attribute of this type of sonification. For instance, users of a Geiger counter are not interested in hearing ticks for decay events that happened a minute ago, as it rather defeats the purpose of the counter. Similar requirements apply to the “phenomena-awareness” tools in the distributed systems domain, as engineers need to know what happens in the system instantaneously.

RQL makes it easy to inspect past states; however, it falls short when it comes to inspecting the current state. What we need is a streaming query that can continuously receive individual logs from Retroscope log-servers and perform the search as soon as sufficient data becomes available to form a consistent cut. Of course, streaming queries will still lag behind the current time; however, we can make this lag small enough to be virtually unobservable by a person.

With streaming queries we can not only receive the cuts meeting some search criteria in near real time, but we can also sonify the mere fact such cuts have been found. If we look at the previous example of ZooKeeper staleness, we can run sonified streaming queries to have the system alert us when the data staleness reaches some threshold, such as two or more versions stale.  In the audio clip below, we have used MIDI to sonify the stream of staleness events occurring in a ZooKeeper cluster. Multiple queries were sonified to produce different sounds for different staleness of ZooKeeper data.

We can hear periods of silence when the staleness is below the threshold of 2 values, however we can also observe some variations in cluster performance. For instance, it is very easy to identify when the cluster was experiencing more problems than normal. It is also easy to hear it recovering from the spike in staleness soon after.

Retroscoping ZooKeeper Staleness

ZooKeeper is a popular coordination service used as part of many large-scale distributed systems. ZooKeeper provides a file-system-inspired abstraction to the users on top of its replicated key-value store. Like other Paxos-inspired protocols, ZooKeeper is typically deployed on at least 3 nodes, and can tolerate F node failures in a cluster of size 2F+1. One characteristic of ZooKeeper is that it runs the consensus algorithm for update operations; however, to speed things up, reads may be served locally by the replica a client connects to. This means that a value read from a replica may be stale or outdated compared to the leader node or even other followers. This makes it the user’s problem to tolerate the stale data read from ZooKeeper, or to perform a sync operation to get up to date with the leader before reading.

But how stale can data become in a ZooKeeper cluster? This is a rather tricky question to answer, since each replica runs on a separate machine with a different clock. We cannot just observe the time a value becomes available at each node, because these timestamps are not comparable due to clock skew. Instead of trying to figure out the staleness time, I decided to look at how many versions behind a replica can be. I used my Retroscope tool to keep track of when the data becomes available to the client at each replica. I used Retroscope Query Language (RQL) to collect the data from the nodes and look at the consistent cuts progressing through the states of ZooKeeper. Retroscoping ZooKeeper took only about 30 lines of code added to the project.
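The quorum arithmetic behind the 2F+1 rule is easy to check with a couple of helper functions. This is plain majority-quorum arithmetic in Python with illustrative function names, not ZooKeeper code.

```python
def majority_quorum(cluster_size: int) -> int:
    """Smallest number of nodes that forms a majority."""
    return cluster_size // 2 + 1


def tolerated_failures(cluster_size: int) -> int:
    """Largest F such that a cluster of this size still has a majority alive."""
    return (cluster_size - 1) // 2


for n in (3, 4, 5):
    print(n, majority_quorum(n), tolerated_failures(n))
```

Note that a 4-node cluster tolerates no more failures than a 3-node one, which is why odd cluster sizes are the usual choice.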

Initial run. 1 version staleness is expected.

I deployed a small ZooKeeper cluster on 3 AWS t2.micro instances (I know, it is far from a production setup, but it works well for a quick test). On a separate instance I deployed the RQL server. To start, I simply created one znode and updated its value. I then proceeded with running the simplest possible RQL query: SELECT retro FROM setd; in this query, retro corresponds to the /retro znode I’ve created, and setd is the name of the Retroscope log I put my monitoring data into. The result of the query was exactly what I expected: at one of the consistent cuts, one of the nodes had a value one version behind. This is entirely normal behavior, as the value needs to propagate from the leader to the followers once decided.

My next move was to give ZooKeeper a bit more work in a short burst, so I quickly wrote a small program that puts some incremental values to n znodes for a total of r writes. It starts with the value tst1, then goes to tst2 and so on for every znode. At first I restricted n=1, as I felt that writing to just one znode to create a “hot-spot” was going to give me the best chance of getting stale values. But ZooKeeper handled the burst of 100 writes easily, with the results being identical to a single write: the stale value was at most 1 version behind the current data.

Two versions behind.

Seeing things work well was not interesting for me, so I decided to no longer play fair. I artificially made one replica run slower and act as a struggler node. The ZooKeeper protocol tolerates this just fine, as it needs only 2 out of 3 nodes in my cluster to form the majority quorum and make progress. For this crippled ZooKeeper setup I re-ran the workload and my simple query. Needless to say, I was able to spot a moment when the system had a node with a znode 2 versions behind, and it was my struggler machine. In the next step I increased the load on the cluster and made the workload write 1000 values to the same znode. I also changed the query so that I do not have to manually look through thousands of consistent cuts trying to spot the stale data. My new query emits consistent cuts with staleness of 2 or more versions:

SELECT r1 FROM setd WHEN Int(StrReplace(r1, "tst", "")) - Int(StrReplace(r1, "tst", "")) > 1;

The interesting thing about the query above is that it uses the same variable name twice in the expression, essentially telling RQL to output a cut when r1 - r1 > 1. However, there are many r1’s in the system (3 in fact, one at each node), so when a pair of r1’s that satisfies the WHEN condition is found (at different nodes), RQL will output the consistent cut.
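The pairwise semantics can be mimicked in a few lines of Python. This only models what the query condition means for a single consistent cut, not how RQL evaluates it internally. The cut is represented as a hypothetical dict from node name to the value of r1 on that node.

```python
from itertools import permutations
from typing import Dict


def version(value: str) -> int:
    """Mirror Int(StrReplace(r1, "tst", "")): 'tst42' -> 42."""
    return int(value.replace("tst", ""))


def is_stale_cut(cut: Dict[str, str], threshold: int = 1) -> bool:
    """True when some ordered pair of replicas differs by more than `threshold` versions."""
    return any(version(a) - version(b) > threshold
               for a, b in permutations(cut.values(), 2))


fresh = {"node1": "tst7", "node2": "tst6", "node3": "tst7"}  # at most 1 version behind
stale = {"node1": "tst7", "node2": "tst7", "node3": "tst5"}  # node3 is 2 versions behind
```

Because every ordered pair of replicas is checked, it does not matter which node holds the newer value.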

Very stale.

I was a bit surprised by the results. At first the struggler managed to keep up with the rest of the cluster quite well, slipping 2 versions behind on occasion, but after about 200 requests things went out of control for the crippled node, with the staleness growing to 158 versions behind towards the end of the run. Of course a struggler node will make things look worse, but it is not an unrealistic scenario to have underperforming machines. My test, however, is not fair either, as I was using a 100% write workload targeting just one znode. So in the next try I changed the workload to target 100 different znodes, while still measuring the staleness on just one znode. In that experiment, the staleness was not that high, but the number of updates to a single znode was only a small fraction of the previous test. Nevertheless, the struggler replica was as much as 6 versions stale, making it roughly 1/3rd of updates behind the rest of the cluster.

For the last quick test, I tried doing a large burst of 2000 writes to the same znode on a healthy cluster with no struggler nodes. Despite all replicas working at their proper speed, I was able to observe staleness of 3 versions on some occasions.

I am not sure about the lessons learned from this quick experiment. I was amazed by how easy it is to get ZooKeeper to have data 2 or more versions behind, although the system seems fast to catch up. The struggler scenario, however, illustrated how quickly things can get out of control with just some performance degradation at one of the replicas. Engineers using ZooKeeper must build their applications in a way that tolerates stale ZooKeeper values gracefully.

Monitoring with Retroscope: Detecting Invariant Violations

Earlier I briefly mentioned Retroscope, our distributed snapshot library that makes taking non-blocking, unplanned, consistent global distributed snapshots possible. However, these snapshots are only good if we know how to use them well. Of course the most obvious use case is just a data backup, and despite it being an important application for snapshots, I find it a bit boring for my taste. What I am thinking about right now is using snapshots for distributed monitoring and debugging.

Let’s consider an application that has a global invariant predicate P, and we want to check if a distributed system holds the invariant P at all times. This means that we should never see a consistent cut in which predicate P = false. So our problem boils down to looking for consistent cuts that violate P. Luckily, Retroscope can do exactly this, since we can take one snapshot and incrementally move forward in time as the application execution progresses, checking the predicates by looking at consistent cuts as the state advances.

With the basic Retroscope described in the earlier post, finding predicate violations is a rather cumbersome effort that requires writing new code for every invariant a user wishes to check. So in the past few months I have been working on a Retroscope extension tailored specifically for debugging and monitoring use cases. The improved Retroscope exposes the Retroscope Query Language (RQL), a SQL-like interface that allows users to write queries to search for conditions happening in the consistent cuts.

Now let’s go back to our hypothetical system with global invariant P and for now assume P holds when all local predicates p_0, p_1, p_2, …, p_n hold on the nodes [0 … n]. As such, P = p_0 ∧ p_1 ∧ p_2 ∧ … ∧ p_n, and if any of the local predicates fails, the global predicate fails as well. For the simplicity of the example, we can say that the local predicate p_i is the following: p_i = a_i + b_i > c_i. This makes each node maintain all three variables, although the nodes may have different values. With Retroscope, we can expose these local variables to be stored in the local log named inv. The log will maintain both the current version of the variables and the history of variable changes.

How do we look for a violation of such an invariant with RQL? Just a single query would suffice:

SELECT inv.a, inv.b, inv.c FROM inv WHEN (inv.a + inv.b <= inv.c) LINK SAME_NODE

Now we can dissect this query into bits and see what happens there. RQL queries are meant to retrieve consistent cuts that satisfy certain criteria. The list of parameters following the SELECT statement specifies what variables we want to see in the resultant consistent cuts. The FROM keyword enumerates the logs we use in this particular query. The actual consistent cut criteria are specified after the WHEN keyword. In this particular case the condition for emitting cuts is (inv.a + inv.b <= inv.c) LINK SAME_NODE, which is equivalent to emitting cuts when the following holds: ∃ i ∈ [0 … n] : a_i + b_i ≤ c_i

By now a curious reader would probably have asked why we even bother with consistent cuts in this particular example. All predicates can be checked locally and their evaluation does not depend on other remote servers, so we can simply run local monitors and not worry about consistent cuts and time synchronization at all: a failure on one node designates the failure of the system globally, no matter the time. Retroscope and RQL shine when we break away from this locality. What if our invariant involves messages being sent and received? Or what if it involves different parameters that exist on different machines at the same time? With the ability to look at consistent cuts, RQL breaches the boundary of a single node. Below I list just a few variations of the original query that no longer deal with a conjunction of local predicates and look at the global state as a whole:

  1. SELECT inv.a, inv.b, inv.c FROM inv WHEN inv.a + inv.b <= inv.c
    • Omitting the LINK SAME_NODE part changes the operation of the query drastically, as the three variables are no longer bound to co-exist on the same node: ∃ i, j, k ∈ [0 … n] : a_i + b_j ≤ c_k
  2. SELECT inv.a, inv.b, inv.c FROM inv WHEN (inv.a + inv.b <= inv.c) LINK EACH_NODE
    • Replacing LINK SAME_NODE with LINK EACH_NODE changes the search condition to require that every node in the consistent cut satisfies it: ∀ i ∈ [0 … n] : a_i + b_i ≤ c_i
  3. SELECT inv.a, inv.b, inv.c FROM inv WHEN (inv.a + inv.b) LINK SAME_NODE <= inv.c
    • Rewriting the condition to WHEN (inv.a + inv.b) LINK SAME_NODE <= inv.c will cause inv.a and inv.b to be summed on the same node and compared to inv.c values from other nodes as well, so the consistent cut is emitted when ∃ i, k ∈ [0 … n] : a_i + b_i ≤ c_k
  4. SELECT inv.a, inv.b, inv.c FROM inv WHEN (inv.a + inv.b <= inv.c) AND NODE($1) = NODE($3)
    • This query restricts inv.a and inv.c to be on the same node. $1 is the placeholder for the first variable encountered while parsing left to right, and $3 is the third variable. This emits the consistent cuts when ∃ i, j ∈ [0 … n] : a_i + b_j ≤ c_i
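The difference between the original LINK SAME_NODE query and the four variations above can be spelled out as plain Python predicates over one consistent cut. This models only the meaning of each condition as described in the text, not the RQL engine. A cut is represented here as a hypothetical list with one dict of a, b and c per node, and the function names are illustrative.

```python
from itertools import product
from typing import Dict, List

Cut = List[Dict[str, int]]  # one {"a": ..., "b": ..., "c": ...} dict per node


def same_node(cut: Cut) -> bool:
    """Original query: a, b and c all come from one node."""
    return any(n["a"] + n["b"] <= n["c"] for n in cut)


def no_link(cut: Cut) -> bool:
    """Variation 1: a, b and c may each come from any node."""
    return any(x["a"] + y["b"] <= z["c"] for x, y, z in product(cut, repeat=3))


def each_node(cut: Cut) -> bool:
    """Variation 2: every node must satisfy the local condition."""
    return all(n["a"] + n["b"] <= n["c"] for n in cut)


def sum_same_node(cut: Cut) -> bool:
    """Variation 3: a and b from one node, c from any node."""
    return any(x["a"] + x["b"] <= z["c"] for x, z in product(cut, repeat=2))


def a_and_c_same_node(cut: Cut) -> bool:
    """Variation 4: a and c from one node, b from any node."""
    return any(x["a"] + y["b"] <= x["c"] for x, y in product(cut, repeat=2))


cut = [{"a": 1, "b": 1, "c": 1}, {"a": 5, "b": 0, "c": 4}]
results = {f.__name__: f(cut) for f in
           (same_node, no_link, each_node, sum_same_node, a_and_c_same_node)}
```

For this cut no single node violates its local predicate, yet the three cross-node variations all find a match. The nested products also hint at why larger conditions get expensive quickly.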

Above are just a few simple examples of what is possible with RQL; however, there are limitations. The biggest limitation is the complexity of the conditions. Even though RQL does not limit how many operations are possible in the condition of the query, having large expressions can slow the system down drastically. For example, a simple WHEN inv.a > inv.b will examine all a’s that exist on the nodes of the system at the consistent cut and all b’s in every possible combination. With A the set of all a values and B the set of all b values in the cut, the candidate set is the product E = A × B. The comparison is then carried out on every element of the product set E.

P.S. I illustrated some of the syntax as it operates at the time of this writing; however, RQL is still developing, and I am not sure I like the syntax of conditions too much, so it is subject to change.

Globally Consistent Distributed Snapshots with Retroscope

Taking a consistent snapshot of a distributed system is no trivial task because of the asynchrony between the nodes in the system. As the state of each machine changes in response to incoming external messages or internal events, each node may produce a log of such state changes. With the log abstraction, the problem of taking a snapshot transforms into the issue of aligning the logs together and finding a consistent cut among all these logs. However, time asynchrony between the servers makes collating all the system logs difficult using just the physical clocks available at each machine, because clocks tend to drift, producing some time asynchrony or time uncertainty. Various time synchronization protocols, such as NTP and PTP, exist, but perfect synchronization is still unattainable.

Retroscope is the system we designed to take unplanned, non-blocking, consistent global snapshots. Unlike other systems, Retroscope does not need to block while taking a snapshot, as it does not need to wait out the time uncertainty caused by the clock skews at various machines, thanks to its reliance on Hybrid Logical Clocks (HLC) instead of NTP-synchronized time.

Figure 1
Using NTP time fails to take consistent snapshot without waiting out the time uncertainty


HLC introduces causality information into the clock as messages are being exchanged between servers and provides the same causal guarantees as Lamport’s Logical Clocks (LC). More information about HLC can be found here and here.

Taking a snapshot

Retroscope achieves snapshots by adding HLC into the network communication of the Retroscoped system. Internally, Retroscope keeps a sliding window-log of past state changes along with the associated HLC timestamps at each node. These window-logs are used to facilitate the unplanned nature of taking snapshots. Snapshots are triggered by a special client that maintains the common HLC with the rest of the system. Retroscope allows an instant unplanned snapshot to be started by the initiator; such instant snapshots are guaranteed to capture the states at the time Tnow at which the initiator issued the snapshot request. Obviously, once the snapshot request message reaches the nodes, the time has advanced to Tr > Tnow.

Upon receiving the snapshot request message at Tr, each node starts taking a local snapshot. Since the system does not halt processing requests, depending on the implementation, we may arrive at a local snapshot at some time Tf >= Tr. Because our local snapshot is at a state that happened after the requested time, we need to modify it to arrive at the state at time Tnow. We use the window-log of state changes to undo all operations that happened locally after Tnow, thus arriving at the desired local snapshot. Once all nodes compute local snapshots, Retroscope is done taking a global consistent snapshot at time Tnow.

An instant snapshot is being taken. (1) The initiator sends the message to take a snapshot at Tnow, (2) process n receives the message and starts taking a local snapshot, (3) all operations performed after Tnow are undone from the local snapshot.

Retroscope provides more flexibility in taking unplanned snapshots. Taking an instant snapshot (i.e. a snapshot initiated at Tnow) requires each node to maintain only a small log of recent changes. We can, however, expand the instant snapshots and offer retrospective snapshot flexibility at the expense of growing the state change log larger. With retrospective snapshots, we can offer the ability to look at a state that has already happened in the past. This functionality is handy for application debugging when there is a need to investigate the root cause of a problem after it has already happened. A distributed reset is another application that can benefit from retrospective snapshots, as the system can be reset into a correct state after the state has been corrupted.
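The undo step can be illustrated with a small Python sketch. It is a simplified model, not Retroscope's actual implementation: the node state is a plain dict, the window-log is an unbounded list, and HLC timestamps are (l, c) tuples. The names `Node`, `Change`, `put` and `snapshot_at` are illustrative.

```python
from typing import Any, Dict, List, NamedTuple, Tuple

Hlc = Tuple[int, int]   # (l, c) HLC timestamp
_MISSING = object()     # marks "key did not exist before this change"


class Change(NamedTuple):
    ts: Hlc    # HLC timestamp of the state change
    key: str
    old: Any   # previous value, or _MISSING


class Node:
    def __init__(self) -> None:
        self.state: Dict[str, Any] = {}
        self.window_log: List[Change] = []  # appended in increasing HLC order

    def put(self, ts: Hlc, key: str, value: Any) -> None:
        """Apply a write and remember how to undo it."""
        self.window_log.append(Change(ts, key, self.state.get(key, _MISSING)))
        self.state[key] = value

    def snapshot_at(self, t_now: Hlc) -> Dict[str, Any]:
        """Copy the current state, then undo every change made after t_now."""
        snap = dict(self.state)
        for change in reversed(self.window_log):
            if change.ts <= t_now:
                break  # everything earlier is part of the snapshot
            if change.old is _MISSING:
                snap.pop(change.key, None)
            else:
                snap[change.key] = change.old
        return snap


node = Node()
node.put((1, 0), "x", 1)
node.put((5, 0), "x", 2)
node.put((7, 0), "y", 9)
snapshot = node.snapshot_at((5, 0))  # state as of HLC time (5, 0)
```

The live state is never modified, so the node can keep serving requests while the snapshot is computed. The same routine serves retrospective snapshots as long as the window-log still reaches back to the requested time.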

We have Retroscoped the Voldemort key-value database to take data snapshots. Retroscoping Voldemort took fewer than 1,000 lines of code for adding HLC to the network protocol, recording changes in the Retroscope window-log, and performing snapshots on Voldemort’s storage. We ran the experiments on a 10-node Voldemort cluster with databases of various sizes. We have learned that keeping the window-log of state changes has very little impact on throughput and latency when no snapshots are being taken, as seen in the figure below.

Figure 3
Impact of Retroscope on normal system operation.

Taking a snapshot is non-blocking in Retroscope, because there is no need to wait out the time uncertainty. The non-blocking nature allows Voldemort to continue processing both read and write requests while the snapshot is being computed. The figure below shows throughput and latency for every second of execution while taking a snapshot of a database with 10,000,000 items (each item is 100 bytes). Overall, we observed an 18% throughput and 25% latency degradation over the snapshot time; however, these numbers can be improved by using a separate disk system for the snapshot.

Figure 4
Impact of Retroscope while taking a snapshot

What can be done with Retroscope?

We used Retroscope to take snapshots of data on a key-value store, but the utility of snapshots extends much further. With powerful snapshot capabilities, such as retrospective snapshots, we can look into the past of our distributed system and search for anomalies. Retrospective snapshots can be used to restore the system to the latest correct state after state corruption. Finding such correct states is also possible with Retroscope: we can take successive snapshots and check them for global invariants, which makes it a powerful tool for monitoring various application-level predicates. Retroscope can perform other monitoring tasks as well. Unlike other monitoring systems that tend to look at local state independently or isolate monitoring to a request level, Retroscope can look at global parameters of the system across every node in a consistent way. We can even use Retroscope to detect erroneous patterns in message exchange by observing what messages are sent and received and how they impact the state at each node as we go through time.

Gorilla – Facebook’s Cache for Time Series Data

Facebook operates a huge infrastructure that needs to be constantly monitored for performance and stability. Such monitoring collects huge amounts of data that must be easily accessible to various diagnosis and anomaly detection tools in order to quickly identify and react to possible issues. Many of the monitored parameters can be represented as real-valued time series. For example, server CPU utilization can be thought of as one such time series: it can be sampled at some time interval and represented as a numeric value. In order to accommodate all the time series data for the various parameters produced by all the servers, Facebook needs a scalable, robust and fast way to store and manage time series.

The paper Gorilla: A Fast, Scalable, In-Memory Time Series Database describes Facebook’s approach to the problem of managing large amounts of time series. After reading the first page of this paper, I started to ask myself whether Gorilla is truly a time series database or a monitoring data cache for Facebook. To understand what Gorilla is, we must look at what data Gorilla stores and how it was designed and implemented.

Facebook’s monitoring tasks set a strict set of requirements a time series database needs to meet, which I briefly summarize in the list below:

  • Store real-valued monitoring data
  • Have fast access to 26 hours of monitoring data
  • Be scalable on Facebook scale, as the amount of data increases all the time
  • Maintain millions of time series
  • Fast retrieval of time series with reads in under 1 ms
  • Support up to 40,000 queries per second
  • Low granularity time series with resolution of up to 4 data points per minute.
  • Replicated design for disaster recovery.

Gorilla Data Model

Gorilla is said to store time series data in which every data point consists of a timestamp and a single 64-bit value. This limits what kinds of time series can be stored. For one, the timestamp requirement makes it more difficult to deal with ordinal time series in which only the order of events matters and not the duration between them (sure, we can assign always-increasing integers as timestamps to represent the order). Another limitation is the inability to store multi-dimensional time series, where a single data point consists of a vector rather than a single value.

The physical time of an event is important for Facebook’s use case of Gorilla, as the engineers need to know the time when an event or anomaly happened. Facebook engineers also do not care about recording vectors of data for each point in time, as they can record multiple values into different time series, which allows them to improve memory utilization of the system at the expense of versatility. This makes Gorilla a very specialized tool from the standpoint of the data it can handle, as the data model is dictated by the overall requirements of the monitoring task at Facebook scale.

Gorilla Design and Implementation

Obviously, Gorilla was designed and built to meet the requirements outlined earlier. Just as the data model was derived from the requirements, the rest of the system also sacrifices generality for the sake of meeting the monitoring goals. Below I look at various portions of the system and see how universal or flexible the design of Gorilla is.

Compressed In-Memory Storage

Requiring time series retrieval in under 1 ms means that the data needs to reside in the memory of the machines composing the Gorilla deployment. This time requirement most likely also imposes the limit of only 4 points per minute, as the system needs to keep individual time series small enough to retrieve them in their entirety quickly enough. Monitoring tasks at Facebook need 26 hours of data, meaning that a single time series will not exceed 6240 points.
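The 6240 figure follows from four points per minute (one every 15 seconds) kept for 26 hours. The short calculation below spells this out and, assuming an uncompressed point is a 64-bit timestamp plus the 64-bit value, also gives the raw size of one full series; the constant names are mine.

```python
HOURS_RETAINED = 26
POINTS_PER_MINUTE = 4          # one point every 15 seconds
BYTES_PER_RAW_POINT = 8 + 8    # assumed: 64-bit timestamp + 64-bit value

points_per_series = HOURS_RETAINED * 60 * POINTS_PER_MINUTE
raw_bytes_per_series = points_per_series * BYTES_PER_RAW_POINT

print(points_per_series)      # 6240
print(raw_bytes_per_series)   # 99840
```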

Despite the small size of individual time series, Facebook generates millions of such time series, so the memory consumption of the entire cluster is very high. Some very clever algorithms are used to compact each time series individually. Gorilla compresses both the timestamps and values for each data point.

The timestamp compression uses a delta-of-delta technique. The compression starts by finding the difference Δt_{n-1,n} between the new timestamp t_n and the previous timestamp t_{n-1}: Δt_{n-1,n} = t_n - t_{n-1}. This difference can already be represented with fewer bits than the original timestamp. However, the time series used in monitoring at Facebook tend to be sampled at regular intervals, allowing even greater compression by finding the difference D between deltas instead of differences between events. In other words, the system does not encode the changes in event timestamps, as it would with a single delta, but the differences between the intervals separating consecutive events: D = Δt_{n-1,n} - Δt_{n-2,n-1}. If everything operates correctly, most events happen at regular, constant intervals, resulting in 0 difference between them. This 0 difference can be encoded with just one bit of ‘0’. However, when some irregularity happens, the difference between intervals can still be represented with just a few bits. Computing the delta-of-deltas is shown in the example below:

Delta-of-deltas compression for timestamps.
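The same computation can be written as a small Python sketch. The timestamps are made up for illustration (mostly 15 seconds apart with a little jitter), and the function name is my own.

```python
from typing import List


def delta_of_deltas(timestamps: List[int]) -> List[int]:
    """Return D for every point that has two predecessors."""
    # First-order deltas: the intervals between consecutive timestamps.
    deltas = [b - a for a, b in zip(timestamps, timestamps[1:])]
    # Second-order deltas: how much each interval differs from the previous one.
    return [b - a for a, b in zip(deltas, deltas[1:])]


ts = [1000, 1015, 1030, 1046, 1060]   # hypothetical timestamps in seconds
print(delta_of_deltas(ts))            # [0, 1, -2]
```

A perfectly regular series yields only zeros, which is exactly the case that compresses down to a single bit per timestamp.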

The binary representation of D also plays a crucial role in compression and must be tuned to the application and the frequency of points in the time series. In Facebook’s monitoring case, if the delta-of-deltas D is in [-63, 64], it is encoded with ‘10’ followed by 7 bits of the difference value, for a total size of 9 bits; if D is in [-255, 256], it is encoded with ‘110’ followed by 9 bits of the value, for a total size of 12 bits. This pattern continues to cover larger D values.
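Here is a rough sketch of such a variable-length encoder, producing a string of '0'/'1' characters for readability. The first two buckets are the ones described above; the '1110' bucket and the 32-bit fallback are how I understand the paper to continue the pattern. The offset-binary payload (the smallest D in a bucket maps to all zeros) is my own assumption and is not necessarily the bit layout Gorilla uses.

```python
# (control prefix, payload bits, smallest D, largest D)
BUCKETS = [
    ("10", 7, -63, 64),
    ("110", 9, -255, 256),
    ("1110", 12, -2047, 2048),
]


def encode_dod(d: int) -> str:
    """Encode one delta-of-deltas value as a bit string."""
    if d == 0:
        return "0"  # regular interval: a single bit
    for prefix, bits, lo, hi in BUCKETS:
        if lo <= d <= hi:
            # Assumed payload layout: offset-binary, so lo -> 00..0 and hi -> 11..1.
            return prefix + format(d - lo, f"0{bits}b")
    # Anything larger: '1111' followed by D in 32 bits (two's complement).
    return "1111" + format(d & 0xFFFFFFFF, "032b")


for d in (0, 1, -2, 100):
    print(d, encode_dod(d), len(encode_dod(d)))
```

Each range spans exactly 2^7, 2^9 or 2^12 values, which is why the asymmetric bounds fit the payload sizes without wasting a code.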

Value compression is achieved by comparing two consecutive values and finding the substring of bits that changed from one point to the next. Since the system looks for only one such substring, it can encode the offset of that substring from the beginning of the 64-bit value, followed by the substring itself. There is no need to store the prefix and suffix around the changed substring, as these bits can be taken from the previous value.
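To make the idea concrete, the sketch below XORs the 64-bit representations of two consecutive values (the paper uses XOR to find the changed bits) and reports where the changed substring sits. It only locates the window; the actual bit-level output format is left out, and the helper names are mine.

```python
import struct
from typing import Optional, Tuple


def float_bits(x: float) -> int:
    """Reinterpret a double as an unsigned 64-bit integer."""
    return struct.unpack(">Q", struct.pack(">d", x))[0]


def changed_window(prev: float, cur: float) -> Optional[Tuple[int, int, int]]:
    """Return (offset from the start, length, changed bits), or None if equal."""
    xor = float_bits(prev) ^ float_bits(cur)
    if xor == 0:
        return None  # identical value: nothing but a single flag bit to store
    leading = 64 - xor.bit_length()            # unchanged prefix length
    trailing = (xor & -xor).bit_length() - 1   # unchanged suffix length
    length = 64 - leading - trailing
    return leading, length, xor >> trailing


print(changed_window(12.0, 12.0))   # None
print(changed_window(12.0, 24.0))   # (11, 1, 1)
```

Going from 12.0 to 24.0 flips a single exponent bit, so only an offset of 11 and one meaningful bit need to be stored instead of a full 64-bit value.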

This compression scheme favors scenarios in which the value stays constant from one point to another, as a constant value can be represented with just one bit. This is extremely useful for Facebook monitoring, in which many of the values stay constant or show small changes that can be efficiently compressed. Combined with timestamp compression, Gorilla shows a remarkable reduction in the average size of a data point:

Compression ratio as a function of compression window size (picture from Facebook’s paper).

This performance, however, is not universal and will not carry over well to time series outside of the Facebook monitoring use case. In both timestamp and value compression, reading a data point requires reading its predecessor. This forces the compression to run in non-overlapping windows and reset itself after some time interval. Making the windows too large will require more compute resources to read a data point.

The timestamp compression suffers from a number of shortcomings. It operates at a one-second resolution, which allows it to ignore small millisecond-level variations and still encode the points with such variations as having no difference in the intervals. This works well when we record a maximum of 4 points in one minute; however, many applications, such as music, audio, or sensors, produce data at a much faster rate, requiring more precise timestamps on a millisecond or even nanosecond scale rather than a second scale. Requiring more precision from timestamps will undermine the benefits that can be achieved with timestamp compression. Additionally, if the intervals between the points in a time series are not constant or nearly constant and tend to change frequently, the efficiency of compression will also degrade dramatically.

The value compression cannot handle vector time series without becoming more complicated and requiring constant-size vectors. It also works best when the values do not change by much. Large changes in values may lead to no compression between two points at all.

Storage Model

Gorilla is a sharded system that assigns each server to handle a subset of the time series currently stored in the system. Since two shards share no data in common, growing the system to accommodate more time series is as easy as adding more servers and updating the shard mapping to make these servers available for new time series. Gorilla tolerates machine failures by writing the data to replicated network storage, although it does not attempt to make storage and memory consistent and may lose the information buffered for writing upon a node failure. Gorilla can handle more disastrous events as well, since it was designed to tolerate an entire region failure by streaming every data point to two different data centers. Similar to disk persistence, the system does not try to ensure consistency between the two regions. Such possible inconsistencies may lead to data loss, and dealing with incomplete data is left to the client.
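A two-level lookup is one simple way to picture this kind of sharding: a stable hash maps a series name to a shard, and a separate table maps shards to servers, so adding capacity only means editing the table. The sketch below is my own illustration under those assumptions; the shard count, server names and hashing choice are hypothetical and not taken from Gorilla.

```python
import hashlib
from typing import Dict, List

NUM_SHARDS = 8  # hypothetical; fixed up front so a series never changes shards


def shard_for(series_name: str, num_shards: int = NUM_SHARDS) -> int:
    """Map a time series name to a shard with a stable hash."""
    digest = hashlib.sha256(series_name.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


# Shard -> server assignment; initially two servers own four shards each.
shard_to_server: Dict[int, str] = {
    shard: f"server-{shard % 2}" for shard in range(NUM_SHARDS)
}


def server_for(series_name: str) -> str:
    return shard_to_server[shard_for(series_name)]


def add_server(new_server: str, shards_to_move: List[int]) -> None:
    """Growing the cluster only touches the shard map, not the hash."""
    for shard in shards_to_move:
        shard_to_server[shard] = new_server


name = "web042.cpu.utilization"  # hypothetical series name
before = server_for(name)
add_server("server-2", [shard_for(name)])
print(before, "->", server_for(name))
```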

Query Model

Gorilla’s query model appears to be very simplistic, as it only allows retrieving a time series given its unique name and a time range. The rest of the processing is left to the client systems. Such a retrieval approach is very fast, as it simply requires locating the server responsible for the time series, uncompressing it and returning it to the client. This approach is most likely tuned for specific monitoring needs at Facebook, and it provides very good latency as shown below:

Query latency (picture from Facebook’s paper).
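In code, a query interface of this shape boils down to a single call taking a name and a time range. The toy store below is my own stand-in (class and method names are hypothetical, and compression is ignored); it is only meant to show how little the server does and how much is left to the client.

```python
from bisect import bisect_left, bisect_right, insort
from typing import Dict, List, Tuple

Point = Tuple[int, float]  # (unix timestamp in seconds, value)


class SeriesStore:
    """Toy in-memory store: one sorted list of points per series name."""

    def __init__(self) -> None:
        self._series: Dict[str, List[Point]] = {}

    def append(self, name: str, ts: int, value: float) -> None:
        insort(self._series.setdefault(name, []), (ts, value))

    def get(self, name: str, start: int, end: int) -> List[Point]:
        """Return all points with start <= ts <= end, and nothing more."""
        points = self._series.get(name, [])
        lo = bisect_left(points, (start, float("-inf")))
        hi = bisect_right(points, (end, float("inf")))
        return points[lo:hi]


store = SeriesStore()
for i, cpu in enumerate([0.20, 0.25, 0.90, 0.30]):
    store.append("host1.cpu", 1000 + 15 * i, cpu)

window = store.get("host1.cpu", 1015, 1030)
print(window)                               # [(1015, 0.25), (1030, 0.9)]
# Any aggregation or anomaly detection happens on the client side.
print(max(value for _, value in window))    # 0.9
```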

In my master’s thesis I explored more complicated querying patterns for time series. In particular, one common query pattern is similarity search across many time series or across different parts of the same time series. Many approaches exist to answer k Nearest Neighbors (k-NN) queries that search for the k fragments most similar to the input query. Some of these approaches, such as Dynamic Time Warping, are very difficult to index and are not suitable for database applications, but there are methods that can be indexed and used in a database. I adapted and modified the R*-tree index to be stored in the HBase database along with the actual time series data, and as such my system prototype was able to perform k-NN queries (with Euclidean distance similarity) on a disk-backed system. Of course there were a number of limitations, such as bad scalability when searching for large patterns without the use of dimensionality reduction, and overall high latency of searches due to relying on HBase for the index. However, in-memory approaches can have fast indexes and can provide more sophisticated querying patterns to answer similarity or anomaly detection queries. Finding similar subsequences with Gorilla will require fetching the time series we are interested in and exhaustively searching for patterns in them.
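Without an index, that exhaustive search on the client looks roughly like the sketch below: slide the query over the fetched series, compute the Euclidean distance at every offset, and keep the k best. This is a brute-force illustration with made-up data, not the R*-tree approach from my thesis.

```python
import heapq
import math
from typing import List, Sequence, Tuple


def euclidean(a: Sequence[float], b: Sequence[float]) -> float:
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def knn_subsequences(
    series: Sequence[float], query: Sequence[float], k: int
) -> List[Tuple[float, int]]:
    """Return the k (distance, start offset) pairs closest to the query."""
    m = len(query)
    candidates = (
        (euclidean(series[i:i + m], query), i)
        for i in range(len(series) - m + 1)
    )
    return heapq.nsmallest(k, candidates)


series = [0, 1, 2, 3, 2, 1, 0, 1, 2, 3]   # hypothetical fetched time series
query = [1, 2, 3]
print(knn_subsequences(series, query, 2))  # [(0.0, 1), (0.0, 7)]
```

The cost grows with the length of the series times the length of the query for every search, which is what an index is supposed to avoid.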

There are many motivations for k-NN search in time series, ranging from medical to engineering to entertainment. For example, a system records a person’s ECG data and performs a search on new patterns it receives. If it finds a pattern similar to some known cases that led to heart failure, the system can notify the doctor before the problem develops further. And of course there are other types of interesting queries that can be done on time series. As such, I strongly believe that Gorilla’s query model may be inadequate for some uses.

A Few Concluding Words

Facebook’s Gorilla is a fascinating piece of engineering: it achieves very good compression of time series data, high scalability and fast retrieval time. However, I am not sure one can call it a time series database, as all of its achievements result from making the system more and more specialized for a specific application: monitoring server/system parameters at Facebook. Its high compression is the result of low time resolution and low update frequency in the time series used for monitoring. Its query model is designed for fast retrieval, but it is very simplistic and offloads the time series processing, pattern matching and anomaly detection to the client applications. Gorilla’s data is also very specialized: it is single-dimensional, with infrequent updates, small changes to the value from one point to another, and a constant update rate. Such data can hardly be thought of as a good representation of all other time series data. All of the limitations and compromises made to achieve Facebook’s requirements for a specific use case make me think that Gorilla is not a TSDB by any means. It is rather just a cache for infrequently changing monitoring data.

The Light of Voldemort

A few months ago I showcased how a single server of the Voldemort key-value store sounds. Sonification is a valid way to monitor systems, and has been used a lot in real applications. The Geiger counter would be one of the most well-known examples of a sonified application. In some cases sonification may be the preferred form of representing information, as other forms surprisingly may not work as well for human perception. Even visualization of information is often not as good as sonification. Take the same Geiger counter example: research has shown that visual radiation level monitors do not perform quite as well as the sound ones, as people tend to be distracted from the display to perform other tasks. Even visual and audio hybrids do not alert users to high radiation levels as well as a simple audio counter.

As an example of a hybrid audio-visual system for Voldemort, I have built a small “traffic-light monitor” that changes colors and beeps differently depending on what action is performed by the server. The rig is built with Arduino and simply plugs into a USB port of the machine running the Voldemort server. Below is a short video of how it operates:

The green light lights up when the server handles a “get” operation. The yellow light is for “put” requests and the red light is for “get version” commands. As can be seen, writing to Voldemort requires two operations, “get version” and “put”, and they happen so quickly that the Arduino is barely able to light up the LEDs.

P.S. “Traffic light monitor” is not to be taken seriously, it is rather a silly example to show that there are plenty of ways to monitor a system or represent system’s logs.

Pivot Tracing Part 2

After looking more at the Pivot Tracing tool described in my earlier post, I asked myself about the limitations of such a monitoring approach. Pivot Tracing is not a universal tool, so it appears that there are a few problems it does not address well enough.

The basic idea of Pivot Tracing is to collect information about the request as the request propagates through the system. The image below shows a partial illustration of request propagation along with information collection at pivot points.

Figure 1. Request flow

As the request passes through a pivot point in system A, we can collect some parameters, xA and yA, and use the baggage mechanism to send these parameters further along with the request. Once the request reaches the next pivot point, say in system B, we can also collect some information zB on that system. Note that system B does not have access to xA and yA directly without the Pivot Tracing tool, but thanks to the baggage mechanism, we have these parameters available at the pivot point in system B.

Why is it important? It is fairly boring when we look at only one request trace; however, when we look at all the requests happening in the system over some time interval, things start to get a lot more exciting.

Figure 2. Multiple requests

We can aggregate the data over all the requests to report system-wide statistics to the user, and parameters propagating from one system to another with the baggage allow for much more complex aggregations. Having xA and yA at system B enables us to aggregate parameters in B, such as zB, over different values of xA or yA. For example, I can now compute the sum of zB for all requests going through system B for different values of xA. This would not have been possible without having information propagate with the baggage from one pivot point to another. Another scenario would be aggregating variable z across both systems B and C for different values of parameters coming from A.
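The "sum of zB for different values of xA" example can be mimicked with a plain dictionary playing the role of the baggage. This is a toy model of the idea rather than Pivot Tracing's API: the function names are hypothetical and the request values are made up.

```python
from collections import defaultdict
from typing import Dict, List

Baggage = Dict[str, int]


def pivot_point_a(baggage: Baggage, x_a: int, y_a: int) -> None:
    # System A packs its local parameters into the request's baggage.
    baggage["xA"] = x_a
    baggage["yA"] = y_a


def pivot_point_b(baggage: Baggage, z_b: int, emitted: List[Dict[str, int]]) -> None:
    # System B sees xA and yA only because they travelled in the baggage.
    emitted.append({**baggage, "zB": z_b})


def sum_zb_by_xa(emitted: List[Dict[str, int]]) -> Dict[int, int]:
    totals: Dict[int, int] = defaultdict(int)
    for row in emitted:
        totals[row["xA"]] += row["zB"]
    return dict(totals)


emitted: List[Dict[str, int]] = []
for x_a, y_a, z_b in [(1, 10, 5), (1, 20, 7), (2, 10, 3)]:  # three requests
    baggage: Baggage = {}
    pivot_point_a(baggage, x_a, y_a)
    pivot_point_b(baggage, z_b, emitted)

print(sum_zb_by_xa(emitted))   # {1: 12, 2: 3}
```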

Aggregation of requests is extremely important, as it enables system-wide monitoring of the distributed application instead of looking at individual request traces. However, is this aggregation correct? Does it have errors? And what can cause the errors? Looking back at Figure 2, we see many requests executing in parallel. These requests are causally independent, so how does the system know that they indeed happened between T0 and T2? Time skew between servers can impact the accuracy of reporting, especially if some requests run on disjoint sets of servers (they do not share any common servers). Is this a big problem for Pivot Tracing? I think in most cases it is OK, as long as time skew is kept within reasonable bounds. If the monitoring runs continuously over some period of time, missing some requests in one window will only make them counted in another time window.

Pivot Tracing cannot answer all kinds of queries. In the example above, we were aggregating the requests over some time period, but what if we want to know something about the system at an exact instant? What if the user wants to learn something about the system at time T1 (Figure 2)? This is not possible with the Pivot Tracing tool. For one, we cannot even be sure that T1 is the same time for all the requests due to the time skew. Second, even if we could guarantee exact time synchronization, there is no guarantee that all requests will be at the correct pivot point to collect such information at T1. In other words, Pivot Tracing cannot provide a user with consistent global information about the system at any exact point in time.

Instantaneous information may be useful for debugging and monitoring systems. For example, recently I had a need to find out how many nodes perform BDB JE log compaction in my Voldemort cluster at the same time. The compaction is not triggered directly by the requests, instead a separate local thread periodically checks if compaction is needed. As such, with a Pivot Tracing style tool, it would have been impossible for me to even instrument the Pivot Points as no request actually goes and triggers the compaction. In addition, the time skew would not have allowed me to know for sure whether the compaction was running at all nodes concurrently or it simply appears so from the misaligned time. What I need is a consistent global snapshot of parameters in my Voldemort cluster…

Review – Pivot Tracing: Dynamic Causal Monitoring for Distributed Systems

Debugging can be a nightmare for software engineers, and it is even more so in distributed systems that span many machines in potentially more than one datacenter. Unfortunately, many of the debugging and monitoring techniques for such large systems do not differ much from the methods used to debug and monitor simple single-machine software. Logs are still one of the most common ways to gain insight into the operation of the software, and these logs are typically produced by each machine independently, making it next to impossible to find causal relationships between events happening on different servers. In addition, logs must be installed in advance at development time, and altering the information collected after the system deployment can be problematic and will require additional developer time.

Pivot Tracing tries to address these issues. Pivot Tracing allows engineers to dynamically alter what information is being collected without having to stop the system being monitored. It also introduces a happened-before join operation that allows engineers to correlate events based on their happened-before relations to each other. Despite the ability to dynamically reconfigure the system to collect different information, it still requires expert knowledge of the distributed environment being monitored. Before the system can be used, engineers need to define tracepoints, or places in the code of the underlying system where monitoring and logging instrumentation can be dynamically installed by Pivot Tracing. Engineers also need to define (1) what parameters can be extracted and logged by the system. However, defining the parameters the system can collect is not limited to the pre-launch or configuration stage; these log parameters can be modified at any time during the life-cycle of the system Pivot Tracing monitors.


Pivot Tracing users use a high-level query language to request the monitoring/debugging information they need. The query is compiled into an intermediate representation called advice. Different parameters can be collected at various tracepoints, so the advice carries the instructions to each relevant tracepoint regarding what instrumentation or monitoring needs to be installed there and what information is to be collected and propagated in the system. The data is collected along the execution path of the system: as execution passes through a relevant tracepoint (4), some parameters are collected and sent down the execution path in a baggage (5). In addition, tracepoints (4) can emit tuples (6) that are sent to the Pivot Tracing front end (7) and become available to the user (8).

A happened-before join, denoted by “->”, is a very powerful tool within Pivot Tracing for capturing causality of events. Let’s look at the following query example:


This query sets the anticipated execution path of the request. First, a request needs to pass through a ClientProtocols tracepoint, and then through the tracepoint at incrBytesRead. In the example above, we are only interested in the events that go from ClientProtocols to incrBytesRead, and any other execution path will not match this query. Since the query runs in parts along the execution path of the request, Pivot Tracing can easily capture the happened-before relationship between events by following the request within the system. Advice compiled from the query is capable of evaluating messages coming in the baggage from prior tracepoints to process the happened-before joins. If a tracepoint appears earlier in the execution path, then the events at that tracepoint happen before the events at the later tracepoint.

But what about non-linear execution paths? What if we have segments of code that execute in parallel? The paper does not talk about this in great detail, but Pivot Tracing should still work well. For instance, if two threads run in parallel and do not communicate with each other, then the events in these two threads are concurrent. However, once the two threads start to communicate, the baggage from earlier tracepoints will be transmitted along these communication channels, allowing Pivot Tracing to carry out happened-before joins as usual. Consider the following example the authors provide:


Query A -> B produces a1b2 and a2b2 results, however there is no a1b1, because at the time b1 was running, it had no baggage from the thread running a1, so both a1 and b1 are concurrent.
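The join behavior can be imitated with a tiny simulation in which each thread carries its own baggage and communication merges it. The execution below is a hypothetical one that I constructed to produce the same result as the example; it is not a reproduction of the paper's figure, and the `Branch` class is my own.

```python
from typing import List, Tuple


class Branch:
    """One thread of execution carrying its own baggage of A-tuples."""

    def __init__(self) -> None:
        self.baggage: List[str] = []

    def tracepoint_a(self, label: str) -> None:
        # Advice at A packs a tuple into the baggage.
        self.baggage.append(label)

    def tracepoint_b(self, label: str) -> List[Tuple[str, str]]:
        # Advice at B joins against every A-tuple that causally precedes it.
        return [(a, label) for a in self.baggage]

    def receive_from(self, sender: "Branch") -> None:
        # Communication merges the sender's baggage into the receiver's.
        for a in sender.baggage:
            if a not in self.baggage:
                self.baggage.append(a)


t1, t2 = Branch(), Branch()
results: List[Tuple[str, str]] = []

t1.tracepoint_a("a1")
results += t2.tracepoint_b("b1")   # t2 has heard nothing from t1: no join
t2.receive_from(t1)                # the threads communicate
t2.tracepoint_a("a2")
results += t2.tracepoint_b("b2")

print(results)   # [('a1', 'b2'), ('a2', 'b2')]
```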

Pivot Tracing has been implemented in Java and tested against the Hadoop software stack. The authors claim to have found a bug in HDFS by using Pivot Tracing; however, by the time the bug was found with Pivot Tracing, it had already been reported by others. Nevertheless, it is impressive that the system was able to help find the problem by just executing a few small queries.

The overhead of Pivot Tracing is fairly small, and mainly consists of packing, unpacking and transmitting tuples from one tracepoint to another. As such, when no monitoring is required, the system can be left enabled with no queries running, resulting in negligible overhead (PivotTracing Enabled row in the table below).


Under a stress test on the HDFS stack, the overhead reached almost 16% for certain operations. It is important to understand that some queries may result in bigger baggage being transmitted and more tuples being packed into and unpacked from the baggage; thus, I think, it is a good idea to test and optimize queries in a staging environment before running them on the production system. This, however, defeats one of the bigger advantages of Pivot Tracing: its ability to dynamically adjust to different monitoring scenarios.

The authors do not talk much about scalability to systems with a larger number of nodes or systems with various levels of communication between different nodes. It would also be interesting to see how big of a penalty a WAN deployment would incur. After all, the main overhead of Pivot Tracing is baggage propagation, and having to piggyback all that additional data on messages along the communication paths between the nodes can have a severe negative effect on systems that are able to saturate their available bandwidth.

Despite the Pivot Tracing authors advocating against traditional logs for debugging, their system is still fundamentally a logging system, albeit a lot more sophisticated. Users can use Pivot Tracing to log only the information they need along with some causal relationships between these log pieces. Despite this, I believe there are still cases when a traditional logging approach can be of more use than Pivot Tracing, namely debugging rare and subtle bugs that happen only under a certain set of conditions. With Pivot Tracing, users can install instrumentation after such a rare bug has occurred, but there is no guarantee that it will happen again anytime soon, yet the overall system pays the overhead of the monitoring in the meantime. In this context, traditional logs can provide more immediate benefit, as they allow engineers to look back in time at the system execution.

With the presence of back-in-time snapshot capability, we can revert back to the past states of the system and replay back the changes along with newly installed instrumentation for monitoring, but overheads of this may be enormous for a large scale distributed system. Is there a way around this? Can we look back in time and identify the bugs, data corruption or performance issues without paying a significant performance price?