Playing Around

Modeling Paxos Performance in Wide Area – Part 3

Earlier I looked at modeling paxos performance in local networks, however nowadays people (companies) use paxos and its flavors in the wide area as well. Take Google Spanner and CockroachDB as an example. I was naturally curious to expand my performance model into wide area networks as well. Since our lab worked on WAN coordination for quite some time, I knew what to expect from it, but nevertheless I got a few small surprises along the way.

In this post I will look at Paxos over WAN, EPaxos and our wPaxos protocols. I am going to skip most of the explanation of how I arrived to the models, since the models I used are very similar in spirit to the one I created for looking at local area performance. They all rely on queuing theory approximations for processing overheads and k-order statistics for impact of quorum size.

Despite being similar in methods used, modeling protocols designed for WAN operation proved to be more difficult than local area models. This difficulty arises mainly from the myriad of additional parameters I need to account for. For instance, for Paxos in WAN I need to look at latencies between each node in the cluster, since the WAN-networks are not really uniform in inter-region latencies. Going up to EPaxos, I have multiple leaders to model, which means I also must take into consideration the processing overheads each node takes in its role of following other nodes for some slots. wPaxos takes this even further: to model its performance I need to consider access locality and “object stealing” among other things.

Today I will focus only on 5 region models. In particular, I obtained average latencies between 5 AWS regions: Japan (JP), California (CA), Oregon (OR), Virginia (VA) and Ireland (IR). I show these regions and the latencies between them in Figure 1 below.

Some AWS regions and latencies in ms between them.
Figure 1: Some AWS regions and latencies in ms between them.

Paxos in WAN

Converting paxos model from LAN to WAN is rather straightforward; all I need to do is to modify my paxos model to take non-uniform distances between nodes. I also need the ability to set which node is going to be the leader for my multi-paxos rounds. With these small changes, I can play around with paxos and see how WAN affects it.

Paxos in WAN with leader in different regions.
Figure 2: Paxos in WAN with leader in different regions.

Figure 2 (above) shows a model run for 5 nodes in 5 regions (1 node per region). From my previous post, I knew that the maximum throughput of the system does not depend on network latency, so it is reasonable for paxos in WAN to be similar to paxos in local networks in this regard. However, I was a bit surprised to see how flat the latency stays in WAN deployment almost all the way till reaching the saturation point. This makes perfect sense, however, since the WAN RTT dominates the latency and small latency increases due to the queuing costs are largely masked by large network latency. This also may explain why Spanner, CockroachDB and others use paxos in databases; having predictable performance throughout the entire range of load conditions makes it desirable for delivering stable performance to clients and easier for load-balancing efforts.

However, not everything is so peachy here. Geographical placement of the leader node plays a crucial role in determining the latency of the paxos cluster. If the leader node is too far from the majority quorum nodes, it will incur high latency penalty. We see this with Japan and Ireland regions, as they appear far from all other nodes in the system and result in very high operation latency.

EPaxos

EPaxos protocol tries to address a few shortcoming in paxos. In particular, EPaxos no longer has a single leader node and any node can lead some commands. If commands are independent, then EPaxos can commit them quickly in one phase using a fast quorum. However, if the command have dependencies, EPaxos needs to run another phase on a majority quorum (at which point it pretty much becomes Paxos with two phases for leader election and operation commit). The fast quorum in some cases may be larger than the majority quorum, but in the 5-node model I describe today, the fast quorum is the same as the majority quorum (3 nodes).

Naturally, conflict between commands will impact the performance greatly: with no conflict, all operations can be decided in one phase, while with 100% conflict, all operations need two phases. Since running two phases requires more messages, I had to change the model to factor in the probability of running two phases. Additionally, the model now looks at the performance of every node separately, and account for the node leading some slots and following on the other.

EPaxos throughput at every node with 2% command conflict.
Figure 3: EPaxos throughput at every node with 2% command conflict.
EPaxos throughput at every node with 50% command conflict.
Figure 4: EPaxos throughput at every node with 50% command conflict.

Figures 3 and 4 (above) show EPaxos performance at every node for 2% and 50% conflict. Note that the aggregate throughput of the cluster is a sum of all 5 nodes. For 2% conflict, the max throughput was 2.7 times larger than that of Paxos. As the conflict between commands increases, EPaxos loses its capacity and its maximum throughput decreases, as I illustrate in Figure 5 (below). This changing capacity may make more difficult to use EPaxos in production environments. After all, workload characteristics may fluctuate throughout the system’s lifespan and EPaxos cluster may or may not withstand the workloads of identical intensity (same number of requests/sec), but different conflict.

EPaxos aggregate throughput for different conflict levels.
Figure 5: EPaxos aggregate maximum throughput for different conflict levels.

wPaxos

wPaxos is our recent flavor of WAN-optimized paxos. Its main premise is to separate the commands for different entities (objects) to different leaders and process these commands geographically close to where the entities are required by clients. Unlike most Paxos flavors, wPaxos needs large cluster, however, thanks to flexoble quorums, each operations only uses a subset of nodes in the cluster. This allows us to achieve both multi-leader capability and low average latency.

wPaxos, however, has lots of configurable parameters that all affect the performance. For instance, the fault tolerance may be reduced to the point where a system does not tolerate a region failure, but can still tolerate failure of nodes within the region. In this scenario (Figure 6, below), wPaxos can achieve the best performance with aggregate throughput across all regions (and 3 nodes per region for a total of 15 nodes) of 153,000 requests per second.

wPaxos with no region fault tolerance and 70% region-local operations and 1% chance of object migration.
Figure 6: wPaxos with no region fault tolerance and 70% region-local operations and 1% chance of object migration.

We still observe big differences in latencies due to the geography, as some requests originating in one regions must go through stealing phase or be resolved in another region. However, the average latency for a request is smaller than that of EPaxos or Paxos. Of course, a direct comparison between wPaxos and EPaxos is difficult, as wPaxos (at least in this model configuration) is not as fault tolerant as EPaxos. Also unlike my FPaxos model from last time, wPaxos model also reduces the communication in phase-2 to a phase-2 quorum only. This allows it to take much bigger advantage of flexible quorums than “talk-to-all-nodes” approach. As a result, having more nodes helps wPaxos provide higher throughput than EPaxos.
Some EPaxos problems still show-up in wPaxos. For instance, as the access locality decreases and rate of object migration grows, the maximum throughput a cluster can provide decreases. For instance, Figure 7 (below) shows wPaxos model with locality shrunken to 50% and object migration expanded to 3% of all requests.

wPaxos with no region fault tolerance and 50% region-local operations and 3% chance of object migration.
Figure 7: wPaxos with no region fault tolerance and 50% region-local operations and 3% chance of object migration.

How Good Are the Models?

I was striving to achieve the best model accuracy without going overboard with trying to account all possible variables in the model. The models both for LAN and WAN seems to agree fairly well with the results we observe in our Paxi framework for studying various flavors of consensus.

However, there is always room for improvement, as more parameters can be accounted for to make more accurate models. For instance, WAN RTTs do not really follow a single normal distribution, as a packet can take one of many routes from one region to another (Figure 8, below). This may make real performance fluctuate and “jitter” more compared to a rather idealistic model.

WAN Latency between VA and OR.
Figure 8: WAN Latency between VA and OR.

I did not account for some processing overheads as well. In EPaxos, a node must figure out the dependency graph for each request, and for high-conflict workloads these graphs may get large requiring more processing power. My model is simple and assumes this overhead to be negligible.

Few Concluding Remarks

Over the series of paxos performance modeling posts I looked at various algorithms and parameters that affect their performance. I think it truly helped me understand Paxos a bit better than before doing this work. I showed that network fluctuations have little impact on paxos performance (k-order statistics helps figure this one out). I showed how node’s processing capacity limits the performance (I know this is trivial and obvious), but what is obvious, but still a bit interesting about this is that a paxos node processes roughly half of the messages that do not make a difference anymore. Once the majority quorum is reached, all other messages for a round carry a dead processing weight on the system.

The stability of Paxos compared to other more complicated flavors of paxos (EPaxos, wPaxos) also seems interesting and probably explains why production-grade systems use paxos a lot. Despite having lesser capacity, paxos is very stable, as its latency changes little at levels of throughput. Additionally, The maximum throughput of paxos is not affected by the workload characteristics, such as conflict or locality. This predictability is important for production systems that must plan and allocate resources. It is simply easier to plan for a system delivering stable performance regardless of the workload characteristics.

Geography plays a big role in WAN paxos performance. Despite the cluster having the same maximum throughput, the clients will observe the performance very differently depending on the leader region. Same goes with EPaxos and wPaxos, as different regions have different costs associated with communicating to the quorums, meaning that clients in one region may observe very different latency than their peers in some other regions. I think this may make it more difficult to provide same strong guarantees (SLAs?) regarding the latency of operations to all clients in production systems.

There are still many things one can study with the models, but I will let it be for now. Anyone who is interested in playing around may get the models on GitHub.

 

Modeling Paxos Performance – Part 2

In the previous posts I started to explore node-scalability of paxos-style protocols. In this post I will look at processing overheads that I estimate with the help of a queue or a processing pipeline. I show how these overheads cap the performance and affect the latency at different cluster loads.

I look at the scalability for a few reasons. For one, in the age of a cloud 3 or 5 nodes cluster may not be enough to provide good resilience, especially in environments with limited control over the node placement. After all, a good cluster needs to avoid nodes that share common points of failures, such as switches of power supply. Second, I think it helps me learn more about paxos and its flavors and why certain applications chose to use it.  And third, I want to look at more exotic paxos variants and how their performance may be impacted by different factors, such as WAN or flexible quorums. For instance, flexible quorums present the opportunity to make trade-offs between performance and resilience. We do this by adjusting the sizes of quorums for phase-1 and phase-2. This is where the modeling becomes handy, as we can check if a particular quorum or deployment makes a difference from the performance standpoint.

Last time, I looked at how local network variations affect the performance when scaling the cluster up in the number of servers. What I realized is that the fluctuations in message round-trip-time (RTT) can only explain roughly 3% performance degradation going from 3 nodes to 5, compared to 30-35% degradation in our implementation of paxos. We also see that this degradation depends on the quorum size, and for some majority quorum deployments there may even be no difference due to the network. In this post I improve the model further to account for processing bottlenecks.

As a refresher from the previous time, I list some of the parameters and variables I have been using:

  • \(l\) – some local message in a round
  • \(r_l\) – message RTT in local area network
  • \(\mu_l\) – average message RTT in local area network
  • \(\sigma_l\) – standard deviation of message RTT in local area network
  • \(N\) – number of nodes participating in a paxos phase
  • \(q\) – Quorum size. For a majority quorum \(q=\left \lfloor{\frac{N}{2}}\right \rfloor  +1\)
  • \(m_s\) – time to serialize a message
  • \(m_d\) – time to deserialize and process a message
  • \(\mu_{ms}\) – average serialization time for a single message
  • \(\mu_{md}\) – average message deserialization time
  • \(\sigma_{ms}\) – standard deviation of message serialization time
  • \(\sigma_{md}\) – standard deviation of message deserialization time

The round latency \(L_r\) of was estimated by \(L_r = m_s + r_{lq-1} + m_d\), where \(r_{lq-1}\) is the RTT + replica processing time for the \(q-1\)th fastest messages \(l_{q-1}\)

Message Processing Queue

Most performance difference in the above model comes from the network performance fluctuations, given that \(m_s\), \(m_d\) and their variances are small compared to network latency. However, handling each message creates significant overheads at the nodes that I did not account for earlier. I visualize the message processing as a queue or a pipeline; if enough compute resources are available, then the message can process immediately, otherwise it has to wait until earlier messages are through and the resources become available. I say that the pipeline is clogged when the messages cannot start processing instantaneously.

The round leader is more prone to clogging, since it needs to process \(N-1\) replies coming roughly at the same time for each round. For the model purposes, I consider queuing/pipeline costs only at the leader. The pipeline is shared for incoming and outgoing messages.

Lets consider a common FIFO pipeline handling messages from all concurrent rounds and clients. When a message \(l_i\) enters the pipeline at some time \(t_{ei}\), it can either process immediately if the pipeline is empty or experience some delay while waiting for the its turn to process.

In the case of empty pipeline, the message exits the queue at time \(t_{fi} = t_{ei} + o\), where \(o\) is message processing overhead \(m_s\) or \(m_d\) depending on whether the message is outgoing or incoming. However, if there is a message in the queue already, then the processing of \(l_i\) will stall or clog for some queue waiting time \(w_i\), thus it will exit the pipeline at time \(t_{fi} = t_{ei} + w_i + o\). To compute \(w_i\) we need to know when message \(l_{i-1}\) is going to leave the queue: \(w_i = t_{fi-1} – t_{ei}\). In its turn, the exit time \(t_{fi-1}\) depends of \(w_{i-1}\), and so we need to compute it first. We can continue to “unroll” the pipeline until we have a message \(l_n\) without any queue waiting time (\(w_{i-n} = 0\)). We can compute the dequeue time for that message \(l_n\), which in turns allows us to compute exit time of all following messages. Figure 1 shows different ways a pipeline can get clogged, along with the effects of clog accumulating over time.

States of a pipeline
Figure 1. States of a pipeline

Unlike earlier, today I also want to model the overheads of communicating with the clients, since in practice we tend to measure the performance as observed by the clients. This requires the round model to account for client communication latency \(r_c\) which is one network RTT. Each round also adds a single message deserialization (client’s request) and a message serialization (reply to a client) to the queue.

Let me summarize the parameters and variables we need to model the queuing costs:

  • \(r_c\) – RTT time to communicate with the client
  • \(n_p\) – the number of parallel queues/pipelines. You can roughly think of this as number of cores you wish to give the node.
  • \(s_p\) – pipeline’s service rate (messages per unit of time). \(s_p = \frac{N+2}{N\mu_{md} + 2 \mu_s}\)
  • \(w_i\) – pipeline waiting time for message \(l_i\)
  • \(R\) – throughput in rounds per unit of time.
  • \(\mu_{r}\) – mean delay between rounds. \(\mu_{r} = \frac{1}{R}\)
  • \(\sigma_{r}\) – standard deviation of inter-round delay.

Now lets talk about some these parameters a bit more and how they relate to the model.

Pipeline service rate \(s_p\) tells how fast a pipeline can process messages. We can get this metric by looking at average latencies of message serialization \(\mu_{ms}\) and deserialization/processing \(\mu_{md}\). With \(N\) nodes in the cluster, we can find an average message overhead of the round \(\mu_{msg}\). For a given round, the leader node needs to handle 2 message serializations (one to start the round and one to reply back to client and \(N\) deserializations (\(N-1\) from followers and one from the client). This communication pattern gives us \(\mu_{msg} = \frac{N\mu_{md}+2\mu_{ms}}{N+2}\). A reciprocal of \(\mu_{msg}\) gives us how many messages can be handled by the pipeline per some unit of time: \(s_p = \frac{N+2}{N\mu_{md} + 2\mu_s}\).

Variable \(w_i\) tells how backed up the pipeline is at the time of message \(l_i\). For instance, \(w_i = 0.002 s\) means that a message \(l_i\) can start processing only after 0.002 seconds delay. Figure 2 illustrates the round execution model with queue wait overheads.

Paxos round and its overheads.
Figure 2. Paxos round and its overheads.

To properly simulate multi-paxos, I need to look at multiple rounds. Variable \(R\) defines the throughput I try to push through the cluster, as higher throughput is likely to lead to longer queue wait times. I also need to take into consideration how rounds are distributed in time. On one side of the spectrum, we can perform bursty rounds, where all \(R\) rounds start at roughly the same time. This will give us the worst round latency, as the pipelines will likely clog more. On the other side, the rounds can be evenly dispersed in time, greatly reducing the competition for pipeline between messages of different rounds. This approach will lead to the best round latency. I have illustrated both of these extremes in round distribution in Figure 3.

Spacing between rounds.
Figure 3. Spacing between rounds.

However, the maximum throughput \(R_{max}\) is the same no matter how rounds are spread out, and it is governed only by when the the node reaches the pipeline saturation point: \(R_{max}(N+2) = n_ps_p\) or \(R_{max}(N+2) = \frac{n_p(N+2)}{N\mu_{md} + 2\mu_{ms}}\). As such, \(R_{max} = \frac{n_p}{N\mu_{md} + 2\mu_{ms}}\). In the actual model simulation, the latency is likely to spike up a bit before this theoretical max throughput point, as pipeline gets very congested and keeps delaying messages more and more.

The likely round distribution is probably something more random as different clients interact with the protocol independently of each other, making such perfect round synchronization impossible. For the simulation, I am taking the uniform separation approach and add some variability to it by drawing the round separation times from a normal distribution \(\mathcal{N}(\mu_r, \sigma_r^2)\). This solution may not be perfect, but normal distribution tend to do fine in modeling many natural random phenomena. I can also control how much different rounds can affect each other by changing the variance \(\sigma_r^2\). When \(\sigma_r\) is close to 0, this becomes similar to uniformly spaced rounds, while large values of \(\sigma_r\) create more “chaos” and conflict between rounds by spreading them more random.

Now I will put all the pieces together. To model the round latency \(L_r\), I modify the old formula to include the queuing costs and client communication delays. Since the round latency is driven by the time it takes to process message \(l_{q-1}\), I only concern myself with the queue waiting time \(c_{q-1}\) for the quorum message. As such, the new formula for round latency is \(L_r = (m_s + r_{lq-1} + c_{q-1} + m_d) + (m_{cd} + m_{cs} + r_c)\). In this formula, \(m_{cd}\) is deserialization overhead for the client request, and \(m_{cs}\) is the serialization overhead for server’s reply back to client.

Simulation Results

As before, I have a python script that puts the pieces together and simulates multi-paxos runs. There are quite a few parameters to consider in the model, so I will show just a few, but you can grab the code and tinker with it to see how it will behave with different settings. Figure 4 shows the simulation with my default parameters: network settings taken from AWS measurements, pipeline performance taken from the early paxi implementation (now it is much faster). Only one pipeline/queue is used. The distribution of rounds in time is controlled by inter-round spacing \(\mu_r = \frac{1}{R}\) with \(\sigma_{r} = 2\mu{r}\).

Latency as a function of throughput for different cluster sizes (Simulation)
Figure 4. Latency as a function of throughput for different cluster sizes (Simulation)

Next figure (Figure 5) shows how latency changes for inter-round delay variances. The runs with higher standard deviation \(\sigma_r\) appear more “curvy”, while the runs with more uniform delay do not seem to degrade as quick until almost reaching the saturation point. High \(\sigma_r\) runs represent more random, uncoordinated interaction with the cluster, which on my opinion is a better representation of what happens in the real world.

Latency as a function of throughput in 5 node cluster for different round spread parameters.
Figure 5. Latency as a function of throughput in 5 node cluster for different inter-round delay parameters.

Do I Need to Simulate Paxos Rounds?

The results above simulate many individual rounds by filling the pipeline with messages and computing the queue wait time for each round. Averaging the latencies across all simulated rounds produces the average latency for some given throughput. However, if I can compute the average queue waiting time and the average latency for the quorum message, then I no longer need to simulate individual rounds to essentially obtain these parameters. This will allow me to find the average round latency much quicker without having to repeat round formula computations over and over again.

Let’s start with computing average latency for a quorum message \(r_{lq-1}\). Since that \(l_{q-1}\) represents the last message needed to make up the quorum, I can model this message’s latency with some \(k\)th-order statistics sampled from Normal distribution \(\mathcal{N}(\mu_l+\mu_{ms}+\mu_{md}, \sigma_l^2 + \sigma_{ms}^2 + \sigma_{md}^2)\) on a sample of size \(N-1\), where \(k=q-1\). To make things simple, I use Monte Carlo method to approximate this number \(r_{lq-1}\) fairly quickly and accurately.

Now to approximating the queue wait time \(w_{q-1}\). This is a bit more involved, but luckily queuing theory provides some easy ways to compute/estimate various parameters for simple queues. I used Marchal’s average waiting time approximation for single queue with generally distributed inter-arrival and service intervals (G/G/1). This approximation allows me to incorporate the inter-round interval and variance from my simulation into the queuing theory model computation.

I will spare the explanation on arriving with the formula for the average round queue wait time (it is pretty straightforward adaptation from here, with service and arrival rates expressed as rounds per second) and just give you the result for a single queue and single worker:

  • \(p = R(N\mu_{md} + 2\mu_{ms})\), where \(p\) is queue utilization or probability queue is not busy.
  • \(C_s^2 = \frac{N^2\sigma_{md}^2 + 2^2\sigma_{ms}^2}{(N\mu_{md} + 2\mu_{ms})^2}\)
  • \(C_a^2 = \frac{sigma_r^2}{\mu_r^2}\)
  • \(w=\frac{p^2(1+C_s^2)(C_a^2+C_s^2p^2)}{2R(1-p)(1+C_s^2p^2)}\)

With the ability to compute average queue waiting time and average time for message \(l_{q-1}\) turn around, I can compute the average round latency time for a given throughput quickly without having to simulate multiple rounds to get the average for these parameters. \(L = 2\mu_{ms} + 2\mu_{md} + r_{lq-1} + w + \mu_l\), where \(r_{lq-1}\) is the mean RTT for quorum message \(l_{q-1}\) and \(w\) is the average queue wait time for given throughput parameters and \(\mu_l\) is the network RTT for a message exchange with the client.

As such, the average round latency becomes:

\(L = 2\mu_{ms} + 2\mu_{md} + r_{lq-1} + \frac{p^2(1+C_s^2)(C_a^2+C_s^2p^2)}{2R(1-p)(1+C_s^2p^2)} + \mu_l\)

Figure 6 shows the model’s results for latency at various throughputs. The queuing theory model exhibits very similar patterns as the simulation, albeit the simulation seems to degrade quicker at higher throughputs then the model, especially for 3-node cluster. This may due to the fact that the simulation captures the message distribution within each round, while the model looks at the round as one whole.

Model for latency as a function of throughput for different cluster sizes.
Figure 6. Model for latency as a function of throughput for different cluster sizes.

Flexible Quorums

I can use both the simulation and the model to show the difference between paxos and flexible paxos (FPaxos) by adjusting the quorums. For instance, I modeled a 9-node deployment of flexible paxos with phase-2 quorum \(q2\) of 3 nodes. In my setup, flexible paxos must still communicate with all 9 nodes, but it needs to wait for only 2 replies, thus it can finish the phase quicker then the majority quorum. However, as seen in Figure 7, the advantage of smaller quorum is tiny compared to normal majority quorum of 9-node paxos. Despite FPaxos requiring the same number of messages as 5-node paxos setup, the costs of communicating with all 9 nodes do not allow it to get closer in performance event to a 7-machine paxos cluster.

Modeling Paxos and FPaxos 9 nodes, with |q2|=3
Figure 7. Modeling Paxos and FPaxos 9 nodes, with |q2|=3

Conclusion and Next steps

So far I have modeled single-leader paxos variants in the local area network. I showed that network variations have a negligible impact on majority quorum paxos. I also illustrated that it is hard to rip the performance benefits from flexible quorums, since queuing costs of communicating with large cluster become overwhelming. However, not everything is lost for FPaxos, as it  can reduce the number of nodes involved in phase-2 communication from full cluster size to as little as \(|q2|\) nodes and greatly mitigate the effects of queue waiting time for large clusters.

The simulation and model are available on GitHub, so you can check it out and tinker with parameters to see how the performance may change in response.

There are still quite a few other aspects of paxos that I find interesting and want to model in the future. In particular, I want to look at WAN deployments, multi-leader paxos variants and, of course, our WPaxos protocol that combines multi-leader, WAN and flexible quorums.

Paxos Performance Modeling – Part 1.5

This post is a quick update/conclusion to the part 1. So, does the network variations make any impact at all? In the earlier simulation I showed some small performance degradation going from 3 to 5 nodes.

The reality is that for paxos, network behavior makes very little difference on scalability, and in some cases no difference at all. To see what I mean, look at the figure below:

ln

See how 4 and 6 and 36 node perform the same in my simulation? And how 5, 7 and even 35 nodes clusters slightly outperforms 4 nodes?

The intuitive high level explanation for even-numbered simulation results is quite simple. For even numbered cluster sizes, a round leader receives an odd number of replies, assuming a self-voting leader. The leader also decides the round after reaching a majority  quorum \(q=\left \lfloor{\frac{N}{2}}\right \rfloor  +1\), meaning that it needs to receive \(q-1\)  or \(\left \lfloor{\frac{N}{2}}\right \rfloor\)messages (with self-voting). As it happens, for even clusters, this message is exactly the median fastest message of the round. For instance, a 6 node cluster leader will receives 5 replies, but the round reaches the majority at the 3rd (or median) reply.

Since the simulation draws message RTTs from a normal distribution, the median (50th  percentile) RTT is also the mean. After repeating it for sufficient number of rounds, any fluctuations are averaged out, resulting in an average round decided by a message with an average RTT for the network.

The cluster with odd number of servers, however, decides on the round at a message with RTT slightly less then the median RTT. This is because we have an even number of replies, and median is computed be averaging two middle RTTs. The smaller of the two values used for computing the median is actually the quorum message for the round.  For instance, in a 7-node deployment, the leader reaches quorum after receiving 3rd message \(l_3\), with median being \(\frac{l_3+l_4}{2}\)

As a result, after many round repeats (I do ~125000 rounds) the simulation still ends up with an average RTT of a quorum message to be a tiny bit less than the median/mean RTT, and the more nodes I add, the closer it becomes to the actual 50th percentile and the mean.

So, what do we have after all of this? I think it is safe to assume the effects of network variance on paxos performance are very small and sometimes non-existent. We should not worry about the network as much, as long as it is stable and delivers predictable performance.  However, if you have a system with non-majority quorums, you may get slight benefit from quicker replies.

Update (3/10/2018):

  • Part 2 – Queuing/Processing overheads

 

 

Do not Blame (only) Network for Your Paxos Scalability Issues. (PPM Part 1)

In the past few months our lab has been doing a lot of work with different flavors of paxos consensus algorithm. Paxos and its numerous flavors are widely used in today’s cloud infrastructure. Distributed systems rely on it for many different tasks to ensure safe operation. For instance, coordination services use some consensus protocol flavor to provide services like leader election, cluster membership, service discovery and metadata management. Databases, such as Spanner or CockroachDB, use paxos to provide fault tolerant-replication of data across nodes or even datacenters.

If you work with Paxos, or had to deal with it at some point, you probably heard that it does not scale well to large number of nodes. ”Five nodes is ideal, do not try more” rhetoric has been repeated many times by many people and it gets engraved into one’s mind. ZooKeeper’s Administrator’s guide mentions 3 and 5 server deployments. Fairly recent epaxos provides evaluation on 3 and 5 node deployments in their paper. Paxos Made Live paper also mentions five nodes as typical Chubby deployment.

But why five servers is such a magical number in the Paxos world? The most common answer is along the lines of ”Increasing the number of servers increases the quorum size, making the paxos round leader wait for more messages to come to reach consensus”. This explanation is straightforward on the surface, after all, waiting for n nodes to reply should take longer than waiting for n − 1. The question then becomes why waiting for more replies is expensive? I thought the answer would be largely network related, but it appears to be more complex.

Modeling Paxos Performance: First Attempt

To answer this, I tried to model the non-faulty paxos execution time by looking at message exchange between nodes in the cluster. In the nutshell, running a phase of paxos requires one round-trip-time (RTT) between a node and its peers: the node broadcasts the message and waits for the quorum to reply. With network arguably being the slowest part, I can try  to express the paxos phase performance through the communication delays

First, let me define some variables to model a phase of paxos:

  • \(r_l\) – message RTT in local area network
  • \(\mu_l\) – average message RTT in local area network
  • \(\sigma_l\)  – standard deviation of message RTT in local area network
  • \(N\) – number of nodes participating in a paxos phase
  • \(q\) – quorum size. For a majority quorum \(q=\left \lfloor{\frac{N}{2}}\right \rfloor  +1\)
  • \(m_s\) – time to serialize a message
  • \(m_d\) – time to deserialize and process a message. This involves various message-related round tasks, such as ballot comparisons, log maintenance/updated, etc.
Figure 1: Amazon AWS EC2 local ping latency
Figure 1: Amazon AWS EC2 local ping latency

I assume that the network performance, at least in the local area, is normally distributed. Figure 1 shows a normalized histogram (shaded area is equal to 1) of latencies of approximately 2,000 ping requests within an AWS region.

RTT \(r_l\) for every message is drawn from a normal distribution \(\mathcal{N}(\mu_l+m_s+m_d, \sigma_l^2)\). This distribution simulates RTTs with additional  static, non-variable delay for serialization and deserialization. As such, the only variability so far is due to the network behavior.

In order to run a round of paxos, a node needs to send \(N − 1\) messages. That is, a node sends a message to every other node except for itself. I assume a ”leader” is also an acceptor with a short-circuit behavior, where a it does not need to send a vote to itself over a network.

Figure 2: Message RTT for a 7 node cluster
Figure 2: Message RTT for a 7 node cluster

Out of the \(N − 1\) messages sent, only \(q − 1\) messages actually matter. Upon receiving \(q − 1\) successful (once again, assuming non-faulty execution) replies a node has achieved a quorum and can finish the round (Figure 2). We can simulate this by drawing \(N −1\) random RTTs \(r_{l1}, r_{l2},…, r_{lN-1}\) from \(\mathcal{N}(\mu_l+m_s+m_d, \sigma_l^2)\) and sorting them. The \(q − 1\) fastest message \(r_{lq−1} \) is the one carrying the last vote to make up a quorum, thus after processing this message, the node no longer needs to wait for other messages to come.

Assuming that a node broadcasts all messages at the same time, message RTT \(r_{lq−1} \) can then be used to express the latency \(L_r\) for the entire round: \(L_r = m_s + r_{lq−1} + m_d\). Figure 3 visually represents the paxos round expressed in the formula.

Figure 3: Visual representation of time needed to execute a round.
Figure 3: Visual representation of time needed to execute a round

Does the Model Make Sense?

With this simple model, I can simulate paxos execution. In multi-paxos optimization where phase-1 is used to primarily pick a stable leader and phase-2 repeats a number of times, the performance of paxos is approximated by just looking at phase-2. If \(m_s\) and \(m_d\) are constant, the variability in performance is only due to the network fluctuations.

I created a small python script to run such simulation, as if a single client was interacting with the paxos synchronously one command at a time. Figure 4 illustrates the results with \(m_s = m_d = 0.01 ms\) and network parameters taken from AWS ping latency figure.

Figure 4: Simulated TP and Latency from one client
Figure 4: Simulated Thtoughput and Latency from one client

At the first glance, we observe degradation in throughput and increase in latency. However, the performance decreases very slowly, and once I reach 9 nodes, the performance stays almost flat. This is very different from the data we have observed on our actual paxos implementation. Figure 5 shows the throughput and latency from a few concurrent clients interacting with the protocol.

Figure 5: Paxos TP and Latency from a small number of concurrent clients
Figure 5: Paxos Throughput and Latency from a small number of concurrent clients

The first thing that catches my attention is the performance degradation between the 3 and 5 node deployments. On the simulation, the difference in throughput was only 2.8%, while real paxos degraded by astonishing 30%.  As the cluster size increases, beyond 5 nodes, the real paxos also appear to degrade quicker.

Clearly, the network variability alone cannot explain the performance hit from increasing the number of nodes. So what is missing from the simple model that greatly impacts the performance as the cluster grows?

Towards an Improved Model

Starting from the obvious, the per-message serialization \(m_s\) and processing \(m_d\) overheads are not static constant parameters, instead they introduce some variance as well. However, \(m_s\) and \(m_d\) are small to start with, and making them introduce additional variance to the model will make it better, but it will not change the overall model simulation much.

Assuming that \(m_s\) is drawn from a normal distribution \(\mathcal{N}(\mu_{ms}, \sigma_{ms}^2)\) and \(m_d\) is similarly drawn from \(\mathcal{N}(\mu_{md}, \sigma_{md}^2)\), then the message RTT time \(r_{l}\) must be drawn from \(\mathcal{N}(\mu_l+\mu_{ms}+\mu_{md}, \sigma_l^2 + \sigma_{ms}^2 + \sigma_{md}^2)\). Making this changes to the model resulted in a difference between 3 and 5 node simulations to grow from 2.8% to 2.9% with \(\mu_{ms} = \mu_{md} = 0.01\) and \(\sigma_{ms} = \sigma_{md} = 0.002\) (some arbitrary values that make little impact, unless same magnitude as network mean and standard deviation).

Introducing the variability to message overheads \(m_s\) and \(m_d\) still does not account for these overheads being dependent on the number of node in the cluster. The dependence on \(N\) is not direct, and instead the per-message costs increase as the number of messages that a server needs to handle grows. This puts a ”leader” node repeating the phase-2 of paxos in a more vulnerable position as it will experience the increased traffic.

To understand how message serialization and processing depends on the number of messages exchanged, we need to consider the implementation of application’s network layers. Often times, as an application receives the message, it will use one or more processing queues or pipelines to deserialize the message and dispatch it to the appropriate handler for processing.

Let’s consider an example with just one such pipeline. If the queue is empty upon message arrival, it will get to deserialize with no further delay. However, when there are other messages in the queue, the new message has to wait for it turn to be processed.

A single Paxos round has a bursty network utilization, especially at the leader. This is because the leader node first broadcasts a messages and then receives multiple replies at roughly the same time. If the message deserialization has not finished before the next message arrives, then the pipeline gets clogged. Further messages potentially make the issue worse by growing the queue, as shown in Figure \ref{fig:pipeline_clogged}.

Figure 6: Example of deserialization pipeline stalling paxos round
Figure 6: Example of deserialization pipeline stalling paxos round

Obviously, having multiple parallel pipelines can help, given that they are balanced and there are enough compute resources to run them. However, the problem can also get worse when we start running rounds concurrently (i.e. multiple clients interact with paxos). In such scenario, messages from different rounds will compete for the fixed number of processing queues.

Where does this leave me with trying to model paxos round performance? I need to account for a number of additional factors, such as the number of message-processing queues/pipelines/threads, the rate of message deserialization at each queue, and the number of concurrent paxos rounds. Oh, and since the messages arriving after the quorum has been reached still need to get deserialized, they will contribute to how clogged the pipelines are.

I will pick up from this point onwards in some future post. I will also try to look at flexible quorums and our WPaxos protocol.

Update (3/10/2018):

  • Part 1.5 – Does network matter at all?
  • Part 2 – Queuing/Processing overheads

Trace Synchronization with HLC

Event logging or tracing is one of the most common techniques for collecting data about the software execution. For simple application running on the same machine, a trace of events timestamped with the machine’s hardware clock is typically sufficient. When the system grows and becomes distributed over multiple nodes, each node is going to produce its own independent logs or traces. Unfortunately, nodes running on different physical machines do not have access to a single global master clock to timestamp and order the events, making these individual logs unaligned in time.

For some purposes, such as debugging, engineers often need to look at these independent logs as one whole. For instance, if Alice the engineer needs need to see how some event on one node influenced the rest of the system, she will need to examine all logs and pay attention to the causal relationships between events. The time skew of different nodes and the trace misalignment caused by such skew prevents Alice from safely relying on such independently produced traces, as shown in the figure below. Instead, she need to collate the logs together and reduce the misalignment as much as possible to produce a more coherent picture of a distributed system execution.

Misaligned logs with PT
Physical time makes logs misaligned and miss the causality between events.

Of course if only the causality was important, Alice could have used Lamport’s logical clocks (LC) to help her identify some causal relationship between the events on different nodes. Alternatively, logging system can also use vector clocks (VC) to capture all of the causal relationships in the system. However, both LC and VC are disjoint from the physical time of the events in the trace, making it hard for Alice to navigate such logging system.

Using synchronized time protocols, such as NTP and PTP  will help make the traces better aligned. These protocols are not perfect and still leave out some synchronization error or uncertainty, introducing the possibility of breaking the causality when collating logs purely with them.

HLC sync

Instead of using NTP or logical time for synchronizing the event logs, I thought whether it is possible to use both at the same time with the help of Hybrid Logical Time (HLC). HLC combines a physical time, such as NTP with logical clock to keep track of causality during the period of synchronization uncertainty. Since HLC acts as a single always-increasing time in the entire distributed system, it can be used to timestamp the events in the log traces of every node. You can learn more about HLC here.

Similar to logical time, HLC does not capture full causality between the events. However, HLC conforms to the LC implication: if event A happened before event B, then HLC timestamp of A is smaller than the HLC timestamp of B. This can be written as A hb B ⇒ hlc.A < hlc.B. Obviously, we cannot use HLC timestamps to order any two events. Despite this limitation, we can still use the LC implication to give some partial information about the order of events. If an event A has an HLC timestamp greater than the event B, we can at least say that event B did not happen before A, thus either A happened before B or A is concurrent with B: hlc.A < hlc.B ⇒ A hb B ∨ A co B.

We can use this property to adjust the synchronization between the traces produced at different nodes. Let’s assume we have two nodes with some clock skew. These nodes produce logs that are not fully synchronized in time (we also assume the knowledge of a global, “ideal” time for now). The events in the log happen instantaneously, however we can rely on the machine’s clock to measure the time between events on the same node to give the “rigidity” to the logs. Each node timestamps the log events with its machine’s physical time (PT).

Unaligned logs
Logs aligned based on PT time. Ideal global time is shown in red.

In the figure above, the two logs are not synchronized in the “ideal” time, even though they appear to be in sync based on the PT of each node. Without any additional information we cannot improve on the synchrony of these logs. However, if we replace PT time with HLC, we can achieve better trace synchronization.

unaligned logs with HLC
HLC can be used instead of PT to synchronize the traces

With the addition of HLC time, we may see that when the logs are aligned by the PT only, some HLC timestamps (highlighted in yellow) appear to be out of place. In particular, this alignment does not satisfy the LC condition (A hb B ⇒ hlc.A < hlc.B), since the alignment makes event a1 to appear to happen before c2, however, hlc.c2 > hlc.a1. In order to satisfy the condition, we need to re-sync the logs such that c2 appear concurrent with a1.

HLC is used to improve log synchronization.
HLC is used to improve log synchronization.

After the adjustment, our synchronization error between two nodes is reduced. Note that we cannot synchronize the logs better and put a1 to happen before c2, since the LC implication simple does not allow us to do so.

The two node synchronization works nice and easy because the LC/HLC implication provides some guarantees about the two events, and we pick these two events from two separate nodes.  Aligning more than two logs is more challenging, as we need to perform many comparisons each involving just two events from some two nodes. The number of possible comparison we need to make grows drastically as we increase the number of traces to sync.

However, with HLC we can reduce the problem to just that of performing n-1 2-node log alignments when we need to sync logs from n nodes. HLC operates by learning of higher timestamps from the communication, thus HLC time at all nodes of the cluster tends to follow the PT time of one node with the highest PT clock.  Once, again to see this you need to understand how HLC ticks, which is explained here. Having one node that drives the entire HLC in the cluster allows us to synchronize every other log from each node independently with the log of that HLC “driver node”.

Some Testing and Limitation

I set up some synthetic tests to see if HLC can help us achieve any improvement in log synchronization on already loosely synchronized system. The benchmark generates the logs with a number of parameters I can controlled: time skew, communication latency and event probabilities. Maximum time-skew controls the time desynchronization in the simulated cluster measured in time ticks. There will be two nodes in the simulation with maximum time-skew difference between their clocks. Communication latency parameters control the latency of simulated communications in time ticks. Probability of an event parameter controls the chance of an event happening at every time tick; similarly, the probability of communication determines the chance of an outgoing message happening at the time tick.

Since the logs are generated synthetically, I have access to the ideal time synchronization between these logs, allowing me to quantify the alignment error. I calculated the alignment error as 1 – adjustment / skew for every pair of logs.

The figure below shows the error in synchronizing 5 logs as a function of the log length. The Idea is that a longer trace can allow for more opportunities to find the violations in LC/HLC condition and adjust the logs accordingly. All other parameters were kept constant with skew of 50 ticks, communication delay of 3 to 10 ticks, a chance of event as 40% per tick. I made different plots for different probability of inter-node communication. I repeated the simulation 100,000 times and computed the average error.

results 1
Log alignment error as a function of log length

We can see that in this setup the synchronization between logs was significantly improved, and the improvement happens faster when communication if more frequent. This is because communication introduces the causality between nodes, allowing me to exploit it for synchronization. As the logs grow larger the improvements diminish. This is likely due to the reducing closer to the communication delay, at which point HLC synchronization can no longer make improvements.

Inability to achieve any synchronization improvement when the communication latency is equal or greater than the maximum time skew in the system is the most significant limitation of HLC synchronization. The following graph illustrates that:

results
HLC synchronization limitations

Here I ran the simulation with 3 different skew levels: 10, 20 and 50 ticks. As I increase the communication latency, the error is growing as well, getting to the level of no improvement as the latency reaches the time-skew.

Some Concluding Words

I think this is a rather naïve and simple way to achieve better synchronization for distributed logging software. One can argue that NTP and PTP time synchronization methods are rather good nowadays and that it should be enough for most cases. However, computers are fast now and even in 1 ms of desynchronization many computations can be made. Even a few full round trips of network message exchange can happen in that 1 ms in good Local Area Networks.

HLC synchronization’s simplicity allows it to be implemented purely in user-level application code. There is no need to constantly run NTP protocol to keep tight time synchrony, there is no need to access or modify any of the underlying system-level code beyond just reading the clock, there is no need to access high precision clock that may slow the application down.

HLC can also be used within a single machine to synchronize on traces from different threads. Despite having access to the same clock at all threads and processes, the clock granularity may still be coarse enough to do many computations within a single tick.

HLC sync is not without the limits. Its usefulness degrades as the synchronization gets closer to the communication latency, but it can still be used as a fail-safe mechanism in cases NTP fails.

Alternatively to synchronizing logs, HLC can be used to find consistent global states and search through the distributed application’s past execution.

Sonification of Distributed Systems with RQL

In the past, I have discussed sonification as a mean of representing monitoring data. Aside from some silly and toy examples, sonifications can be used for serious applications. In many monitoring cases, the presence of some phenomena is more important than the details about it. In such situations, simple sonification is a perfect way to alert users about the occurrence of such phenomena.  For example, Geiger counter alerts users of the radioactive decay, without providing any further details.

Our recent work on Retroscope and RQL allows us to bring sonification to the same “phenomena-awareness” plane for distributed systems. With RQL we can search for interesting conditions happening globally in the system, and we can use sounds to alert the engineers when certain predicates occur in the global state. Of course, for such system to be useful, it needs to be real-time, as the recency of events is the most important attribute of this type of sonification. For instance, users of a Geiger counter are not interested in hearing ticks for decay events happening a minute ago, as it rather defeats the purpose of the counter. Similar requirements apply to the “phenomena-awareness” tools in the distributed systems domain, as engineers need to know what happens in the system instantaneously.

RQL can easily allow inspecting past states, however it lacks when it comes to the current state inspection.  What we need is a streaming query that can continuously receive individual logs from Retroscope log-servers, and perform the search as soon as sufficient data become available to form a consistent cut. Of course, streaming queries will still be lagging behind the current time, however we can make this lag small enough to be virtually unobservable by a person.

With streaming queries we can not only receive the cuts meeting some search criteria in near real time, but we can also sonify the mere fact such cuts have been found. If we look at the previous example of ZooKeeper staleness, we can run sonified streaming queries to have the system alert us when the data staleness reaches some threshold, such as two or more versions stale.  In the audio clip below, we have used MIDI to sonify the stream of staleness events occurring in a ZooKeeper cluster. Multiple queries were sonified to produce different sounds for different staleness of ZooKeeper data.

We can hear periods of silence when the staleness is below the threshold of 2 values, however we can also observe some variations in cluster performance. For instance, it is very easy to identify when the cluster was experiencing more problems than normal. It is also easy to hear it recovering from the spike in staleness soon after.

The First Datastore-driven Vehicle

vold-carIt is not a secret that procrastination is the favorite activity of most PhD students. I have been procrastinating today, even though my advisor probably wants me to keep writing.  In the midst of my procrastination, I thought: “Why are there self-driving vehicles, but no database-driven vehicles?” As absurd as it sounds, I gave it a try. And now I not so proudly present the prototype of the world’s first(?) datastore-driven vehicle. The implementation was quick and simple. I took the Voldemort code I have been tinkering with in my Voldemort traffic light post, and changed it to send the signals over Bluetooth connection to a Frankenstein-of-a-car I’ve built very quickly out a few motors, a gearbox, an Arduino, a bunch of wires, some tracks, and lots of thermal glue.

The car changes the direction of travel when the workload characteristics change: it will go straight when number of read operations is the same as number of write operation.  By changing the read-to-write ratio of the workload, the car will turn either left or right. The prototype vehicle is not very responsive (weak motors/dead batteries) and sometimes runs into things despite my best attempts to control it.

And now, once the world has a database-driven car, the procrastination can continue… Or should I start writing?

Monitoring with Retroscope: Detecting Invariant Violations

Earlier I briefly mentioned Retroscope, our distributed snapshot library that makes taking non-blocking, unplanned consistent global distributed snapshots possible. However, these snapshots are only good if we know how to use them well. Of course the most obvious use case is just a data backup, and despite it being an important application for snapshots, I feel it being a bit boring to my taste. What I am thinking right now is using snapshots for distributed monitoring and debugging.

Let’s consider an application that has a global invariant predicate P, and we want to check if a distributed system holds the invariant P at all times. This means that we should never see a consistent cut in which predicate P = false. So our problem is boiling down to looking for consistent cuts that violate P. Luckily, Retroscope can do exactly this, since we can take one snapshots and incrementally move forward in time as the application execution progresses, checking the predicates by looking at consistent cuts as the state advances.

With the basic Retroscope described in the earlier post, finding predicate violations is a rather cumbersome effort that requires writing new code for every invariant a user wishes to check. So in the past few months I have been working on Retroscope extension tailored specifically for debugging and monitoring use cases.  Improved Retroscope exposes the Retroscope Query Language (RQL), a SQL-like interface to allow users write queries to search for conditions happening in the consistent cuts.

Now let’s go back to our hypothetical system with global invariant P and for now assume P holds when all local predicates p0, p1, p2, …, pn hold on the nodes [0 … n]. As such, P = p0  p1  p2  …  pn, and if any of the local predicates fail, the global predicate fails as well. For the simplicity of the example, we can say that local predicate pi is following: pi = ai + bi > ci. This makes each node maintain all three variables, although the nodes may have different values. With Retroscope, we can expose these local variables to be stored in the local log named inv. The log will maintain both the current version of the variables and the history of variable changes.

How do we look for the violation of such invariant with RQL? Just a single query would suffice for us:

SELECT inv.a, inv.b, inv.c FROM inv WHEN (inv.a + inv.b <= inv.c) LINK SAME_NODE

Now we can dissect this query into bits and see what happens there. RQL queries are meant to retrieve consistent cuts that satisfy certain criteria. The list of parameters following the SELECT statement specifies what variables we want to see in the resultant consistent cuts. FROM keyword enumerate the logs we use in this particular query. The actual consistent cut criteria are specified after the WHEN keyword. In the particular case the condition for emitting cuts is (inv.a + inv.b <= inv.c) LINK SAME_NODE, which is equivalent to emitting cuts when the following holds:f1

By now a curios reader would have probably asked a question of why we even bother with consistent cut in this particular example. All predicates can be checked locally and their evaluation does not depend on other remote servers, so we can simply run local monitors and do not worry about consistent cuts and time synchronization at all: failure on one node designate the failure of the system globally no matter the time. Retroscope and RQL shines when we break away from this locality. What if our invariant involves messages being sent and received? Or what if in involves different parameters that exists on different machines at the same time? With the ability of looking at consistent cuts, RQL breaches the boundary of a single node. Below I list just a few variations of the original query that no longer deal with conjunction of local predicates and look at global state as a whole:

  1. SELECT inv.a, inv.b, anv.c FROM inv WHEN inv.a + inv.b <= inv.c
    • Omitting LINK SAME_NODE part changes the operation of the query drastically, as all three variables are no longer bound to co-exist on the same node:f2
    • rql_predicates1
  2. SELECT inv.a, inv.b, anv.c FROM inv WHEN (inv.a + inv.b <= inv.c) LINK EACH_NODE
    • Replacing LINK SAME_NODE with LINK EACH_NODE, changes the search condition to require every node satisfying it in the consistent cut:f3
    • rql_predicates2
  3. SELECT inv.a, inv.b, anv.c FROM inv WHEN (inv.a + inv.b) LINK SAME_NODE <= inv.c
    • Rewriting the condition to WHEN (inv.a + inv.b) LINK SAME_NODE <= inv.c will cause the inv.a and inv.b to be summed on the same nodes, and compared to inv.c values from other nodes as well, so the consistent cut is emitted when f4
    • rql_predicates3
  4. SELECT inv.a, inv.b, anv.c FROM inv WHEN (inv.a + inv.b <= inv.c) AND NODE($1) = NODE($3)
    • This query restricts inv.a and inv.c to be on the same node. $1 is the placeholder for the first variable encountered while parsing left to right, and $3 is the third variable. This emits the consistent cuts when f5
    • rql_predicates4

Above are just a few simple examples of what is possible with RQL, however there are limitations. The biggest limitation is the complexity of the conditions. Even though RQL does not limit how many operations are possible in the condition of the query, having large expressions can slow the system down drastically. For example, a simple WHEN inv.a > inv.b will examine all a’s that exist on the nodes of the system at the consistent cut and all b’s in every possible combination. For f6 . Comparison is then carried out on every element of product set E.

P.S. I illustrated some of the syntax as it operates at the time of this writing, however RQL is developing, and I am not sure I like syntax of conditions too much, so it is a subject to change.

Globally Consistent Distributed Snapshots with Retroscope

Taking a consistent snapshot of a distributed system is no trivial task for the reasons of asynchrony between the nodes in the system. As the state of each machine changes in response to incoming external messages or internal events, each node may produce a log of such state changes. With the log abstraction, the problem of taking a snapshot transforms into the issue of aligning the logs together and finding a consistent cut among all these logs. However, time asynchrony between the servers makes collating all the system logs difficult using just physical clocks available at each machine, because clocks tend to drift, producing some time asynchrony or time uncertainty. Various time synchronization protocols, such as NTP and PTP, exist, but perfect synchronization is still unattainable.

Retroscope is the system we designed to take unplanned, non-blocking, consistent global snapshots. Unlike other systems, Retroscope does not need to block while taking snapshot, as it does not need to wait out time uncertainty caused by the clock skews at various machines thanks to the reliance on Hybrid Logical Clocks (HLC) instead of NTP-synchronized (NTP) time.

Figure 1
Using NTP time fails to take consistent snapshot without waiting out the time uncertainty

 

HLC introduces causality information into the clock as messages are being exchanged between servers and provides the same causal guarantees as Lamport’s Logical Clocks (LC). More information about HLC can be found here and here.

Taking a snapshot

Retroscope achieves snapshots by adding HLC into the network communication of the Retroscoped system. Internally, Retroscope keeps a sliding window-log of past state changes along with the associated HLC timestamps at each node. This window-logs are used to facilitate the unplanned nature of taking snapshots. Snapshots are triggered by a special client that maintains the common HLC with the rest of the system. Retroscope allows for instant unplanned snapshot to be started by the initiator, such instant snapshots are guaranteed to capture the states at the time Tnow of snapshot request being issued by the initiator. Obviously, once the snapshot request message reaches the nodes, the time has advanced to Tr > Tnow.

Upon receiving the snapshot request message at Tr, each node starts taking a local snapshot. Since the system does not halt processing requests, depending on the implementation, we may arrive to a local snapshot at some time Tf >= Tr. Because our local snapshot is at the state that happened after the requested time, we need to modify it to arrive to the state at time Tnow. We use the window-log of state changes to undo all operations that happened locally after Tnow, thus arriving to a desired local snapshot. Once all nodes compute local snapshots, Retroscope is done taking a global consistent snapshot at time Tnow.

fig2
Instant Snapshot is being taken. (1) Initiator sends the message to take a snapshot at Tnow, (2) Process n receives the message and start taking a local snapshot (3) All operations performed after Tnow are undone from the local snapshot.

Retroscope provides more flexibility in taking unplanned snapshots. Taking instant snapshot (i.e. snapshot initiated at Tnow) requires each node to maintain only a small log of recent changes. We can, however, expand the instant snapshots and offer retrospective snapshot flexibility at the expense of growing state change log larger. With retrospective snapshots, we can offer the ability to look at the state that has already happened in the past. This functionality is handy for application debugging when there is a need to investigate the root cause of the problem after it has already happened. A distributed reset is another application that can benefit from the retrospective snapshots, as the system can be reset into a correct state after the state has been corrupted.

We have Retroscoped Voldemort key-value database to take data-snapshots. Retroscoping Voldemort took less than a 1000 lines of code for adding HLC to the network protocol, recording changes in the Retroscope window-log, and performing snapshot on Voldemort’s storage. We did the experiments on the 10-node Voldemort cluster with databases of various sizes. We have learned that keeping the window-log of state changes has very little impact on the throughput and latency when no snapshots are being taken, as seen in figure below.

Figure 3
Impact of Retroscope on normal system operation.

Performing snapshot is non-blocking in Retroscope, because there is no need to wait out the time uncertainty. The non-blocking nature allows Voldemort to continue processing both read and write requests while the snapshot is being computed. The figure below shows throughput and latency for every second of execution while taking the snapshot on a 10,000,000 items database (each item is 100 bytes). Overall we have observed an 18% throughput and 25% latency degradation over the snapshot time, however these numbers can be improved by using a separate disk system for snapshot.

Figure 4
Impact of Retroscope while taking a snapshot

What can be done with Retroscope?

We used Retroscope to take snapshots of data on a key-value store, however the utility of snapshots can be very extensive. With powerful snapshot capabilities, such as retrospective snapshot, we can look into the past of our distributed system and search for anomalies. Retrospective snapshots can be used to restore system to the latest correct state after the state corruption. Finding such correct states is also possible with Retroscope; we can use it to take successive snapshots to check for global invariants; it can be powerful in monitoring various application level predicates. Retroscope can perform other monitoring tasks. Unlike other monitoring systems that tend to look at local state independently or isolate monitoring to a request level, Retroscope can look at global parameters of the system across every node in a consistent way. We can even use Retroscope to detect erroneous patterns in message exchange by observing what messages are sent and received and how they impact the state at each node as we go through time.

The Light of Voldemort

Few month ago I showcased how a single server of Voldemort key-value store sounds.  Sonification is a valid way to monitor systems, and has been used a lot in real applications. Geiger counter would be one of the most well-known examples of a sonified application. In some cases sonification may be the preferred form of representing information, as other forms surprisingly may not work as well for human perception. Even visualization of information is often not as good as sonification. Take the same Geiger counter example; research has shown that visual radiation level monitors do not perform quite as well as the sound ones as people tend to be distracted from the display to perform other tasks. Even visual and audio hybrids do not alert users of high radiation levels as good as simple audio counter.

As an example of a hybrid audio-visual system for Voldemort, I have built a small “traffic-light monitor” that changes colors and beeps differently depending on what action is performed by the server. The rig is built with Arduino and simply plugs to the USB of the machine running Voldemort server. Below is a short video of how it operates:

The green light lights up when server handles a “get” operation. Yellow light is for “put” requests and red light is for “get version” commands. As can be see, writing to Voldemort requires two operations, “get version” and “put” and they happen so quickly that Arduino is barely capable to light up the LEDs.

P.S. “Traffic light monitor” is not to be taken seriously, it is rather a silly example to show that there are plenty of ways to monitor a system or represent system’s logs.