
Reading Group. Protocol-Aware Recovery for Consensus-Based Storage

Our last reading group meeting was about storage faults in state machine replication. We looked at the “Protocol-Aware Recovery for Consensus-Based Storage” paper from FAST’18.

The paper explores an interesting omission in most state machine replication (SMR) protocols. These protocols, such as (multi)-Paxos and Raft, are specified with the assumption of having a crash-resistant disk to write the operation log and voting metadata. This disk data allows crashed nodes to restart safely. However, real life gets in the way a bit, as infallible storage is as real as unicorns.

Storage may fail in peculiar ways: some data may get corrupted, while most other data is correct and the server itself continues working. The problem here is handling such failures. The simplest way is to treat the server as crashed. However, the server must then remain crashed, as restarting may lead to even more severe state corruption when the server replays the operations from a faulty log. The paper talks about a variety of other approaches taken to deal with these data issues. The authors state that all the mechanisms they have explored were faulty and led to liveness or safety issues. I personally do not buy such a blanket statement, but a few of the examples in the paper were really interesting.

The paper then suggests a solution – Protocol-Aware Recovery (PAR). The main point here is to avoid ad-hoc solutions because they are either slow, unsafe, complicated, or all of the above. This makes sense since such a big omission (potential for data-corrupting disk failures) in protocols should be addressed at the protocol level. The paper draws heavily on the Raft state machine protocol and develops the recovery procedure for it.

The log recovery is leader-based and can be broken down into two sub-protocols: follower recovery and leader recovery. The followers are recovered by restoring the data from the leader, who always knows the entire committed history. Leader recovery is a bit more tricky and occurs as part of a leader election. Of course, if a non-faulty node can be elected a leader, then recovering faulty nodes is easy with the follower recovery. However, the leader election requires a node to have the most up-to-date log to become a leader, limiting the selection of nodes for the job. That being said, a node can be elected with a corrupted log, but it needs to recover the corrupted entries from the followers. If an entry is not available on any of the followers, the state machine becomes stuck (as it should). The protocol only recovers committed log entries and follows Raft logic to discard a non-committed log suffix if it has corrupted entries.
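
To make the follower-recovery idea concrete, here is a minimal Python sketch, not the paper's implementation: the entry layout and checksum scheme are made up, and the call to the leader is passed in as a plain function. The point is simply that corruption is detected locally via checksums and the committed entries are re-fetched from the leader.

```python
# Minimal sketch of PAR-style follower log recovery (illustrative, not the
# paper's code): corrupted committed entries are detected via checksums and
# replaced with clean copies fetched from the leader.
import hashlib
from dataclasses import dataclass

@dataclass
class LogEntry:
    index: int
    term: int
    command: bytes
    checksum: str                      # hash stored alongside the entry on disk

def entry_hash(e: LogEntry) -> str:
    return hashlib.sha256(f"{e.index}:{e.term}:".encode() + e.command).hexdigest()

def recover_follower(log, fetch_from_leader):
    """Replace corrupted committed entries with the leader's clean copies."""
    for pos, entry in enumerate(log):
        if entry.checksum != entry_hash(entry):        # storage fault detected
            log[pos] = fetch_from_leader(entry.index)  # ask the leader (assumed RPC)

# Example: one entry's command got silently corrupted on disk.
good = LogEntry(0, 1, b"set x=1", "")
good.checksum = entry_hash(good)
bad = LogEntry(1, 1, b"garbage!", entry_hash(LogEntry(1, 1, b"set y=2", "")))
log = [good, bad]
recover_follower(log, fetch_from_leader=lambda i: LogEntry(i, 1, b"set y=2", ""))
print(log[1].command)   # b'set y=2'
```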

In addition to log recovery, the paper also talks about snapshot recovery. The idea behind snapshot recovery is to make sure all nodes take the same snapshots at the same index in the log, break them into “chunks”, and recover chunks as needed from other nodes.

Here is the presentation by Rohan Puri:

Discussion

1) The need for logs? The paper assumes that a state machine takes periodic snapshots to a disk/drive, and such a snapshot in combination with a log can be used for node recovery later. This implies that the actual current state of the state machine can be lost due to a server restart. However, some state machines are directly backed by the disk, in essence representing a rolling snapshot that gets updated every time an operation from the log is applied. Recovery of such a disk-backed state machine can be quicker and require only the log entries from after the crash/restart. Of course, this does not mean that the disk-backed state machine itself cannot be corrupted. In any case, the log entries are required for recovery and can be garbage collected once all nodes have persisted the state machine to disk (either as part of normal operation or a snapshot), making the time frame during which the log entries remain useful relatively small.

A more interesting problem may arise in trying to recover the corrupted state machine. If we rely on this “rolling-snapshot” disk-backed state machine, the mechanism the paper uses for snapshot recovery won’t work, since different copies of the state machine may be misaligned ever so slightly. Of course, one can always do the costly node restore procedure (restore to some prior snapshot and replay the log), but this is wasteful and requires keeping an extra snapshot and the log from the snapshot onwards. In the spirit of the paper, we should rely on distributed copies instead and be able to repair the corruption without relying on redundant copies stored on the same server.

2) Persistent memory vs RAM and recovery for in-memory SMR. If we build a state machine replication (SMR) to work purely off RAM, then we do not have the luxury of retaining any state after a restart. As such, in-memory state machines must have different mechanisms to ensure safety. For example, in traditional Multi-Paxos with a disk, a node always remembers the current term/ballot and past votes it has participated in. Without durable memory, a node restart erases the previous voting state, allowing a node to vote on something it has already voted on before, but with a lower term/ballot. This is not safe and may lead to a double-commit on the same log entry when a node promises to some new leader, and then after restart makes a second promise in the same log index to some older leader. 

Allowing for corruption in persistent memory is somewhat similar to not having persistent memory at all, at least when dealing with crashes/restarts. The very piece of data/metadata we need to ensure safety and avoid double voting as in the example above may be corrupted and cannot be used after a restart. However, the same precautions used for in-memory replicated state machines will work with corrupted storage as well and allow for safe recovery. For example, to prevent the double-voting example, a recovering node needs to run a “mock” leader election (or a leader election with a term guaranteed to not succeed). Such leader election will ensure the node gets a proper view of the current ballot/term in the cluster to make sure it no longer accepts votes from prior leaders. After such a mock election, the node can start accepting/voting for log entries while recovering any prior log and/or state machine from any of the replicas. Of course, the full recovery completes when enough data is shipped from other nodes (i.e. snapshots + missing log entries). 
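
Here is a tiny, self-contained sketch of the “mock election” idea from the discussion, with illustrative names and in-memory stand-ins for real replicas: the recovering node runs a phase-1 that cannot win, only to learn the highest ballot in the cluster before it resumes voting.

```python
# Sketch of the "mock election" recovery idea (illustrative, not from the paper):
# a node that lost or cannot trust its durable voting state first learns the
# highest ballot seen by a majority, so it will never re-promise to an older
# leader after a restart.
from dataclasses import dataclass

@dataclass
class Replica:
    highest_ballot_seen: int = 0
    def prepare(self, ballot: int) -> int:
        # Normal phase-1: adopt the ballot only if it is new; always report
        # the highest ballot this replica has seen.
        if ballot > self.highest_ballot_seen:
            self.highest_ballot_seen = ballot
        return self.highest_ballot_seen

def mock_election(peers, majority):
    """Learn the current ballot without winning leadership."""
    replies = [p.prepare(-1) for p in peers[:majority]]   # ballot -1 can never win
    return max(replies)

# A recovering node adopts this ballot floor before accepting/voting again.
cluster = [Replica(7), Replica(7), Replica(5)]
print(mock_election(cluster, majority=2))   # -> 7
```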

There are a few differences between RAM and persistent storage when it comes to recovery. First of all, while it seems like both can lose data (one due to a reboot, another due to some random corruption), persistent storage still has a hint of data being missing. This is like not remembering what the node has voted for or who was the leader, but still having a 6th sense that something was voted upon. This extra piece of information may be useful in recovery, and indeed the protocol from the paper takes advantage of that to improve fault tolerance and safety. The recovery protocol preserves safety when the majority of nodes fail at the same log index, as the protocol knows something is missing entirely and will halt for safety. In the RAM setting, a mass reboot (i.e. majority of nodes) leads to a collective loss of memory without any hint that something may have been agreed upon, leading to a rewrite of the log. 

The second difference is that persistent memory may not lose all the data, so fewer items must be shipped from the followers. 

3) Leader-bound recovery. The paper suggests recovering followers from the leader node. This can put more load on the leader, who is already a bottleneck in the protocol. It seems like it may be possible to recover committed log entries from followers (the paper already does so for leader recovery) to make the recovery procedure less demanding for the leader.

4) Byzantine. The paper touches a bit on this topic. Data corruption on disk can be viewed through the lens of Byzantine fault tolerance. The corruption causes a node to act outside of the protocol specs, and byzantine-tolerant protocols are designed to handle such “out-of-spec” behaviors. The paper is a good example of how we can often solve some specific types of byzantine behaviors without resorting to the full-blown PBFT-style solutions. This is very practical, as we want the state machine to handle data corruptions, but we do not want to pay the performance penalty associated with BFT protocols. 

5) Likelihood of data corruption. Another point of discussion was the likelihood of such data faults happening. They do not seem to be too frequent, but they do happen. We touched on a few anecdotal occurrences, for example, firmware issues causing the disk to not write some large buffers of data.

It is also worth noting error correction. Error correction is standard for server-grade memory, and it comes at a relatively small monetary/performance cost. Similar error-correction technologies are used in disks and drives, allowing for small errors (i.e. a bit-flip) to be fixed by the drive. In fact, NAND flash SSDs rely on error correction in normal operation.

6) Infallible disk. Protocols assume the disk is always correct. Why? Even on the surface, this does not seem like a very tight assumption, especially at the scale of millions of SMR instances deployed across millions of machines.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the paper discussions. Please join the Slack group to get involved!

Reading Group. Paxos vs Raft: Have we reached consensus on distributed consensus?

In our 54th reading group meeting, we were looking for an answer to an important question in the distributed systems community: “What about Raft?” We looked at the “Paxos vs Raft: Have we reached consensus on distributed consensus?” paper to try to find the answer. As always, we had an excellent presentation, this time by A. Jesse Jiryu Davis.

The paper compares Multi-Paxos and Raft protocols, but it does so through the lens of Raft and uses Raft terminology to describe Paxos. This can be very handy for people who first learned consensus through Raft. The paper mentions that the two protocols are very similar, especially in the “happy-case” operations. The important differences come in the leader-election phases of the protocols. In Raft, a new leader must be a fully caught-up replica with the longest log, while Multi-Paxos can pick any node as a leader and recover missing log entries. The authors argue that this Raft behavior is good for efficiency — the new leader can start quickly since it does not need to learn any missing entries. 

When it comes to understandability, the paper says that Raft is “slightly more understandable than Paxos.” I suppose this comparison comes after expressing/explaining Paxos in Raft terminology, and not based on the original explanations. 

Personally, Paxos is more understandable to me, since I absolutely hate the quirk of Raft when a majority-accepted but not committed value may get “chopped off” upon some intricate leader churn (See Figure 8 in the original paper). This ties to the sequential commit requirement in the normal case, and different commit rules that apply upon leader churn, which all tie to the leader election procedure itself. How is it more understandable? In contrast, Paxos never loses a majority-accepted value and has one commit rule, although it allows committing out-of-order (the execution of committed operations still must follow the log order). 

I won’t go further into summarizing the paper; it is a good educational read, and I do not want to spoil it.

Discussion.

1) Original Paxos Papers as a Source of Confusion? Needless to say, Lamport’s original Paxos paper does not have the clearest description of the algorithm. This somewhat extends to Lamport’s Paxos Made Simple paper and Paxos Made Moderately Complex by Robbert van Renesse. However, the relative difficulty of understanding Paxos from these later papers may not lie in the algorithm’s description itself, but in the language used. The Raft paper is written with an engineering audience in mind and operates with primitives, like RPCs, that can be used in programming languages right away. Raft is described in operational concepts, such as server states with clear boundaries and transitions between states: first, do leader election, then do log replication, and finally, go back to leader election if needed. These may have a great appeal to students and professionals. We teach computer science code-first (it is a separate discussion whether this is the right way to approach distributed systems though), and describing a protocol closer to code has a certain appeal.

Multi-Paxos is described in more abstract terms. Multi-Paxos is not as “operational.” For example, the leader election gets blurred a bit with replication as the leader may need to fill some slots before becoming fully active. Transitions back to leader election are also a bit murkier compared to Raft.

I think it is great to have this abstract Paxos algorithm that we can shape and implement in a variety of ways. This leads us to uncover new ways it can be refined and/or implemented — take a look at Flexible Paxos result.

2) Reference Implementation(s). To continue the previous point, a great appeal of Raft is the abundance of (reference) implementations. This again appeals greatly to engineers, who can look at the code, run it, trace through breakpoints, and learn it hands-on. Another point that was mentioned in the discussion is a kind of herd effect. Once a good production-grade implementation is available, more people will just take it and use it.

3) Leader election efficiency. We spent a bit of time discussing the leader election efficiency point from the paper. This is an important feature that may come in handy for disaster recovery performance. The question is how realistic it is to have followers that significantly lag behind. These lagging followers may put pressure on the leader, as the leader cannot compact the log while some follower is still catching up and consuming it, which may cause higher memory and/or storage consumption. But of course, this applies only to severely lagging machines, and the performance hit from catch-up after the leader election can be noticeable even with less severe staleness. In the discussion, it was mentioned that having hours of staleness is possible on some systems!

On the other hand, the Cloudflare outage is a good illustration of how Raft’s leader election fails at liveness unless we add a bit more complexity to the protocol (and reduce understandability?). So its good performance may get compromised under some conditions. Here is the paper on this too: “Examining Raft’s behaviour during partial network failures.” And no surprise, it is by Heidi Howard.

4) Is Raft an Implementation of Multi-Paxos? Given a more abstract description of Multi-Paxos and a more direct (and less flexible?) approach taken by Raft, one may think that Raft is, in a sense, an implementation of Multi-Paxos. Both protocols operate on majority quorums and have a stable leader. They have the same communication pattern, as the leader talks to followers. Differences appear in the leader election, but are these big enough differences? Can one implement something Raft-like from an abstract Paxos blueprint?

The differences may appear big, but they also may appear as a refinement of Paxos. 

  • Can I elect a leader without having to copy data from followers? Sure, if you pick the most up-to-date follower to be the leader. 
  • How do I know which one is the most up-to-date? Well, look at the latest committed log item. 
  • Wait a sec, in Paxos I can commit out of order, so which one is the latest committed log item? Just don’t commit out of order… 

5) Porting Paxos Optimizations to Raft. As mentioned in our presentation, another paper tried to answer the Paxos vs Raft question. That paper, titled “On the Parallels between Paxos and Raft, and how to Port Optimizations”, looked at a number of Paxos optimizations and tried to figure out if they can apply to Raft. The authors there concluded that, for the most part, they apply. They also noted how the two protocols are similar and can be made more similar by tweaking the implementation of Multi-Paxos to be more Raft-like (which goes back to point (4) above).

6) Industry Use of Raft vs Multi-Paxos. The industry loves Raft, for the reasons I already mentioned above: reference code, good existing libraries, and a description that is closer to the code. One notable company that has been using Multi-Paxos is Google. Their Spanner database is based on Multi-Paxos. However, its open-source cousins (CockroachDB and YugabyteDB) rely on Raft. We were wondering how much more difficult it was to implement Multi-Paxos in production compared to Raft in these similar databases.

Easy adoption is a great point for use in the industry, especially when businesses lack the expertise to use formal methods to check their protocols or protocol variants. However, this may be changing now, as formal methods, like TLA+, are picking up more widespread adoption. We see that companies start to modify their Raft protocols in a variety of ways, and remain confident in their safety and liveness properties through model-checking. This adoption of formal methods may bring Multi-Paxos and Raft closer together, as a more abstract way of thinking about these protocols may highlight their similarities. For example, Jesse mentioned how they use TLA+ to build confidence in their variant of Raft.

7) Other Consensus Protocols. Another important topic we discussed relates to other consensus protocols. There are plenty of consensus-based replicated state machines, yet we still pretty much use Paxos or Raft. Solutions like EPaxos and its extension have been around for some time. Is the problem that they are hard to implement? Are they difficult to understand as well? EPaxos is not an easy protocol for sure, but it has a reference implementation. Or maybe these protocols are not as good in practice as the papers promise…

8) Teachability. If the community has a good understanding and description of Paxos, why not use it in classrooms and textbooks? There are not that many good distributed systems textbooks, and it seems like a lot of faculty prefer to teach from papers and their own notes. The same goes for engineering folks who have to pick up on distributed systems state-of-the-art through an extensive literature survey. 

We have mentioned a few sources to help with learning distributed systems, such as the “Designing Data-Intensive Applications” book by Martin Kleppmann or the MIT distributed systems course.


Reading Group. Microsecond Consensus for Microsecond Applications

Our 43rd reading group paper was about an extremely low-latency consensus using RDMA: “Microsecond Consensus for Microsecond Applications.” The motivation is pretty compelling — if you have a fast application, then you need fast replication to make your app reliable without holding it back. How fast are we talking here? The authors go for ~1 microsecond with their consensus system called Mu. That is one-thousandth of a millisecond. Of course, this is not achievable over a regular network and network protocols like TCP, so Mu relies on RDMA.

In my mind, Mu maps rather perfectly to Paxos/MultiPaxos, adjusted for the RDMA usage. The accept phase is pretty much Paxos phase-2. The leader directly writes to the follower’s memory. Mu does not use protocol-specific acks, but there is still an RDMA-level ack for successfully writing memory and thus completing phase-2. Of course, in Paxos, followers must check the ballot before accepting an operation in phase-2. This requires processing and would negate the benefits of direct memory access. To work around the problem, Mu uses RDMA permissions to control whose memory writes are accepted in phase-2. The bottom line, however, is that we have a single round-trip phase-2 capable of rejecting messages from “wrong” leaders, just like in Paxos.

Paxos elects a leader in phase-1. In Mu, the equivalent of phase-1 consists of 2 sub-phases. First, a prospective leader contacts the quorum of followers and tells them to change the permissions from an old leader to itself. This prevents the old leader from writing to a quorum and makes it stop. This quorum becomes “the leader’s go-to quorum”, as it can only write to the nodes from that quorum due to permissions. In the second sub-phase, the prospective leader learns of the past proposal/ballot number and any past operations to recover. The leader then picks a higher proposal number and writes it back. Just like in Paxos/MultiPaxos, the leader must recover the learned commands.

Another prominent part of the paper is the failure detector. The authors claim that it allows for fast leader failover. The detector operates by a pull mechanism: a leader maintains a heartbeat counter in its memory and increments it periodically, while the followers read the counter and, depending on the counter’s progress, adjust the “badness” score. If the counter moves too slowly, does not move, or is not readable at all(?), the badness score becomes high, causing the follower to decide that the leader has failed and try to take over.
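
A rough sketch of this pull-based detector logic is below; the thresholds, score increments, and stall pattern are made up for illustration and are not the paper’s actual parameters.

```python
# Sketch of Mu's pull-style failure detector as described above (thresholds and
# scoring increments are illustrative): followers poll a counter in the leader's
# memory and raise a "badness" score when the counter stops progressing.
def update_badness(prev_counter, new_counter, badness, unreadable=False,
                   step=1, decay=1, threshold=10):
    """Return (new_badness, suspect_leader?)."""
    if unreadable or new_counter == prev_counter:   # no progress observed
        badness += step
    else:                                           # counter moved: leader looks healthy
        badness = max(0, badness - decay)
    return badness, badness >= threshold

# Follower loop (conceptually): read the leader's heartbeat counter over RDMA,
# then decide whether to start a leader change.
badness, prev = 0, 0
for observed in [1, 2, 3] + [3] * 10:               # counter stalls at 3
    badness, suspect = update_badness(prev, observed, badness)
    prev = observed
print(suspect)   # True once the stall persists long enough
```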

As always, the paper has way more details than I can cover in a short summary. Our group’s presentation by Mohit Garg is available on YouTube:

Discussion

1) Performance. Microsecond latency covers only replication and does not include any of the client interactions or request capture. These components may add a significant delay to the client-observed latency. Moreover, the throughput figure has latency that is at least somewhat close to 1 microsecond only at the low-end of the throughput curve. Pushing more operations degrades latency quite significantly — up to 15 microseconds. Of course, it is worth noting that this is with batching enabled, so still pretty impressive.

2) Use of RDMA permissions for leader enforcement. This looked familiar to me… Until I was reminded that in the 17th reading group meeting we looked at the “Impact of RDMA on agreement” paper by the same authors.

3) Quorums. Since the protocol relies on the permissions being explicitly granted to a leader when it contacts a quorum, that leader cannot use any other quorum, as it won’t have permissions to access it. We were not very sure why a leader cannot contact all nodes and try to get permissions to all of them. It still needs only a majority to succeed, but having more than a quorum of nodes that can accept writes from the leader may be handy, since writing to more nodes than the minimal quorum can help control tail latency and tolerate stragglers.

4) Flexible Quorums. This continues the above point about quorums. Flexible quorums are quite useful in trading off fault tolerance and scalability. Since Mu is restricted to just one quorum that granted the write permissions, it cannot take advantage of flexible quorums, such as grids.

5) Failure detector. The failure detector is one of the most interesting and controversial features in Mu. We have spent quite a bit of time discussing it. First of all, what does the pull model give us? Every follower keeps polling the leader and reading some counter. But what if the leader is actually totally and utterly down? How can you read the memory of a crashed server to learn its counter and compute the badness score from it? Of course, if a follower cannot read, then it can conclude that the leader is down and start the leader election, but this is not explicitly mentioned in the paper. So what is the purpose of reading a counter and having the counter increase then? Being able to read the counter clearly means the leader is up, at least in some capacity. The counter, and the badness score computed from it, is not so much a proxy of the node’s overall up/down status as a proxy of the node’s health/performance. The paper briefly alludes to this when talking about replication being stuck, eventually causing the heartbeat counter to stop as well and trigger an election, despite the leader not being completely down.

In the discussion, we came up with a different heartbeat mechanism that avoids the “read from a dead node” issue. If we make the leader write its counter to the followers’ memory, and followers read their local copy of the leader’s counter, then a leader crash will stop the counter’s progress, and followers can detect it by reading their local memory. Quite honestly, this scheme sounds cleaner to us than the follower pull/read approach used in the paper. The authors claim that the pull mechanism provides better detection latency, but this is not backed up experimentally in the paper.

6) “Dumb” acceptors. Mu is not the only protocol that assumes “dumb” Paxos acceptors/followers that simply provide a write/read interface with very little capacity to run any “logic”. Disk Paxos assumes separate sets of processors and disks. One processor can become a leader, and disks are the followers. Disk Paxos, of course, would not provide the same low latency, as in each phase a processor needs to both write and read remote disks/storage. The paper briefly mentions Disk Paxos. CPaxos is a WAN Paxos variant built using strongly consistent cloud storage services as acceptors. Similarly, the storage service provides limited ability to run any logic and the leader must jump through some hoops to maintain safety. Another one mentioned in the discussion was Zero-copy Paxos.

7) Ordered communication for correctness. We spent a bit of time talking about the importance of ordered communication (FIFO) for the correctness of the protocol. If not for FIFO, there could have been some interesting corner cases around the leader churn. I usually do not fully trust papers that just state the assumptions of the FIFO channels and move on, since traditionally you may have quite a few corner-cases with systems built on FIFO network protocols, like TCP, and have messages reordered. One common reason is that applications often have complex and multi-threaded logic, and may reorder messages internally after the messages have left the TCP stack. Here, however, there is no logic at the followers, and it makes the ordered network all you need (assuming there are no other corner-cases in the network, like dropped connections and re-connections).


One Page Summary. Gryff: Unifying Consensus and Shared Registers

This paper by Matthew Burke, Audrey Cheng, and Wyatt Lloyd appeared in NSDI 2020 and explores an interesting idea of a hybrid replication protocol. The premise is very simple – we can take one protocol that solves a part of the problem well, and marry it with another protocol that excels at the second half of the problem. This paper tackles replication in geo-distributed strongly consistent storage systems. The authors argue that consensus, when used in storage systems with predominantly read and write operations, is inefficient and causes high tail latency.

A system presented in the paper, called Gryff, takes advantage of predominantly read/write workloads in storage systems and exposes these two APIs via a multi-writer atomic storage ABD protocol.  ABD operates in two phases both for reads and writes. On writes, ABD’s coordinator retrieves the latest version of the register from all nodes and writes back with a version higher than it has seen. On reads, ABD’s coordinator again retrieves the register, writes the highest version back to the cluster to ensure future reads do not see any previous versions, and only then returns back to the client. The write-back stage, however, can be skipped if a quorum of nodes agrees on the same version/value of the register, allowing for single RTT reads in a happy case.
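
Below is a minimal in-memory sketch of the ABD register logic described above (no networking or failures; versions are (timestamp, writer-id) pairs). This is my reading of the summary, not Gryff’s code, and it shows the two-phase write plus the read that skips the write-back when a quorum already agrees.

```python
# Minimal in-memory sketch of an ABD multi-writer atomic register.
class Replica:
    def __init__(self):
        self.version, self.value = (0, 0), None      # version = (timestamp, writer_id)
    def get(self):
        return self.version, self.value
    def store(self, version, value):
        if version > self.version:                   # keep only the newest version
            self.version, self.value = version, value

def abd_write(replicas, quorum, writer_id, value):
    latest = max(r.get()[0] for r in replicas[:quorum])      # phase 1: read versions
    new_version = (latest[0] + 1, writer_id)
    for r in replicas[:quorum]:                              # phase 2: write the new version
        r.store(new_version, value)

def abd_read(replicas, quorum):
    states = [r.get() for r in replicas[:quorum]]            # phase 1: read
    version, value = max(states)
    if all(s == (version, value) for s in states):           # quorum already agrees:
        return value                                         # fast path, 1 RTT
    for r in replicas[:quorum]:                              # phase 2: write back
        r.store(version, value)
    return value

cluster = [Replica() for _ in range(3)]
abd_write(cluster, quorum=2, writer_id=1, value="x")
print(abd_read(cluster, quorum=2))   # "x"
```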

Unfortunately, ABD, while providing linearizability, is not capable of supporting more sophisticated APIs. Read-modify-write (RMW) is a common pattern in many storage systems to implement transaction-like conditional updates of data. To support RMW, Gryff resorts back to consensus, and in particular to the Egalitarian Paxos (EPaxos) protocol. The choice of EPaxos allows any node in the cluster to act as the coordinator, so it does not restrict writes to a single node like many other protocols do. The problem with this hybrid approach is then the ordering of operations completed with the ABD protocol and RMW operations running under EPaxos. Since the EPaxos side of Gryff works only with RMWs, it can only order these operations with respect to other RMW operations, but what we need is a linearizable ordering of RMWs with normal writes and/or reads. To keep the ordering, Gryff uses tuples of ABD’s logical timestamp, process ID, and the RMW logical counter, called carstamps. Carstamps connect the ABD part of the system with EPaxos: only ABD can update ABD’s logical clock, and only EPaxos updates the RMW counter.
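
A tiny sketch of how carstamps could order plain writes and RMWs is below; the field layout follows my reading of the summary rather than the paper’s exact encoding.

```python
# Sketch of the carstamp idea: order operations first by the ABD logical time
# (and writer id), then by the RMW counter applied on top of that base write.
from dataclasses import dataclass

@dataclass(order=True, frozen=True)
class Carstamp:
    abd_time: int      # advanced only by ABD writes
    writer_id: int     # tie-breaker between concurrent writers
    rmw_counter: int   # advanced only by EPaxos-ordered RMWs on this base

plain_write      = Carstamp(abd_time=5, writer_id=2, rmw_counter=0)
rmw_on_that_base = Carstamp(abd_time=5, writer_id=2, rmw_counter=1)
newer_write      = Carstamp(abd_time=6, writer_id=1, rmw_counter=0)

# A newer ABD write supersedes any RMW applied to an older base:
print(plain_write < rmw_on_that_base < newer_write)   # True
```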

When we consider the interleaving of writes and RMWs, the write with a higher ABD logical time supersedes any other write or RMW. This means that we actually do not need to order all RMWs with respect to each other, but only order RMWs that have the same base, or ABD logical time. EPaxos was modified to allow such partial ordering of commands belonging to different bases, essentially making the protocol have different dependency graphs for RMWs applied to different ABD states. Another change to EPaxos is the cluster-execute requirement, as a quorum of nodes needs to apply the change before it can be returned to the client to make the change visible for subsequent ABD read operations.

So, how does Gryff do with regard to performance? Based on the authors’ evaluation, it is doing very well in reducing the (tail) latency of reads. However, I have to point out that the comparison with Multi-Paxos was flawed, at least to some extent. The authors always consider running a full Paxos phase for reads and do not consider the possibility of reading from a lease-protected leader, which eliminates 1 RTT from the Paxos read. This would make Paxos’ minimum latency smaller than Gryff’s, while also dramatically reducing the tail latency. Gryff also struggles with write performance, because writes always take 2 RTTs in the ABD algorithm. As far as scalability goes, the authors admit that Gryff cannot push as many requests per second as EPaxos, even in its most favorable configuration with just 3 nodes.

Can Paxos do better? We believe that our PQR optimization, when applied in the WAN, will cut most reads down to 1 quorum RTT, similar to Gryff. PQR may still occasionally retry the reads if the keyspace is small, but this problem also applies to Gryff when the cluster is larger than 3 nodes.

What about Cassandra? Cassandra uses a protocol similar to ABD for its replication, and it also incorporates Paxos to perform compare-and-set transactions, which are one case of RMW operation, so in a sense Gryff appears to be very similar to what Cassandra has been doing for years.

 

PigPaxos: continue devouring communication bottlenecks in distributed consensus.


This is a short follow-up to Murat’s PigPaxos post. I strongly recommend reading it first as it provides full context for what is to follow. And yes, it also includes the explanation of what pigs have to do with Paxos.

Short Recap of PigPaxos.

In our recent SIGMOD paper, we looked at the bottleneck of consensus-based replication protocols. One of the more obvious observations was that in protocols relying on a single “strong” leader, that leader is overwhelmed with managing all the communication. The goal of PigPaxos is to give the leader a bit more breathing room to do the job of a leader, and not spend as much time talking. To that end, we replaced the direct communication pattern between leader and followers with a two-hop pattern in which a leader talks to a small subset of randomly picked relay nodes, and the relays in turn communicate with the rest of the cluster. PigPaxos also uses relays to aggregate the replies together before returning to the leader. On each communication step, PigPaxos uses a new set of randomly picked relay nodes to both spread the load evenly among the followers and tolerate failures.
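
A toy, single-process sketch of this communication pattern is below; the group layout and ack format are purely illustrative and do not reflect the actual PigPaxos implementation.

```python
# Toy sketch of PigPaxos' two-hop pattern: the leader picks one random relay per
# group, each relay fans the message out to its group and aggregates the group's
# acks before replying to the leader.
import random

def pig_broadcast(leader_msg, groups, handle_at_node):
    """groups: list of lists of node ids; returns aggregated acks per group."""
    aggregated = []
    for group in groups:
        relay = random.choice(group)                 # new random relay every round
        acks = [handle_at_node(node, leader_msg)     # relay forwards to its group
                for node in group]                   # (including itself)
        aggregated.append((relay, sum(acks)))        # relay aggregates before replying
    return aggregated

# Example: 9 followers in 3 relay groups, every node acks the message.
groups = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(pig_broadcast("accept slot=42", groups, handle_at_node=lambda n, m: 1))
```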


By randomly rotating the relays, enforcing timeouts, and including some other optimizations on how many nodes to wait for at each relay node, we can provide adequate performance even in the event of node crashes or network partitions. The fault tolerance limit of PigPaxos is similar to Paxos: up to a minority of nodes may fail with the system still making some (limited, if implemented naively) progress.

Some More Results

In the original PigPaxos post, we have not talked about scaling to super large clusters. Well, I still do not have that data available, but following in the footsteps of our SIGMOD work, we have developed a performance model that, hopefully, is accurate enough to show the expected performance at a bigger scale.

Performance model of PigPaxos for clusters of 25 and 99 nodes and 3 relay groups.

Network uniformity is not a requirement for PigPaxos. In fact, it is perfectly OK to have some links slower than others. However, some arrangement of relay groups may be required to get the best performance when links between nodes have different speeds or capacities. The most pronounced real-world example of this non-uniformity is wide area networks. When we deployed the real, not-simulated PigPaxos in such a geo-distributed environment, it no longer had the disadvantage of slower latency, as the latency became dominated by the much slower geo-links. We took advantage of the natural division between fast and slow links and made all nodes in each region part of the same relay group. Another advantage of this setup is the reduced amount of cross-region traffic, as data moves to each region only once regardless of how many replicas are there.


On the fault tolerance front, relay nodes definitely introduce more ways for the protocol to stumble. A crash of a relay node makes the entire relay group unavailable for that communication attempt. A crash of a non-relay node causes a timeout, which may add to the operation latency. The core principle behind PigPaxos’ fault tolerance is to repeat failed communication in a new configuration of relay nodes. Eventually, the configuration will be favorable enough to make progress, given that the majority of nodes are up. However, this process can be slow when many nodes are crashed, so some orthogonal optimizations can help. For example, it is worth remembering which nodes are temporarily down and not using them as relays or otherwise expecting them to reply on time. Another approach is to reduce the wait quorum of the relay group to tolerate stragglers, or even use overlapping groups for communication redundancy. However, even with all these ad-hoc optimizations turned off, PigPaxos can still mask failures originating in a minority of relay groups without much impact on performance. For example, in the experiment below, we have one relay group experiencing a failure on every operation for 10 seconds without much detriment to overall performance.


Why Scaling to This Many Nodes?

One of the most important questions about PigPaxos is “why?” Why do you need this many nodes in Paxos? Well, the answer is not simple and consists of multiple parts:

  • Because we can!
  • Because now we can tolerate more nodes crashing.
  • Because now we can make services like ZooKeeper or even databases scale for reads just by adding more nodes. ZooKeeper reads are served from a single node, and so are reads in many databases that provide some relaxed consistency guarantees.
  • Because it allows bigger apps with more parties that require consensus. And it is done by a single protocol.

 

 

One Page Summary: Ring Paxos

This paper (Ring Paxos: A high-throughput atomic broadcast protocol) has been out for quite some time, but it addresses a problem still relevant in many distributed consensus protocols. Ring Paxos aims to reduce the communication load in the Paxos cluster and provide better scalability. As we have shown in our SIGMOD 2019 paper, communication is a great limiting factor in scalability of Paxos-like protocols.

Ring Paxos reduces communication overheads with a twofold approach. First, it uses ip-multicast to substitute direct node-to-node communication wherever possible with a broadcast type of communication. Second, Ring Paxos overlays a ring topology over (parts of) the Paxos cluster to control the message flow and prevent communication bottlenecks from forming.

Ring Paxos operates very similarly to regular Paxos, with the differences being mainly in the communication part of the protocol. One node acts as a designated coordinator that receives proposals from the clients. However, the coordinator must confirm itself as a valid coordinator using phase-1 of Paxos. To that end, the coordinator uses ip-multicast to send a message with some ballot/round number to all acceptors. This message also contains the ring configuration, including the node designated as the beginning of the ring. The acceptors receive the message, compare the ballot with their knowledge, and only accept the node as the new coordinator if the received ballot is the highest an acceptor has seen so far. Each acceptor replies independently to the coordinator (not shown in the figure), and with these replies the coordinator learns whether it has succeeded. Additionally, the coordinator also learns of any unfinished/uncommitted values that must be recovered.

Ring Paxos Protocol. Phase-2

Upon successfully getting a quorum of confirmations, the coordinator moves on to phase-2 of Paxos, illustrated in the figure on the right. During this phase, the coordinator replicates the commands/log entries to the acceptors. Similarly to phase-1, the coordinator uses ip-multicast to send this message to the acceptors (message #2 in the figure). The acceptors (and learners) get the value/command to be replicated, but that value is not committed just yet. Along with the value, acceptors also receive the value-id (c-vid), a unique identifier for the value, and they set their working value-id (v-vid) to c-vid. The acceptors do not reply individually to the coordinator. Instead, the first acceptor in the ring sends a reply containing the c-vid to its successor (message #3 in the figure). The acceptor receiving such a reply will compare its v-vid and the c-vid from the reply message. The two value-ids match when the acceptor is not aware of any other value/coordinator acting concurrently. In this case, it forwards the message further down the chain. This chain forwarding in the ring happens until the reply reaches the coordinator (message #4), which sits at the tail of the chain. The protocol terminates the message propagation across the ring when the acceptor’s v-vid and the c-vid from the reply message are different, leaving the coordinator to wait for a timeout and retry the protocol from the beginning with a higher ballot. Node failures produce a similar outcome, since a failure completely hinders message propagation across the ring. As a result, the new phase-1 of Paxos must include a different ring configuration to try to avoid the failed node. When the coordinator receives the reply from the ring/chain, it sends the commit message to all acceptors and learners with ip-multicast (message #5). The protocol may be extended for running multiple slots in parallel by ensuring the c-vid and v-vid comparisons happen within the same slot.
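
A toy sketch of the phase-2 ack propagation along the ring is below, with illustrative value-ids; the real protocol, of course, runs over ip-multicast and a network ring rather than a Python list.

```python
# Toy sketch of Ring Paxos phase-2 ack propagation: each acceptor forwards the
# reply only if the c-vid in the reply matches its own v-vid set by the
# multicast; otherwise the chain stops and the coordinator times out.
def ring_phase2_ack(ring_acceptors, c_vid):
    """ring_acceptors: acceptors in ring order, each with a .v_vid attribute.
    Returns True if the ack travels the whole ring to the coordinator."""
    for acceptor in ring_acceptors:
        if acceptor.v_vid != c_vid:      # concurrent coordinator or missed multicast
            return False                 # stop forwarding; coordinator will retry
    return True                          # ack reached the coordinator at the tail

class Acceptor:
    def __init__(self, v_vid):
        self.v_vid = v_vid

ring = [Acceptor("val-7"), Acceptor("val-7"), Acceptor("val-7")]
print(ring_phase2_ack(ring, c_vid="val-7"))   # True: commit can be multicast
ring[1].v_vid = "val-9"                        # one acceptor saw a different value-id
print(ring_phase2_ack(ring, c_vid="val-7"))   # False: coordinator retries phase 1
```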

Ring Paxos Performance.

The performance of Ring Paxos is better than standard Paxos implementations, such as Libpaxos (which uses ip-multicast) and Paxos4sb (unicast). It appears that Ring Paxos is capable of saturating most of the network bandwidth, with only the LCR protocol pushing a bit more throughput.

Paper Summary: Bolt-On Global Consistency for the Cloud

Figure 1. Latency improvement from going multi-cloud. Minimum read latency SLO feasible when using a single provider for data storage versus when using multiple providers.

This paper appeared in SOCC 2018, but caught my Paxos attention only recently. The premise of the paper is to provide strong consistency in a heterogeneous storage system spanning multiple cloud providers and storage platforms. Going across cloud providers is challenging, since storage services at different clouds cannot directly talk to each other and replicate the data with strong consistency. The benefits of spanning multiple clouds, however, may be worth the hassle, since a heterogeneous system will both be better protected from cloud provider outages and provide better performance by placing the data closer to the users. The latter aspect is emphasized in the paper, and as seen in the figure, going multi-cloud can reduce latency by up to ~25%.

Figure 2. Comparison of (a) classic Paxos and (b) CPaxos.

To solve the issue of consistent cross-cloud replication, the authors propose to use Cloud Paxos (CPaxos), a Paxos variant designed to work with followers supporting a very minimal and common set of operations: get and conditional put. In CPaxos, clients act as proposers, and storage systems serve the role of the followers. The followers are not really “smart” in this protocol, and most of the Paxos logic shifts to the client-proposers (Figure 2).

The prepare phase in CPaxos simply gathers the state from the followers, letting the proposer decide for itself whether the followers would have accepted it with the current ballot or not. If the proposer thinks it would have been accepted, it will try updating the followers’ state. Doing this, however, requires some precautions from the followers, since their state may have changed after the proposer made the decision to proceed. For that, CPaxos uses the conditional put (or compare-and-set) operation, making the followers update their state only if it has not changed since it was read by the proposer. This ensures that at most one proposer can succeed with changing the state of the majority of followers.

Figure 3. Object log. Each slot corresponds to some version, and can be committed at some ballot #.

I visualize this as a log representing changes in some object’s state. The new version of an object corresponds to a new slot in the log, while each slot can be tried with different ballots by different proposers. The put operation succeeds at the follower only if the value at the slot and ballot has not been written by some other proposer. In case a proposer does not get a majority of successful updates, it needs to start from the beginning: increase its ballot, perform a read, and decide whether to proceed with the state update. Upon getting a majority of acks on the state update, the proposer sends a message to flip the commit bit to make sure each follower knows the global state of the operation.
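
A compact sketch of this write path over an in-memory stand-in for a storage service that only offers get and conditional put is below. The slot/ballot layout is my reading of the summary, not the paper’s exact schema, and recovery of values left over from lower ballots is omitted.

```python
# Sketch of the CPaxos write path: the proposer reads the followers' state,
# then uses conditional put (compare-and-set) to update it, and finally flips
# the commit bit. (Recovery of lower-ballot leftovers is omitted here.)
class Store:
    def __init__(self):
        self.slots = {}                                  # version -> (ballot, value, committed)
    def get(self, version):
        return self.slots.get(version)
    def cond_put(self, version, expected, new):
        if self.slots.get(version) == expected:          # compare-and-set
            self.slots[version] = new
            return True
        return False

def cpaxos_write(stores, version, ballot, value, majority):
    observed = [s.get(version) for s in stores]          # "prepare": just read state
    if any(o is not None and o[0] >= ballot for o in observed):
        return False                                     # someone with >= ballot got here first
    acks = sum(s.cond_put(version, old, (ballot, value, False))
               for s, old in zip(stores, observed))      # accept only if state unchanged
    if acks < majority:
        return False                                     # retry with a higher ballot
    for s in stores:                                     # flip the commit bit everywhere
        s.cond_put(version, (ballot, value, False), (ballot, value, True))
    return True

cluster = [Store() for _ in range(3)]
print(cpaxos_write(cluster, version=1, ballot=0, value="green", majority=2))  # True
```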

This basic protocol has quite a few problems with performance. Latency is high: at least 2 round-trips are required to reach consensus, since every proposer needs to run 2 phases (plus send a commit message). Additionally, increasing the number of proposers acting on the same objects will lead to a growth in conflicts, requiring repeated restarts and further increasing latency. CPaxos mitigates these problems to a degree. For example, it tries to commit values on the fast path by avoiding the prepare phase entirely and starting an accept phase on what it believes will be the next version of an object with ballot #0. If the proposer’s knowledge of the object’s state (version, ballot) is outdated, the conditional put will fail and the proposer will try again, but this time with the full two phases to learn the correct state first. However, if the proposer is lucky, an update can go in just one round-trip. This optimization, of course, works only when an object is rarely updated concurrently by multiple proposers; otherwise dueling leaders become a problem not only for progress, but for safety as well, since two proposers may write different values for the same version using the same ballot. This creates a bit of a conundrum on when the value becomes safely anchored and won’t ever get lost.

Figure 4. Dueling leaders both start opportunistic accept with ballot #0, each writing their own values (green and blue)

Figure 5. Which value to recover in case of the failure? Both green and blue are on the same ballot and have the same number of live nodes

Consider an example in which two proposers write different values, green and blue, to the same version using ballot #0 (Figure 4). One of the proposers is able to write to the majority before it becomes unresponsive. At the same time, one green follower crashes as well, leading to a situation with two followers having the green value and two being blue (Figure 5). The remaining proposer has no knowledge of whether the green or blue value needs to be recovered (remember, they are both on the same ballot in the same slot/version!). To avoid this situation, CPaxos expands the fast path commit quorum from a majority to a supermajority, namely \(\left\lceil \frac{3f}{2} \right\rceil + 1\) followers, where \(2f+1\) is the total number of followers and \(f\) is the tolerated number of follower failures, allowing the anchored/committed value to be in a majority of any majority of followers. For example, with \(f=2\) and 5 followers, the fast path needs 4 acks instead of the usual 3. Having this creates an interesting imbalance in fault tolerance: while CPaxos still tolerates \(f\) node failures and can make progress by degrading to the full 2 phases of the protocol, it can lose an uncommitted value even if it was accepted by a majority when up to \(f\) followers fail.

Figure 6. (a) messages are sent at the same time; delivery time is different increasing the duration of inconsistency/conflicting state. (b) some messages are sent with delay to provide similar arrival time and reduce the duration of inconsistent state

Proposer conflicts are a big problem for CPaxos, so naturally the protocol tries to mitigate them. The approach taken here reduces the duration in which possible conflicts may occur. As CPaxos is deployed over many datacenters, the latencies between datacenters are not likely to be uniform. This means that prepare or accept messages from some proposer reach different datacenters at different times, creating an inconsistent state. When two proposers operate concurrently, they are more likely to observe this inconsistency: as both proposers quickly update their neighboring datacenters, they run the risk of not reaching the required supermajority due to the conflicting state (Figure 6(a)) created by some messages not being as quick to reach the remaining datacenters. To avoid rejecting both proposers, CPaxos schedules sending messages in a way that delivers them to all datacenters at roughly the same time. This reduces the duration of the inconsistent state, allowing some concurrent operations to be ordered (Figure 6(b)).

Figure 7. Degradation in write throughput under different conflict rates.

Despite the above mitigation strategy, conflicts still affect CPaxos greatly. The authors are rather open about this and show their system, CRIC with CPaxos, degrading quicker than Paxos and Fast Paxos as the conflict rate increases. However, in the low-conflict scenario, which the authors argue is more likely in real-world applications, CRIC and CPaxos improve on performance compared to Paxos/Fast Paxos, especially for reading the data. This is because reads in CPaxos are carried out in one round-trip time (RTT) by the client-proposer contacting all followers and waiting for at least a majority of them to reply. If the client sees the latest version with a commit flag set in the majority, it can return the data. Otherwise, it will wait to hear from more followers and use their logs to determine the safe value to return. In some rare cases, when the proposer cannot determine the latest safe value, it will perform a recovery by running the write path of CPaxos with the value to recover (the highest-ballot value, or the highest-frequency value if more than one value shares the ballot).

Figure 8. Read and write latency.

Some Thoughts

  • The motivation of the paper was to make a strongly consistent system spanning multiple cloud providers and storage systems for the benefit of improved latency through leveraging the locations of the datacenters of these different providers. However, the CRIC and CPaxos protocol requires a lot of communication, even on the read path. During reads, a client-proposer contacts all CPaxos nodes, located at all datacenters, and in the best case still needs a majority of replies. As such, the latency benefit here comes from trying to get not just one node closer to the client, but a majority of nodes. This may be difficult to achieve in large systems spanning many datacenters. I think sharding the system and placing it on a subset of nodes based on access locality can benefit here greatly. For instance, Facebook’s Akkio paper claims a significant reduction in traffic and storage by having fewer replicas and making data follow access patterns. In our recent paper, we have also illustrated a few very simple data migration policies and the possible latency improvement from implementing these policies.
  • One-RTT reads in the “happy path” can be implemented on top of regular MultiPaxos without contacting all nodes in the system. Reading from a majority of followers is good enough for this most of the time, while in rare circumstances the reader may need to retry the read from any one node. More on this will be in our upcoming HotStorage ’19 paper.
  • The optimization to delay message sending in order to deliver messages at roughly the same time to all nodes can help with conflict reduction in other protocols that suffer from this problem. EPaxos comes to mind right away, as it is affected by the “dueling leaders” problem as well. Actually, CPaxos and EPaxos are rather similar. Both assume a low conflict rate to have single round-trip “happy path” writes and reads. When the assumption breaks and there is a conflict, both switch to two phases. EPaxos is better here in the sense that the first opportunistic phase is not totally wasted and can be used as phase-1 in the two-phase mode, whereas CPaxos has to start all the way from the beginning due to the API limitation on the follower side.

One Page Summary: “PaxosStore: High-availability Storage Made Practical in WeChat”

The PaxosStore paper, published in VLDB 2017, describes the large-scale, multi-datacenter storage system used in WeChat. As the name may suggest, it uses Paxos to provide storage consistency. The system claims to provide storage for many components of the WeChat application, with 1.5TB of traffic per day and tens of thousands of queries per second during the peak hours.

PaxosStore relies on the Paxos protocol for consistency and replication within tight geographical regions. The system was designed with a great separation of concerns in mind. At a high level, it has three distinct layers interacting with each other: the API layer, the consensus layer, and storage. Separating these out allowed PaxosStore to provide the most suitable APIs and storage for different tasks and applications, while still having the same Paxos-backed consistency and replication.

In the Paxos-driven consensus layer, the system uses a per-object log to keep track of values and Paxos-related metadata, such as promise (epoch) and proposal (slot) numbers. The log’s implementation, however, seems to be somewhat decoupled from the core Paxos protocol. The Paxos implementation is leaderless, meaning there is no single dedicated leader for each object, and every node can perform writes on any of the objects in the cluster by running the prepare and accept phases of Paxos. Naturally, the system tries to perform (most) writes in one round trip instead of two by assuming some write locality. After the first successful write, a node can issue more writes with increasing proposal (slot) numbers. If some other node performs a write, it needs to have a higher ballot, preventing the old master from doing quick writes. This is a rather common approach, used in many Paxos variants.

The lack of a single stable leader complicates the reads, since there is no authority that has the most up-to-date state of each object. One solution for reading is to use the Paxos protocol directly; however, this disrupts the locality of write operations by hijacking the ballot to perform a read. Instead, PaxosStore resorts to reading directly from the replicas by finding the most recent version of the data and making sure a majority of replicas have that version. This works well for read-heavy workloads, but in some high-contention (or failure?) cases the most recent version may not be present on a majority of replicas, and the system falls back to running Paxos for reading.
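
A small sketch of this read path is below, with an assumed replica API and in-memory stand-ins; it is illustrative only, not PaxosStore’s code.

```python
# Sketch of the read path described above: read versions from the replicas and
# return the data only if the most recent version is already on a majority;
# otherwise fall back to a Paxos round.
def read(replicas, majority, paxos_read):
    states = [r.get_version_and_value() for r in replicas]   # assumed replica API
    newest = max(version for version, _ in states)
    holders = [value for version, value in states if version == newest]
    if len(holders) >= majority:
        return holders[0]          # fast path: majority already holds the newest version
    return paxos_read()            # contention/failure case: run Paxos for the read

class Replica:                     # trivial in-memory stand-in
    def __init__(self, version, value): self.version, self.value = version, value
    def get_version_and_value(self): return self.version, self.value

cluster = [Replica(4, "a"), Replica(4, "a"), Replica(3, "stale")]
print(read(cluster, majority=2, paxos_read=lambda: "value-from-paxos"))   # "a"
```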

PaxosStore runs in multiple datacenters, but it is not a full-fledged geo-replicated system, as it only replicates between the datacenters located in the same geographical area. The paper is not clear on how data gets assigned to regions and whether objects can migrate between regions in any way. Within each datacenter, the system organizes nodes into mini-clusters, with each mini-cluster acting as a Paxos follower. When data is replicated between mini-clusters, only one (some?) node in each mini-cluster holds the data. This design aims to improve fault tolerance: with a 2-node mini-cluster, the failure of 1 node does not result in the failure of the entire mini-cluster Paxos-follower.

The paper somewhat lacks in its evaluation, but PaxosStore seems to handle its goal of multi-datacenter, same-region replication fairly well, achieving sub-10 ms writes.

This paper seems like a good solution for a reliable and somewhat localized data store. The authors do not address data sharding and migration between regions and focus only on the intra-region replication to multiple datacenters, which makes me think PaxosStore is not really a “global”, geo-replicated database. The fault tolerance is backed by Paxos, mini-clusters, and the usage of PaxosLog for data recovery. The evaluation could have been more complete if the authors showed the scalability limits of their system and provided some details about the throughput and datacenter-locality of the workload in the latency experiments.

Here is a one-page PDF of this summary.

Modeling Paxos Performance in Wide Area – Part 3

Earlier I looked at modeling paxos performance in local networks; however, nowadays people (companies) use paxos and its flavors in the wide area as well. Take Google Spanner and CockroachDB as examples. I was naturally curious to expand my performance model to wide area networks as well. Since our lab has worked on WAN coordination for quite some time, I knew what to expect from it, but nevertheless I got a few small surprises along the way.

In this post I will look at Paxos over WAN, EPaxos, and our wPaxos protocols. I am going to skip most of the explanation of how I arrived at the models, since the models I used are very similar in spirit to the one I created for looking at local area performance. They all rely on queuing theory approximations for processing overheads and k-order statistics for the impact of quorum size.

Despite using similar methods, modeling protocols designed for WAN operation proved to be more difficult than the local-area models. This difficulty arises mainly from the myriad of additional parameters I need to account for. For instance, for Paxos in WAN I need to look at the latencies between each pair of nodes in the cluster, since WAN inter-region latencies are not uniform. Moving up to EPaxos, I have multiple leaders to model, which means I must also take into consideration the processing overhead each node incurs in its role of following other nodes for some slots. wPaxos takes this even further: to model its performance I need to consider access locality and “object stealing”, among other things.

Today I will focus only on 5-region models. In particular, I obtained average latencies between 5 AWS regions: Japan (JP), California (CA), Oregon (OR), Virginia (VA) and Ireland (IR). I show these regions and the latencies between them in Figure 1 below.

Figure 1: Some AWS regions and latencies in ms between them.

Paxos in WAN

Converting the paxos model from LAN to WAN is rather straightforward; all I need to do is modify the model to use non-uniform distances between nodes. I also need the ability to set which node is going to be the leader for my multi-paxos rounds. With these small changes, I can play around with paxos and see how WAN affects it.
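To give a flavor of these changes, here is a minimal sketch of the network-only part of the WAN model. The RTT numbers are rough placeholders for illustration, not the measured values behind Figure 1:

```python
# Placeholder inter-region RTTs in milliseconds (illustrative only).
RTT_MS = {
    ("JP", "CA"): 110, ("JP", "OR"): 100, ("JP", "VA"): 160, ("JP", "IR"): 220,
    ("CA", "OR"): 25,  ("CA", "VA"): 65,  ("CA", "IR"): 140,
    ("OR", "VA"): 70,  ("OR", "IR"): 130,
    ("VA", "IR"): 80,
}

def rtt(a, b):
    return 0 if a == b else RTT_MS.get((a, b), RTT_MS.get((b, a)))

def commit_latency(leader, regions):
    """Network-only latency of a multi-paxos round led by `leader`: the leader
    must hear accept-acks from a majority, i.e. the (q-1)-th fastest of the
    other replicas (the k-order statistics idea applied to per-pair RTTs)."""
    q = len(regions) // 2 + 1
    follower_rtts = sorted(rtt(leader, r) for r in regions if r != leader)
    return follower_rtts[q - 2]   # (q-1)-th smallest RTT among the N-1 followers

regions = ["JP", "CA", "OR", "VA", "IR"]
for leader in regions:
    print(leader, commit_latency(leader, regions), "ms")
```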

Figure 2: Paxos in WAN with leader in different regions.

Figure 2 (above) shows a model run for 5 nodes in 5 regions (1 node per region). From my previous post, I knew that the maximum throughput of the system does not depend on network latency, so it is reasonable for paxos in WAN to be similar to paxos in local networks in this regard. However, I was a bit surprised to see how flat the latency stays in the WAN deployment almost all the way to the saturation point. This makes perfect sense, though, since the WAN RTT dominates the latency, and the small latency increases due to queuing costs are largely masked by the large network latency. This may also explain why Spanner, CockroachDB and others use paxos in their databases: predictable performance throughout the entire range of load conditions makes it easier to deliver stable performance to clients and simplifies load balancing.

However, not everything is so peachy here. The geographical placement of the leader node plays a crucial role in determining the latency of the paxos cluster. If the leader node is too far from the majority-quorum nodes, it will incur a high latency penalty. We see this with the Japan and Ireland regions, as they are far from all other nodes in the system and result in very high operation latency.

EPaxos

The EPaxos protocol tries to address a few shortcomings of paxos. In particular, EPaxos no longer has a single leader node, and any node can lead some commands. If commands are independent, then EPaxos can commit them quickly in one phase using a fast quorum. However, if a command has dependencies, EPaxos needs to run another phase on a majority quorum (at which point it pretty much becomes Paxos with two phases: leader election and operation commit). The fast quorum in some cases may be larger than the majority quorum, but in the 5-node model I describe today, the fast quorum is the same as the majority quorum (3 nodes).

Naturally, conflicts between commands will greatly impact the performance: with no conflict, all operations can be decided in one phase, while with 100% conflict, all operations need two phases. Since running two phases requires more messages, I had to change the model to factor in the probability of running two phases. Additionally, the model now looks at the performance of every node separately, accounting for the node leading some slots and following on others.
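A minimal sketch of how the two-phase probability enters the model; the latencies and message counts below are placeholders rather than the model's fitted parameters:

```python
def epaxos_expected_cost(conflict, fast_latency, extra_phase_latency,
                         fast_msgs, extra_phase_msgs):
    """Hedged sketch: with probability (1 - conflict) a command commits on the
    fast path; with probability `conflict` it pays an extra majority-quorum
    phase. The same weighting scales the per-node message counts that feed
    the queuing part of the model."""
    expected_latency = fast_latency + conflict * extra_phase_latency
    expected_messages = fast_msgs + conflict * extra_phase_msgs
    return expected_latency, expected_messages

# Example: 70 ms fast path, +70 ms for the extra phase, 2% vs. 50% conflict.
print(epaxos_expected_cost(0.02, 70, 70, 8, 8))
print(epaxos_expected_cost(0.50, 70, 70, 8, 8))
```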

Figure 3: EPaxos throughput at every node with 2% command conflict.

Figure 4: EPaxos throughput at every node with 50% command conflict.

Figures 3 and 4 (above) show EPaxos performance at every node for 2% and 50% conflict. Note that the aggregate throughput of the cluster is the sum of all 5 nodes. For 2% conflict, the maximum throughput was 2.7 times larger than that of Paxos. As the conflict between commands increases, EPaxos loses its capacity and its maximum throughput decreases, as I illustrate in Figure 5 (below). This changing capacity may make it more difficult to use EPaxos in production environments. After all, workload characteristics may fluctuate throughout the system’s lifespan, and an EPaxos cluster may or may not withstand workloads of identical intensity (the same number of requests/sec) but different conflict rates.

Figure 5: EPaxos aggregate maximum throughput for different conflict levels.

wPaxos

wPaxos is our recent flavor of WAN-optimized paxos. Its main premise is to separate the commands for different entities (objects) across different leaders and process these commands geographically close to where the entities are required by clients. Unlike most Paxos flavors, wPaxos needs a large cluster; however, thanks to flexible quorums, each operation only uses a subset of nodes in the cluster. This allows us to achieve both multi-leader capability and low average latency.

wPaxos, however, has lots of configurable parameters that all affect the performance. For instance, the fault tolerance may be reduced to the point where the system does not tolerate a region failure, but can still tolerate failures of nodes within a region. In this scenario (Figure 6, below), wPaxos can achieve the best performance, with an aggregate throughput across all regions (3 nodes per region, for a total of 15 nodes) of 153,000 requests per second.
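As a rough illustration of how locality and object stealing enter such a model (this is my simplified reading, with placeholder latencies, not the actual wPaxos model):

```python
def wpaxos_expected_latency(locality, steal_rate, local_lat, remote_lat, steal_lat):
    """Hedged sketch: a fraction `locality` of requests is served by the local
    leader over a region-local phase-2 quorum; the rest are resolved in a
    remote region, and a fraction `steal_rate` of all requests additionally
    pays a phase-1 "steal" before committing locally. Stealing requests are
    assumed to be a subset of the non-local ones."""
    non_local = 1.0 - locality
    return (locality * local_lat
            + (non_local - steal_rate) * remote_lat
            + steal_rate * (steal_lat + local_lat))

# 70% locality with 1% stealing vs. 50% locality with 3% stealing (placeholder ms values).
print(wpaxos_expected_latency(0.70, 0.01, 5, 70, 140))
print(wpaxos_expected_latency(0.50, 0.03, 5, 70, 140))
```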

Figure 6: wPaxos with no region fault tolerance and 70% region-local operations and 1% chance of object migration.

We still observe big differences in latencies due to geography, as some requests originating in one region must go through the object-stealing phase or be resolved in another region. However, the average latency of a request is smaller than that of EPaxos or Paxos. Of course, a direct comparison between wPaxos and EPaxos is difficult, as wPaxos (at least in this model configuration) is not as fault tolerant as EPaxos. Also, unlike my FPaxos model from last time, the wPaxos model reduces the phase-2 communication to a phase-2 quorum only. This lets it take much bigger advantage of flexible quorums than the “talk-to-all-nodes” approach. As a result, having more nodes helps wPaxos provide higher throughput than EPaxos.

Some EPaxos problems still show up in wPaxos. For instance, as access locality decreases and the rate of object migration grows, the maximum throughput a cluster can provide decreases. Figure 7 (below) shows the wPaxos model with locality shrunk to 50% and object migration expanded to 3% of all requests.

Figure 7: wPaxos with no region fault tolerance and 50% region-local operations and 3% chance of object migration.

How Good Are the Models?

I was striving to achieve the best model accuracy without going overboard trying to account for all possible variables. The models for both LAN and WAN seem to agree fairly well with the results we observe in our Paxi framework for studying various flavors of consensus.

However, there is always room for improvement, as more parameters can be accounted for to make the models more accurate. For instance, WAN RTTs do not really follow a single normal distribution, as a packet can take one of many routes from one region to another (Figure 8, below). This may make real performance fluctuate and “jitter” more compared to the rather idealistic model.

Figure 8: WAN Latency between VA and OR.
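One way to fold this into the model would be to draw each RTT from a mixture of normals, one component per route; here is a minimal sketch with made-up route parameters:

```python
import random

def wan_rtt_sample(routes, rng=random):
    """Hedged sketch: model a WAN link as a mixture of normal distributions,
    one component per network route. `routes` is a list of
    (probability, mean_ms, stdev_ms) triples -- placeholder values below."""
    r = rng.random()
    acc = 0.0
    for prob, mean, stdev in routes:
        acc += prob
        if r <= acc:
            return rng.gauss(mean, stdev)
    return rng.gauss(routes[-1][1], routes[-1][2])   # guard against rounding

# e.g. a link where 80% of packets take a ~70 ms route and 20% a ~78 ms route.
print([round(wan_rtt_sample([(0.8, 70, 1.5), (0.2, 78, 2.0)]), 1) for _ in range(5)])
```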

I also did not account for some processing overheads. In EPaxos, a node must figure out the dependency graph for each request, and for high-conflict workloads these graphs may get large, requiring more processing power. My model is simple and assumes this overhead to be negligible.

A Few Concluding Remarks

Over this series of paxos performance modeling posts I looked at various algorithms and the parameters that affect their performance. I think it truly helped me understand Paxos a bit better. I showed that network fluctuations have little impact on paxos performance (k-order statistics helps figure this out). I showed how a node’s processing capacity limits performance (I know this is trivial and obvious), but what is still a bit interesting is that a paxos node spends roughly half of its processing on messages that no longer make a difference: once the majority quorum is reached, all other messages for a round carry dead processing weight on the system.

The stability of Paxos compared to more complicated flavors (EPaxos, wPaxos) also seems interesting and probably explains why production-grade systems use paxos a lot. Despite having lower capacity, paxos is very stable, as its latency changes little across levels of throughput. Additionally, the maximum throughput of paxos is not affected by workload characteristics, such as conflict or locality. This predictability is important for production systems that must plan and allocate resources. It is simply easier to plan for a system delivering stable performance regardless of the workload characteristics.

Geography plays a big role in WAN paxos performance. Even though the cluster has the same maximum throughput regardless of leader placement, clients will observe the performance very differently depending on the leader region. The same goes for EPaxos and wPaxos, as different regions have different costs associated with communicating with their quorums, meaning that clients in one region may observe very different latency than their peers in other regions. I think this may make it more difficult to provide the same strong latency guarantees (SLAs?) to all clients in production systems.

There are still many things one can study with the models, but I will let it be for now. Anyone who is interested in playing around may get the models on GitHub.

 

Modeling Paxos Performance – Part 2

In the previous posts I started to explore node-scalability of paxos-style protocols. In this post I will look at processing overheads that I estimate with the help of a queue or a processing pipeline. I show how these overheads cap the performance and affect the latency at different cluster loads.

I look at scalability for a few reasons. For one, in the age of the cloud, a 3- or 5-node cluster may not be enough to provide good resilience, especially in environments with limited control over node placement. After all, a good cluster needs to avoid nodes that share common points of failure, such as switches or power supplies. Second, I think it helps me learn more about paxos and its flavors and why certain applications choose to use them. And third, I want to look at more exotic paxos variants and how their performance may be impacted by different factors, such as WAN or flexible quorums. For instance, flexible quorums present the opportunity to make trade-offs between performance and resilience by adjusting the sizes of the phase-1 and phase-2 quorums. This is where modeling becomes handy, as we can check whether a particular quorum or deployment makes a difference from the performance standpoint.

Last time, I looked at how local network variations affect performance when scaling the cluster up in the number of servers. What I realized is that fluctuations in message round-trip time (RTT) can only explain roughly a 3% performance degradation going from 3 nodes to 5, compared to the 30-35% degradation in our implementation of paxos. We also saw that this degradation depends on the quorum size, and for some majority-quorum deployments there may even be no difference due to the network. In this post I improve the model further to account for processing bottlenecks.

As a refresher from the previous time, I list some of the parameters and variables I have been using:

  • \(l\) – some local message in a round
  • \(r_l\) – message RTT in local area network
  • \(\mu_l\) – average message RTT in local area network
  • \(\sigma_l\) – standard deviation of message RTT in local area network
  • \(N\) – number of nodes participating in a paxos phase
  • \(q\) – Quorum size. For a majority quorum \(q=\left \lfloor{\frac{N}{2}}\right \rfloor  +1\)
  • \(m_s\) – time to serialize a message
  • \(m_d\) – time to deserialize and process a message
  • \(\mu_{ms}\) – average serialization time for a single message
  • \(\mu_{md}\) – average message deserialization time
  • \(\sigma_{ms}\) – standard deviation of message serialization time
  • \(\sigma_{md}\) – standard deviation of message deserialization time

The round latency \(L_r\) was estimated as \(L_r = m_s + r_{lq-1} + m_d\), where \(r_{lq-1}\) is the RTT plus replica processing time of the \((q-1)\)th fastest message \(l_{q-1}\).

Message Processing Queue

Most of the performance difference in the above model comes from network performance fluctuations, given that \(m_s\), \(m_d\) and their variances are small compared to the network latency. However, handling each message creates significant overhead at the nodes that I did not account for earlier. I visualize the message processing as a queue or a pipeline; if enough compute resources are available, then a message can be processed immediately, otherwise it has to wait until earlier messages are through and the resources become available. I say that the pipeline is clogged when messages cannot start processing instantaneously.

The round leader is more prone to clogging, since it needs to process \(N-1\) replies arriving at roughly the same time for each round. For the model’s purposes, I consider queuing/pipeline costs only at the leader. The pipeline is shared by incoming and outgoing messages.

Let’s consider a common FIFO pipeline handling messages from all concurrent rounds and clients. When a message \(l_i\) enters the pipeline at some time \(t_{ei}\), it can either process immediately if the pipeline is empty or experience some delay while waiting for its turn.

In the case of an empty pipeline, the message exits the queue at time \(t_{fi} = t_{ei} + o\), where \(o\) is the message processing overhead, \(m_s\) or \(m_d\), depending on whether the message is outgoing or incoming. However, if there is a message in the queue already, then the processing of \(l_i\) will stall or clog for some queue waiting time \(w_i\), and it will exit the pipeline at time \(t_{fi} = t_{ei} + w_i + o\). To compute \(w_i\) we need to know when message \(l_{i-1}\) is going to leave the queue: \(w_i = t_{fi-1} - t_{ei}\). In its turn, the exit time \(t_{fi-1}\) depends on \(w_{i-1}\), so we need to compute that first. We can continue to “unroll” the pipeline until we reach a message \(l_n\) without any queue waiting time (\(w_n = 0\)). We can compute the dequeue time for that message \(l_n\), which in turn allows us to compute the exit times of all following messages. Figure 1 shows different ways a pipeline can get clogged, along with the effects of clogging accumulating over time.
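This unrolling is easy to express in code; here is a minimal sketch of the recurrence (the arrival times and overheads in the example are arbitrary):

```python
def simulate_pipeline(arrival_times, overheads):
    """Hedged sketch of the FIFO pipeline described above.

    `arrival_times` are the enqueue times t_e of messages in order, and
    `overheads` the per-message processing costs o (m_s or m_d). Returns the
    queue wait w_i and exit time t_f,i for every message."""
    waits, exits = [], []
    prev_exit = float("-inf")
    for t_e, o in zip(arrival_times, overheads):
        wait = max(0.0, prev_exit - t_e)   # clogged while the previous message finishes
        t_f = t_e + wait + o
        waits.append(wait)
        exits.append(t_f)
        prev_exit = t_f
    return waits, exits

# Three messages arriving close together: the later ones wait in the clogged pipeline.
print(simulate_pipeline([0.0, 0.1, 0.15], [0.3, 0.3, 0.3]))
```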

Figure 1. States of a pipeline

Unlike before, today I also want to model the overhead of communicating with the clients, since in practice we tend to measure the performance as observed by the clients. This requires the round model to account for the client communication latency \(r_c\), which is one network RTT. Each round also adds a single message deserialization (the client’s request) and a message serialization (the reply to the client) to the queue.

Let me summarize the parameters and variables we need to model the queuing costs:

  • \(r_c\) – RTT time to communicate with the client
  • \(n_p\) – the number of parallel queues/pipelines. You can roughly think of this as number of cores you wish to give the node.
  • \(s_p\) – pipeline’s service rate (messages per unit of time). \(s_p = \frac{N+2}{N\mu_{md} + 2\mu_{ms}}\)
  • \(w_i\) – pipeline waiting time for message \(l_i\)
  • \(R\) – throughput in rounds per unit of time.
  • \(\mu_{r}\) – mean delay between rounds. \(\mu_{r} = \frac{1}{R}\)
  • \(\sigma_{r}\) – standard deviation of inter-round delay.

Now let’s talk about some of these parameters a bit more and how they relate to the model.

The pipeline service rate \(s_p\) tells us how fast a pipeline can process messages. We can get this metric by looking at the average latencies of message serialization \(\mu_{ms}\) and deserialization/processing \(\mu_{md}\). With \(N\) nodes in the cluster, we can find the average per-message overhead of a round \(\mu_{msg}\). For a given round, the leader node needs to handle 2 message serializations (one to start the round and one to reply back to the client) and \(N\) deserializations (\(N-1\) from the followers and one from the client). This communication pattern gives us \(\mu_{msg} = \frac{N\mu_{md}+2\mu_{ms}}{N+2}\). The reciprocal of \(\mu_{msg}\) gives us how many messages the pipeline can handle per unit of time: \(s_p = \frac{N+2}{N\mu_{md} + 2\mu_{ms}}\).

Variable \(w_i\) tells how backed up the pipeline is at the time of message \(l_i\). For instance, \(w_i = 0.002 s\) means that message \(l_i\) can start processing only after a 0.002-second delay. Figure 2 illustrates the round execution model with queue wait overheads.

Figure 2. Paxos round and its overheads.

To properly simulate multi-paxos, I need to look at multiple rounds. Variable \(R\) defines the throughput I try to push through the cluster, as higher throughput is likely to lead to longer queue wait times. I also need to take into consideration how rounds are distributed in time. On one side of the spectrum, we can perform bursty rounds, where all \(R\) rounds start at roughly the same time. This will give us the worst round latency, as the pipelines will likely clog more. On the other side, the rounds can be evenly dispersed in time, greatly reducing the competition for the pipeline between messages of different rounds. This approach will lead to the best round latency. I illustrate both of these extremes of round distribution in Figure 3.

Figure 3. Spacing between rounds.

However, the maximum throughput \(R_{max}\) is the same no matter how rounds are spread out, as it is governed only by the point at which the node saturates the pipeline: \(R_{max}(N+2) = n_p s_p\), or \(R_{max}(N+2) = \frac{n_p(N+2)}{N\mu_{md} + 2\mu_{ms}}\). As such, \(R_{max} = \frac{n_p}{N\mu_{md} + 2\mu_{ms}}\). In the actual model simulation, the latency is likely to spike up a bit before this theoretical maximum-throughput point, as the pipeline gets very congested and keeps delaying messages more and more.
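For example, plugging placeholder (de)serialization costs into this formula gives the saturation point directly:

```python
def max_throughput(n_pipelines, n_nodes, mu_md, mu_ms):
    """R_max = n_p / (N*mu_md + 2*mu_ms), in rounds per unit of time."""
    return n_pipelines / (n_nodes * mu_md + 2 * mu_ms)

# Illustrative numbers only: 10-microsecond serialization/deserialization costs, one pipeline.
print(max_throughput(1, 3, 10e-6, 10e-6))   # ~20,000 rounds/s for 3 nodes
print(max_throughput(1, 5, 10e-6, 10e-6))   # ~14,300 rounds/s for 5 nodes
```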

The likely round distribution is probably something more random, as different clients interact with the protocol independently of each other, making such perfect round synchronization impossible. For the simulation, I take the uniform-separation approach and add some variability to it by drawing the round separation times from a normal distribution \(\mathcal{N}(\mu_r, \sigma_r^2)\). This solution may not be perfect, but normal distributions tend to do fine in modeling many natural random phenomena. I can also control how much different rounds affect each other by changing the variance \(\sigma_r^2\). When \(\sigma_r\) is close to 0, this becomes similar to uniformly spaced rounds, while large values of \(\sigma_r\) create more “chaos” and conflict between rounds by spreading them out more randomly.

Now I will put all the pieces together. To model the round latency \(L_r\), I modify the old formula to include the queuing costs and client communication delays. Since the round latency is driven by the time it takes to process message \(l_{q-1}\), I only concern myself with the queue waiting time \(w_{q-1}\) for the quorum message. As such, the new formula for round latency is \(L_r = (m_s + r_{lq-1} + w_{q-1} + m_d) + (m_{cd} + m_{cs} + r_c)\). In this formula, \(m_{cd}\) is the deserialization overhead for the client request, and \(m_{cs}\) is the serialization overhead for the server’s reply back to the client.

Simulation Results

As before, I have a python script that puts the pieces together and simulates multi-paxos runs. There are quite a few parameters to consider in the model, so I will show just a few, but you can grab the code and tinker with it to see how it behaves with different settings. Figure 4 shows the simulation with my default parameters: network settings taken from AWS measurements and pipeline performance taken from the early paxi implementation (it is much faster now). Only one pipeline/queue is used. The distribution of rounds in time is controlled by the inter-round spacing \(\mu_r = \frac{1}{R}\) with \(\sigma_{r} = 2\mu_{r}\).

Figure 4. Latency as a function of throughput for different cluster sizes (Simulation)

The next figure (Figure 5) shows how latency changes for different inter-round delay variances. The runs with a higher standard deviation \(\sigma_r\) appear more “curvy”, while the runs with more uniform delays do not seem to degrade as quickly until almost reaching the saturation point. High-\(\sigma_r\) runs represent more random, uncoordinated interaction with the cluster, which in my opinion is a better representation of what happens in the real world.

Figure 5. Latency as a function of throughput in 5 node cluster for different inter-round delay parameters.

Do I Need to Simulate Paxos Rounds?

The results above simulate many individual rounds by filling the pipeline with messages and computing the queue wait time for each round. Averaging the latencies across all simulated rounds produces the average latency for some given throughput. However, if I can compute the average queue waiting time and the average latency for the quorum message directly, then I no longer need to simulate individual rounds to obtain these parameters. This allows me to find the average round latency much quicker, without having to repeat the round formula computations over and over again.

Let’s start with computing the average latency of the quorum message \(r_{lq-1}\). Since \(l_{q-1}\) represents the last message needed to make up the quorum, I can model this message’s latency with the \(k\)th-order statistic, sampled from the normal distribution \(\mathcal{N}(\mu_l+\mu_{ms}+\mu_{md}, \sigma_l^2 + \sigma_{ms}^2 + \sigma_{md}^2)\) on a sample of size \(N-1\), where \(k=q-1\). To keep things simple, I use the Monte Carlo method to approximate \(r_{lq-1}\) fairly quickly and accurately.
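A minimal Monte Carlo sketch of this estimate (the numbers in the example are placeholders, not the calibrated model parameters):

```python
import numpy as np

def quorum_msg_latency(n, q, mu, sigma, samples=100_000, seed=0):
    """Monte Carlo estimate of r_{l,q-1}: draw N-1 follower response times from
    N(mu, sigma^2) and average the (q-1)-th smallest over many trials. Here mu
    and sigma stand for the combined RTT + processing mean and stdev."""
    rng = np.random.default_rng(seed)
    draws = rng.normal(mu, sigma, size=(samples, n - 1))
    kth = np.sort(draws, axis=1)[:, q - 2]   # index q-2 is the (q-1)-th order statistic
    return kth.mean()

# Illustrative values only: 0.35 ms combined mean, 0.05 ms stdev, 5 nodes, q = 3.
print(quorum_msg_latency(5, 3, 0.35, 0.05))
```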

Now on to approximating the queue wait time \(w_{q-1}\). This is a bit more involved, but luckily queuing theory provides some easy ways to compute/estimate various parameters for simple queues. I used Marchal’s average waiting time approximation for a single queue with generally distributed inter-arrival and service intervals (G/G/1). This approximation allows me to incorporate the inter-round interval and variance from my simulation into the queuing theory computation.

I will spare you the details of arriving at the formula for the average round queue wait time (it is a pretty straightforward adaptation from here, with service and arrival rates expressed as rounds per second) and just give the result for a single queue and a single worker:

  • \(p = R(N\mu_{md} + 2\mu_{ms})\), where \(p\) is the queue utilization, i.e. the probability that the queue is busy.
  • \(C_s^2 = \frac{N^2\sigma_{md}^2 + 2^2\sigma_{ms}^2}{(N\mu_{md} + 2\mu_{ms})^2}\)
  • \(C_a^2 = \frac{\sigma_r^2}{\mu_r^2}\)
  • \(w=\frac{p^2(1+C_s^2)(C_a^2+C_s^2p^2)}{2R(1-p)(1+C_s^2p^2)}\)

With the ability to compute the average queue waiting time and the average turnaround time for message \(l_{q-1}\), I can quickly compute the average round latency for a given throughput without having to simulate multiple rounds to obtain these parameters: \(L = 2\mu_{ms} + 2\mu_{md} + r_{lq-1} + w + \mu_l\), where \(r_{lq-1}\) is the mean RTT for the quorum message \(l_{q-1}\), \(w\) is the average queue wait time for the given throughput parameters, and \(\mu_l\) is the network RTT for the message exchange with the client.

As such, the average round latency becomes:

\(L = 2\mu_{ms} + 2\mu_{md} + r_{lq-1} + \frac{p^2(1+C_s^2)(C_a^2+C_s^2p^2)}{2R(1-p)(1+C_s^2p^2)} + \mu_l\)
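Putting the pieces together in one function (a sketch with illustrative parameter values, not the calibrated ones behind the figures):

```python
def avg_round_latency(R, N, mu_ms, mu_md, sigma_ms, sigma_md, sigma_r, r_lq, mu_l):
    """Hedged sketch of the closed-form model above (single G/G/1 queue).

    R is the offered load in rounds/s, r_lq the mean quorum-message latency
    (e.g. from the Monte Carlo estimate), and mu_l the client RTT."""
    mu_r = 1.0 / R                              # mean inter-round delay
    service = N * mu_md + 2 * mu_ms             # per-round service time
    p = R * service                             # queue utilization
    cs2 = (N**2 * sigma_md**2 + 4 * sigma_ms**2) / service**2
    ca2 = sigma_r**2 / mu_r**2
    w = (p**2 * (1 + cs2) * (ca2 + cs2 * p**2)) / (2 * R * (1 - p) * (1 + cs2 * p**2))
    return 2 * mu_ms + 2 * mu_md + r_lq + w + mu_l

# Illustrative parameters in seconds: 10 us (de)serialization, 5 nodes, 5000 rounds/s.
print(avg_round_latency(R=5000, N=5, mu_ms=10e-6, mu_md=10e-6,
                        sigma_ms=2e-6, sigma_md=2e-6,
                        sigma_r=2 / 5000, r_lq=0.00035, mu_l=0.0003))
```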

Figure 6 shows the model’s results for latency at various throughputs. The queuing theory model exhibits very similar patterns to the simulation, albeit the simulation seems to degrade quicker at higher throughputs than the model, especially for the 3-node cluster. This may be due to the fact that the simulation captures the message distribution within each round, while the model looks at the round as one whole.

Figure 6. Model for latency as a function of throughput for different cluster sizes.

Flexible Quorums

I can use both the simulation and the model to show the difference between paxos and flexible paxos (FPaxos) by adjusting the quorums. For instance, I modeled a 9-node deployment of flexible paxos with a phase-2 quorum \(q2\) of 3 nodes. In my setup, flexible paxos must still communicate with all 9 nodes, but it needs to wait for only 2 replies, thus it can finish the phase quicker than with a majority quorum. However, as seen in Figure 7, the advantage of the smaller quorum is tiny compared to the normal majority quorum of 9-node paxos. Despite FPaxos waiting on the same number of replies as a 5-node paxos setup, the cost of communicating with all 9 nodes does not allow it to get close in performance even to a 7-machine paxos cluster.

Figure 7. Modeling Paxos and FPaxos 9 nodes, with |q2|=3

Conclusion and Next steps

So far I have modeled single-leader paxos variants in the local area network. I showed that network variations have a negligible impact on majority-quorum paxos. I also illustrated that it is hard to reap the performance benefits of flexible quorums, since the queuing costs of communicating with a large cluster become overwhelming. However, not everything is lost for FPaxos, as it can reduce the number of nodes involved in phase-2 communication from the full cluster down to as few as \(|q2|\) nodes and greatly mitigate the effects of queue waiting time for large clusters.

The simulation and model are available on GitHub, so you can check it out and tinker with parameters to see how the performance may change in response.

There are still quite a few other aspects of paxos that I find interesting and want to model in the future. In particular, I want to look at WAN deployments, multi-leader paxos variants and, of course, our WPaxos protocol that combines multi-leader, WAN and flexible quorums.