Tag Archives: replicated state machine

Reading Group. EPaxos Revisited.

In the 88th DistSys meeting, we discussed the “EPaxos Revisited” NSDI’21 paper by Sarah Tollman, Seo Jin Park, and John Ousterhout. We had an improvised presentation, as I had to step in and talk about the paper on short notice without much preparation. 

EPaxos Background

Today’s paper re-evaluates EPaxos, a state machine replication (SMR) protocol that is very popular in academia. EPaxos solves a generalized consensus problem and avoids ordering all state machine operations. Instead of a single total order of everything, EPaxos relies on a partial order of dependent operations. For instance, if two commands update the same object, then these commands may depend on each other and must be ordered with respect to each other. These dependent operations can happen concurrently, in which case they are conflicting. Not all operations depend on each other or have conflicts. For instance, if operations are commutative and, say, work on different objects, requiring an order between them is overkill. Avoiding ordering operations that do not care about the order can enable some parallelism. 

In practice, things are a bit more complicated. The first problem is figuring out if operations depend on each other and conflict. Of course, one can funnel all commands/operations through a single node, like Raft or Multi-Paxos, and have that node figure out which operations conflict and which do not. However, our goal was to improve parallelism, and with a single node to handle all commands, we are not achieving it, at least not on the scale possible in distributed systems. So, we need different nodes to start operations independently and learn if these operations may conflict with each other. EPaxos does this with its leaderless approach.

Every node in EPaxos can receive a request from a client and replicate it in the cluster, learning of dependencies and possible conflicts in the process. We can call the node driving replication of a command a coordinator. If a quorum of nodes agrees on dependencies for the command, the protocol commits in one round trip. Such single RTT replication is called a fast path, and in EPaxos it requires a fast quorum that can be bigger than the traditional majority quorum in some deployments. However, if nodes disagree about dependencies, there is a conflict, and the protocol may take a slow path. Slow path essentially uses Paxos to replicate and commit the operation with a “merged” list of conflicting dependencies to other nodes. It ensures that conflicts that may have been observed by only a minority of nodes are recorded by a quorum for fault tolerance. 

Let’s look at this a bit deeper. Two commands ‘C1’ and ‘C2’ conflict when some nodes have received them in a different order: one node may say ‘C1’ comes before ‘C2,’ while another orders ‘C2’ before ‘C1.’ In other words, we express conflicts as a mismatch of dependencies at different nodes; in this example, the mismatch is ‘C1’ depends on ‘C2’ vs. ‘C2’ depends on ‘C1.’ EPaxos needs to ensure that enough nodes know about all the dependencies, so a slow path is taken to commit the command with all the learned dependencies. It is important to understand that not all dependencies are conflicts. For example, if operation ‘C4’ happened so much after operation ‘C3’ that all nodes (or really a quorum of nodes) see that ‘C3’ happened before ‘C4’ in their dependencies, then this is not a conflict, and we can proceed with the fast path. 
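To make the fast-path condition a bit more concrete, below is a minimal Python sketch of how a coordinator might pick between the two paths after the PreAccept round. This is not the authors’ code; the function and variable names, and the exact quorum check, are my simplification of the basic (non-optimized) rule that the fast quorum must agree on the dependency list.

```python
def choose_path(coordinator_deps, follower_deps, fast_quorum_size):
    """Decide between the fast and slow path for one command (a sketch).

    coordinator_deps: set of command ids the coordinator believes the
                      new command depends on.
    follower_deps:    list of dependency sets reported by the replicas
                      that answered the PreAccept phase.
    """
    # Count replies whose dependency set matches the coordinator's proposal.
    matching = sum(1 for deps in follower_deps if deps == coordinator_deps)

    # The +1 accounts for the coordinator's own vote.
    if matching + 1 >= fast_quorum_size:
        # Enough replicas agree on the dependencies: commit in one round trip.
        return "fast-path", coordinator_deps

    # Otherwise, merge everything anyone has seen and run an Accept (Paxos)
    # round so that a quorum durably records the combined dependencies.
    merged = set(coordinator_deps)
    for deps in follower_deps:
        merged |= deps
    return "slow-path", merged
```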

Now let’s take a look at a few examples. The figure above, which I borrowed from the revisited paper, has three distinct operation examples. In example-1, node ‘A’ is the coordinator for the ‘x=5’ operation, and it knows of no dependencies for the object. It reaches out to a quorum, and the followers also know of no dependencies. Coordinator ‘A’ concludes the operation in the fast path. The same would have happened if the dependencies known to ‘A’ matched those of the followers — since this is a no-conflict situation, ‘A’ can commit in the fast path. In example-2, node ‘E’ knows of no dependencies for the ‘x=9’ command, but one node in the quorum has recorded a dependency in the form of another operation on object ‘x’. This dependency mismatch between replicas is a conflict, and ‘E’ takes the slow path to record the combined observation. Note that initially, only replicas ‘A-C’ had the dependency, but after the Accept message by ‘E’, both ‘D’ and ‘E’ also have it. Performing the slow path ensures that at least a quorum of nodes knows the dependency. Finally, example-3 has node ‘A’ as the coordinator again. It knows a previous command as a dependency, but the followers know additional dependencies. Even though there seems to be a conflict, all the followers agree on the dependencies among themselves. In this case, the protocol can proceed in the fast path in some situations. The exact conditions that determine whether node ‘A’ can take that fast path depend on how ‘A’ communicates with the rest of the replicas. I will discuss this later. 

There are a few caveats to the process. First of all, when the coordinator learns of dependencies, it must learn the whole chain of dependencies to make sure the new command is properly ordered and executed at the proper time. So, over time the dependency lists get larger and larger. At some point, it will make no sense to remember old commands that were executed a long time ago on all nodes as conflicts, so the conflict and dependency tracking system must have a way to prune the no longer needed stuff. Another issue is the need to wait for all dependencies to execute before executing the new operation. As a result, the acts of committing an operation and applying it to the state machine can be quite far apart in time. 

One detail missing in my simplified explanation so far is the need to construct dependency graphs for the execution of operations. See, the fast path and slow path modes are all about committing the operations and their dependencies. We have not really ordered the operations yet, and the dependency graph will help us establish the actual execution order. Each operation is a vertex in the graph, and dependencies between operations are the edges. So to execute an operation, we first add it to the graph. Then we add its dependencies and connect them to the operation. Then for each new vertex, we also look at its dependencies and add them. The process continues until there is nothing more to add to the graph. In the resultant dependency graph, concurrent operations may appear to depend on each other, creating loops or cycles. The execution order is then determined by finding all strongly connected components (SCCs), which represent individual operations, conflict loops, or even groups of loops, sorting these SCCs in inverse topological order, and executing SCCs in that order. When executing each strongly connected component, we deterministically establish an operation order for commands in the SCC based on a sequence number and execute all non-executed operations in increasing sequence-number order. The sequence number exists to break ties and order operations within the same SCC; the paper has more details about it. As you can imagine, dealing with the graph is quite expensive, so it is even more important to prune all old dependencies out. 
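To illustrate the execution side, here is a compact sketch of the graph processing described above. It is not EPaxos code: the `ops` map with `deps` and `seq` fields is a made-up representation, and I lean on the fact that Tarjan’s algorithm emits SCCs in inverse topological order when edges point from an operation to its dependencies.

```python
def execution_order(ops):
    """ops: {op_id: {"deps": [op_id, ...], "seq": int}}
    Returns op ids in an EPaxos-style execution order (sketch only)."""
    index, low, on_stack, stack = {}, {}, set(), []
    order, counter = [], [0]

    def strongconnect(v):
        index[v] = low[v] = counter[0]
        counter[0] += 1
        stack.append(v)
        on_stack.add(v)
        for w in ops[v]["deps"]:
            if w not in ops:
                continue              # dependency already executed and pruned
            if w not in index:
                strongconnect(w)
                low[v] = min(low[v], low[w])
            elif w in on_stack:
                low[v] = min(low[v], index[w])
        if low[v] == index[v]:        # v is the root of an SCC
            scc = []
            while True:
                w = stack.pop()
                on_stack.discard(w)
                scc.append(w)
                if w == v:
                    break
            # Tarjan finishes an SCC only after every SCC it depends on,
            # which is exactly the inverse topological order we need.
            # Within the SCC, break ties deterministically by sequence number.
            order.extend(sorted(scc, key=lambda o: ops[o]["seq"]))

    for v in ops:
        if v not in index:
            strongconnect(v)
    return order
```

For example, two conflicting operations that ended up depending on each other land in the same SCC and are executed in sequence-number order on every replica, which keeps the replicas’ state machines identical.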

Finally, one of the biggest caveats to this fast path leaderless process is typical in distributed systems. Of course, I am talking about handling failures. In particular, the protocol needs to ensure that failures do not erase the fast-path decision. It is tricky due to concurrency, as two or more coordinators may run conflicting operations. Having such concurrency is like attempting to place two different commands in the same execution slot or position. If we use a majority decision to see which command was successful in that particular position, then we risk creating an ambiguous situation even when just one node fails. For instance, if a command ‘C1’ has three votes and ‘C2’ has two in a cluster of five nodes, then a failure of one ‘C1’ vote creates ambiguity, as both ‘C1’ and ‘C2’ have the same number of remaining votes. The Fast Paxos paper describes the problem well, and the solution is to use a bigger fast quorum to avoid such ambiguity. 

Such a larger fast-path quorum is exactly what EPaxos uses. However, the original paper proposes some optimizations to allow a majority fast quorum in configurations of three and five nodes (technically, the optimized fast quorum has \(f+\lfloor\frac{f+1}{2}\rfloor\) nodes). Of course, these optimizations do not come entirely for free, as the need to solve the ambiguity problem remains. The optimized recovery needs only \(\lfloor\frac{f+1}{2}\rfloor\) responses and some additional information about the dependencies available at the nodes. The whole set of optimizations is hard to summarize, so I refer interested readers to the paper, Section 4.4.
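To make the quorum sizes concrete, plugging a few cluster sizes into the formula: with \(n=3\) (\(f=1\)) the optimized fast quorum is \(1+\lfloor\frac{2}{2}\rfloor=2\) nodes, and with \(n=5\) (\(f=2\)) it is \(2+\lfloor\frac{3}{2}\rfloor=3\) nodes, both of which are simple majorities. With \(n=7\) (\(f=3\)), however, it grows to \(3+\lfloor\frac{4}{2}\rfloor=5\) nodes, which is already larger than the majority of 4.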

Now let me return to the example-3 fast path again. Previously I said that the quorum needs to agree on dependencies, yet in example-3 the coordinator disagrees with the followers and still takes the fast path, as long as all the followers in the quorum agree among themselves. Unfortunately, this is not always allowed in optimized EPaxos. In fact, the paper says that the followers must have the same dependencies as the coordinator for the fast path if the coordinator communicates with all nodes and not just an exact quorum of nodes. Moreover, the followers must record having matching dependencies with the coordinator. So, example-3 works when the coordinator only contacts enough nodes to form a quorum. Intuitively, succeeding in this configuration means that not only did enough nodes match the dependencies, but also that all other nodes did not even learn about the operation from the coordinator. Upon recovery, this restricted communication gives additional information, as the recovery protocol can know which nodes were part of the initial fast quorum. In practice, however, this creates problems: it is harder to get a fast quorum to agree, since the coordinator needs to be lucky enough to pick the exact nodes that agree on the dependencies. Moreover, just one node failure results in a failed fast quorum and forces a retry, further contributing to tail latency. 

As such, practical implementations broadcast messages to all nodes and let the fastest replies form the quorum. With this broadcast-to-all communication, example-3 in the figure will not work. 

EPaxos Revisited

The revisited paper brings a few important contributions to EPaxos. First, it conducts a re-evaluation of the protocol and makes an excellent comparison to the Multi-Paxos baseline. I really like the re-evaluation part of this paper! 

Unlike the original EPaxos paper, the revisited paper reports performance for a much bigger range of possible conflicts. The paper uses a more realistic read/write workload with different read-to-write ratios. The authors also change the conflict workload to a more realistic scenario. In the original paper, conflicts were simulated by having concurrent commands access the same predefined key — a “hot spot” method. The revisited paper no longer focuses on just one key and, instead, simulates conflicts by selecting keys from a Zipfian distribution. Finally, the reporting of results is way more visual. The figures capture the performance differences between EPaxos and Multi-Paxos in every region and tested scenario. I was surprised to see very bad EPaxos tail latency, even given the expectation of more performance variability. Below is the evaluation figure for latency in a five-region deployment. It may appear confusing at first, but just reading the caption and staring at it for a minute was all I needed to understand it.

Another important part of the re-evaluation is addressing the commit vs. execute latency problem. As mentioned in the summary above, execution can lag significantly behind commitment. Yet, the original EPaxos evaluation measured only commit latency. The figure below shows the difference.

The second contribution is a bit shaky. EPaxos performance degrades as the conflict rate rises. Naturally, finding a way to reduce conflicts can help avoid the expensive slow path, which, in turn, will improve both latency and throughput. The paper proposes a technique called Timestamp-Ordered Queuing (TOQ) to reduce conflicts. Recall that the EPaxos coordinator sends its dependencies to the followers, and the followers reply with their updated dependencies. Whether the algorithm takes a fast path depends on a few conditions, and the basic rule is that all followers in the quorum must have the same dependencies. TOQ exploits this and adds a small delay at each follower to increase the chance of followers having the same conflicts/dependencies. For concurrent conflicting operations, quorum replicas may receive these operations in different orders. These out-of-order messages require a slow path to resolve. But if followers wait a bit to receive these concurrent messages and order them deterministically, then we can have the same dependencies/conflicts at the follower replicas in the quorum. 
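Here is a minimal sketch of the follower-side idea as I understand it from the description above; the buffering window, message fields, and class/function names are my own and are not taken from the paper or its implementation.

```python
import heapq
import time

class TOQFollower:
    """Toy model of timestamp-ordered queuing at a follower (a sketch,
    not the paper's code). Incoming PreAccepts are buffered for a short
    window and processed in timestamp order, so concurrent conflicting
    commands are observed in the same order at every follower."""

    def __init__(self, delay_ms=5):
        self.delay = delay_ms / 1000.0
        self.buffer = []           # min-heap ordered by (timestamp, op_id)
        self.last_op_for_key = {}  # key -> id of the latest operation seen

    def on_preaccept(self, timestamp, op_id, key):
        heapq.heappush(self.buffer, (timestamp, op_id, key))

    def flush(self, now=None):
        """Process every buffered operation older than the delay window
        and return (op_id, dependency list) replies for the coordinator."""
        now = time.time() if now is None else now
        replies = []
        while self.buffer and self.buffer[0][0] <= now - self.delay:
            ts, op_id, key = heapq.heappop(self.buffer)
            deps = [self.last_op_for_key[key]] if key in self.last_op_for_key else []
            self.last_op_for_key[key] = op_id
            replies.append((op_id, deps))
        return replies
```

Because every follower sorts the buffered operations by the same timestamps, two concurrent writes to the same key produce identical dependency lists across the quorum, and the coordinator can stay on the fast path.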

This follower-side waiting, however, does not fix a dependency mismatch between the coordinator and the followers. And this is where my problem with the revisited paper lies. The TOQ optimization relies on the coordinator’s ability to communicate a different list of dependencies than the followers and still proceed with the fast path, as in example-3. However, this only happens when we use unoptimized EPaxos with a larger fast quorum or make the coordinator use the “thrifty” mode by contacting only a quorum of nodes instead of all nodes. In both cases, we lose something — larger quorums increase latency and decrease the number of failures a system can sustain before it is required to take the slow path all the time. And the thrifty optimization can increase tail latency substantially, as a single failed node may prevent the coordinator from getting a quorum of votes, requiring a timeout and retry. Considering that delaying the followers’ replies already adds extra latency, the TOQ optimization becomes a bit questionable in practice. 

The paper studies the added effect of delaying the followers on latency and proposes some compromise solutions to reduce the delay at the expense of allowing some conflicts. On the evaluation side, the paper says the thrifty optimization was not enabled, so for TOQ to be correct, it must have used a larger fast quorum. However, the paper does not state this, so it is impossible to know whether the authors performed the TOQ evaluation on a correct system configuration.

Discussion

1) Correctness. In my presentation, I focused a lot on the correctness of example-3, where the coordinator communicates a different dependency list than the followers have, under an optimized EPaxos fast quorum. I said that this is incorrect according to the original paper. The lack of preparation for the meeting showed — I was wrong.

Correctness, in this case, is not necessarily violated, but this largely depends on the “version” and configuration of EPaxos. According to the paper, a thrifty mode, where the coordinator only contacts the required number of nodes to reach a quorum, will work as described in the figure, so there are no safety issues. However, without the thrifty mode, safety breaks. It breaks because we are running a smaller fast quorum (optimized EPaxos) but do not record enough information to resolve the ambiguity over the taken fast path that may arise with failures. The non-thrifty, optimized EPaxos is the configuration where we need the followers to agree with the dependencies of the coordinator and record that fact. In the discussion, we looked at some examples of safety violations due to this problem.

Interestingly enough, I am not sure that the authors of the revisited paper fully understood the implications of thrifty/non-thrifty and other optimizations on safety. In the EPaxos overview figure, the paper explicitly states: “for simplicity, we only show messages to the necessary quorum,” suggesting that the authors do not assume the thrifty mode here, making example-3 incorrect. 

So, where did this example come from? The authors borrow a figure from the original EPaxos presentation. In that presentation, Iulian Moraru explicitly stated that the coordinator talks to the majority of replicas only (3 out of 5).

2) TOQ Benefits. In light of the above, the TOQ benefits become a bit more questionable, because we are not sure whether the experiments ran on a correct and safe configuration of EPaxos. At the same time, it is worth mentioning that the Tempo paper, which we will discuss later this year, uses an approach similar to TOQ. We also discussed the Accord protocol with a similar technique, so TOQ is not entirely without merit and seems appreciated elsewhere in the literature. 

3) Bounding Dependency Chains. This is an important improvement to EPaxos introduced in this paper. It prevents dependency chains from growing indefinitely under certain conditions. Such uncontrolled growth can prevent execution, stalling the protocol. We spent quite a bit of time trying to figure out exactly how the proposed solution works, and we think the paper could have been a bit clearer here.

4) Appendix. The appendix has lots of additional performance data that is worth checking. 

Reading Group. Viewstamped Replication Revisited

Our 74th paper was a foundational one — we looked at the Viewstamped Replication protocol through the lens of the “Viewstamped Replication Revisited” paper. Joran Dirk Greef presented the protocol along with bits of his engineering experience using the protocol in practice.

Viewstamped Replication (VR) solves the problem of state machine replication in a crash fault tolerance setting with up to f nodes failing in a cluster of 2f+1 machines. In fact, the protocol (in both of its versions, the original and the revisited) is super similar to Raft and Multi-Paxos. Of course, the original VR paper was published before both of these other solutions. An important aspect of the VR paper’s narrative is that the basic protocol makes no assumptions about persisting any data to disk. 

Replication

In the common case when replicating data, VR has a primary node that prescribes the operations and their order to the followers. The primary needs a majority quorum of followers to ack each operation to consider it committed. Once it reaches the quorum, the primary can tell the followers to commit as well. In VR, followers do not accept out-of-order commands and will not ack an operation without receiving its predecessor. With a few differences, this is how typical consensus-based replication works in both Raft and Multi-Paxos. For example, Multi-Paxos complicates things a bit by allowing out-of-order commits (but not out-of-order execution at the state machine).
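As a rough illustration of this common case, here is a tiny sketch of the primary’s bookkeeping. The Prepare/PrepareOK names follow the paper’s terminology, but the class itself and its fields are just my simplification, not the paper’s pseudocode.

```python
class VRPrimary:
    """Toy common-case replication at a VR primary (sketch only)."""

    def __init__(self, replica_ids, my_id, view):
        self.view = view
        self.my_id = my_id
        self.replicas = replica_ids      # includes the primary itself
        self.log = []                    # replicated operations
        self.commit_number = 0           # highest committed op-number
        self.acks = {}                   # op-number -> set of acking replicas

    def on_client_request(self, op, send):
        self.log.append(op)
        op_number = len(self.log)
        self.acks[op_number] = {self.my_id}
        for r in self.replicas:
            if r != self.my_id:
                send(r, ("Prepare", self.view, op, op_number, self.commit_number))

    def on_prepare_ok(self, replica, op_number):
        self.acks[op_number].add(replica)
        majority = len(self.replicas) // 2 + 1
        # Commit strictly in order: an op-number commits only once it and all
        # earlier op-numbers have gathered a majority of acks.
        while (self.commit_number < len(self.log)
               and len(self.acks.get(self.commit_number + 1, ())) >= majority):
            self.commit_number += 1      # piggybacked to followers on later messages
```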

View Change and Replicas Recovery

As with everything in distributed systems, the interesting parts begin when things start to crumble and fail. Like other protocols, VR can mask follower crashes, so the real challenge is handling primary failures. The VR protocol orchestrates the primary failover with a view-change procedure. Here I will focus on the “revisited” version of the view-change. 

One interesting piece of the revisited view change is its deterministic nature, as each view is assigned/computed to belong to a particular node. When the current primary for view v fails, the node assigned view v+1 will try to become the new primary. This promotion can be started by any node suspecting the current primary’s demise. We do not need the node for view v+1 to directly observe the crash of the primary for view v, allowing for unreliable failure detectors. 

So, any replica can start a view change if it detects a faulty primary. The replica will advance to the next view v+1 and start the view change by sending the StartViewChange message with the new view number to all the nodes. Each replica that receives a StartViewChange will compare the view to its own, and if its own view is less than the one in the received StartViewChange message, it will also start the view change and send its own StartViewChange message to every replica. Also note that two or more replicas may independently start sending the initial StartViewChange messages for the new view v+1 if they independently detect the primary’s failure. The bottom line of this StartViewChange communication pattern is that any replica that received at least f StartViewChange messages now knows that a majority of the cluster is on board to advance with the view change. 

So, after receiving f StartViewChange messages for the new view v+1 from other nodes, each replica sends a DoViewChange message to the primary for that new view v+1. The DoViewChange message contains the replica’s last known view v’ along with its log. The new primary must receive a majority (i.e., f+1) of DoViewChange messages. The new primary then selects the most up-to-date log (i.e., the one with the highest v’, or the longest log if multiple DoViewChange messages have the same highest v’). This up-to-date log ensures the new primary can recover any operations that might have been majority-accepted or committed in previous views. This “learn from the followers and recover” part of the view change procedure is somewhat similar to Multi-Paxos, as the new leader needs to recover any missing commands before proceeding with new ones. The VR leader recovery only cares about the view and the length of the log because VR does not allow out-of-order prepares/commits. In Multi-Paxos, with its out-of-order commitment approach, we kind of need to recover individual log entries instead of the log as a whole. 
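The selection rule itself is simple enough to sketch in a few lines. This is only an illustration of the “highest last-normal view, then longest log” comparison; the message structure is my own simplification and not taken from any real implementation.

```python
def pick_recovered_log(do_view_change_msgs):
    """Each message is a dict like {"last_normal_view": v_prime, "log": [...]}.
    Returns the log the new primary should adopt (a sketch of the rule)."""
    best = max(do_view_change_msgs,
               key=lambda m: (m["last_normal_view"], len(m["log"])))
    return best["log"]

# Example: the log from the higher last-normal view wins even though it is shorter.
msgs = [{"last_normal_view": 3, "log": ["a", "b", "c"]},
        {"last_normal_view": 4, "log": ["a", "b"]}]
assert pick_recovered_log(msgs) == ["a", "b"]
```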

Anyway, after receiving enough (i.e., f+1) DoViewChange messages and learning the most up-to-date log, the new primary can start the new view by sending the StartView message containing the new view v+1 and the recovered log. The followers will treat any uncommitted entries in the recovered log as “prepares” and ack the primary. After all entries have been recovered, the protocol can transition into the regular mode and start replicating new operations.

Schematic depiction of a view change procedure initiated by one node observing a primary failure. Other machines send StartViewChange in response. Eventually, a majority of nodes receives f StartViewChange messages and proceeds to DoViewChange, at which point the new primary recovers the log.

There is a bit more stuff going on in the view change procedure. For example, VR ensures the idempotence of client requests. If some client request times out, the client can resend the operation to the cluster, and the new leader will either perform the operation if it has not been done before or return the outcome if the operation was successful at the old primary. 

Another important part of the paper’s recovery and view change discussion involves practical optimizations. For example, recovering a crashed in-memory replica requires learning the entire log, which can be slow. The proposed solution involves asynchronously persisting the log and checkpoints to disk to speed up the recovery process. Similarly, a view change requires a log exchange, which is not practical for large logs. An easy solution is to send only a small log suffix, since the new primary is likely to be mostly up-to-date. 

Discussion

1) Comparison With Other Protocols. I think the comparison topic is where we spent most of the discussion time. It is easy to draw parallels between Multi-Paxos and VR. Similarly, Raft and VR share many things in common, including the similarity of the original VR’s leader election to that of Raft. 

One aspect of the discussion is whether Multi-Paxos, Raft, and VR are really the same algorithm with slightly different “default” implementations. We can make an argument that all these protocols are the same thing. They operate in two phases (leader election and replication), require quorum intersection between the phases, and have the leader commit an operation first before the followers. The differences lie in how they elect a leader to ensure that committed or majority-accepted operations from the prior leader do not get lost. Multi-Paxos and revisited VR recover such operations at the new leader, while Raft ensures the new leader is already up to date to eliminate the need for recovery. 

2) Optimizations. Academia and industry have churned out quite a substantial number of optimizations for Multi-Paxos and Raft. There has even been a study on whether the optimizations are interchangeable across protocols (the short answer is yes). But that study did not include VR. Interestingly, Joran mentioned that they applied Protocol-Aware Recovery to VR, and (if I am not mistaken) they also used flexible quorums. Of course, I am not surprised here, given how similar these solutions are. 

3) In-memory Protocol. Joran stressed the importance of an in-memory protocol description to engineers. In their systems, in-memory consensus reduces the reliance on the fault tolerance/reliability of the storage subsystem. It appears that VR is the only major SMR protocol specified without the need for a durable, infallible disk. And while people in academia build plenty of in-memory consensus systems (including Multi-Paxos and Raft), the aspects of what is needed to make these systems work without the disk are not clearly described. 

The VR paper briefly describes the problems that the lack of a durable log introduces. The issue is that a node can forget its entire state. So, it can accept some value, help form a majority, and then reboot. Upon rebooting, if the node is allowed to participate right away, the forgotten value may no longer be covered by a majority quorum. Similarly, a node can vote on something in one view, reboot and forget, and then potentially re-vote in a lesser view proposed by some slow old primary. 

To avoid these problems, the node needs to recover before it can participate. At least a partial recovery is needed to avoid the double voting problem — the node needs to learn the current view before accepting new operations. At the same time, such a partially recovered node must still appear “crashed” for any older operations until it has fully recovered. 

Reading Group. Fault-Tolerant Replication with Pull-Based Consensus in MongoDB

In the last reading group meeting, we discussed MongoDB‘s replication protocol, as described in the “Fault-Tolerant Replication with Pull-Based Consensus in MongoDB” NSDI’21 paper. Our reading group has a few regular members from MongoDB, and this time around, Siyuan Zhou, one of the paper authors, attended the discussion, so we had a perfect opportunity to ask questions and get answers from people directly involved.

Historically, MongoDB had a primary backup replication scheme, which worked fine most of the time but had a few unresolved corner cases, such as having more than one active primary at a time due to failures and primary churn. Having a split-brain in a system is never a good idea, so engineers looked for a better solution. Luckily, there are many solutions for this problem — an entire family of consensus-based replication protocols, like Multi-Paxos and Raft. MongoDB adopted the latter to drive its update replication scheme. 

At this point, I might have finished the summary, congratulated MongoDB on using Raft, and moved on. After all, Raft is a very well-known protocol, and using it poses very few surprises. However, MongoDB had a couple of twists in its requirements that prevented using vanilla Raft. See, the original MongoDB primary backup scheme had a pull-based replication approach. Usually, the primary node (i.e., the leader) is active and directly prescribes operations/commands to the followers. This active-leader replication happens in traditional primary backup, in Multi-Paxos, in Raft, and in pretty much any other replication protocol aside from pub-sub systems that rely on subscribers pulling the data. In MongoDB, the primary is passive, and followers must request the data at some repeated intervals. In the figure below, I illustrate the difference between the push-based (active leader) and pull-based (active follower) approaches.  

In both cases, the followers can learn new data from the leader. The pull-based approach, however, requires a bit more work. The leader has to compute the data batch for each follower individually, because followers may all have slightly different progress due to differences in their polling schedules. Also, there is another problem with the pull solution: the primary has no way of knowing whether the followers have received the operations during the pull. And this is important, since the primary needs to count the replicas accepting each operation to figure out the quorum and ultimately commit the operations. In traditional Raft, the result of the AppendEntries RPC carries an ack for a successful operation, allowing the primary to count successful nodes and figure out when the quorum is satisfied. With the flipped, pull-style communication, there is no reply to the primary, so we need some additional mechanism to propagate this information, as shown in the figure here. Adding more communication is costly, but the extra message can be piggybacked onto the subsequent pull request at the expense of latency.

Pull-based Raft. Note that the leader must know when a quorum has accepted an operation.

Now let’s diverge a bit and discuss why MongoDB picked a pull-based approach despite the added complexity/cost. Traditional push-based solutions are centered around the primary or leader node, creating a star topology. In fact, unless you use something like the Pig primitive from our PigPaxos paper, star topology is pretty much all you can have. This is not very good in some scenarios where links may be congested or cost a lot of money. Consider a WAN scenario where two nodes may exist in another region for geo-redundancy to minimize data loss in the event of a regional failure. With a star topology, the same payload will travel across the WAN twice, once to each of the two replicas, consequently costing twice as much to replicate. With a pull-based approach, it is easier to orchestrate other replication topologies. All we have to do is instruct one node to pull data from its regional peer instead of the leader across the WAN. Naturally, this creates a longer replication chain but saves WAN link bandwidth, cost, and potentially some resources at the primary.

MongoDB’s pull-based solution enables such chaining by allowing some non-primary nodes to act as sync sources. A sync source is a node that provides data to syncing servers in response to a pull request. Of course, the primary is always a sync source. The high-level overview of node interactions is as follows:

  • Syncing server asks for new log entries from its sync source.
  • Sync source replies with a sequence of log entries.
  • Syncing server appends the log items if they follow the same history. 
  • If the source and server logs diverge, the syncing server may have uncommitted data and must clean up the divergent suffix. The detection of log divergence is based on matching the last log entry of the syncing server with the first entry of the log segment sent by the source. If they match, then the source has sent a continuation of the same history.
  • Syncing server notifies its sync source of how up-to-date it is, including the syncing server’s term.

Unlike vanilla Raft, the syncing server does not check the term of the source, so it may be possible for a server to accept log entries from a source in some previous term. Of course, this can only happen if the log entries follow the same history. I think MongoDB Raft does so to make sure that the syncing server learns about legitimate updates from the previous leader even if it has already participated in the new leader’s election round. What is important here is that the syncing server sends its higher term back to the source (which should propagate to the source’s source, etc., until it reaches the leader for the source’s term). These term messages act as a rejection for the old leader, so it won’t count the syncing server as accepting the message and being a part of the quorum. As a result, if the data from the old-termed sync source was committed, the syncing server has received it and will eventually receive the commit notification from the new leader. If that data is uncommitted by the old leader (i.e., a split-brain situation), then no harm is done, since the syncing server does not contribute to the quorum. The syncing server will eventually learn of the proper operations instead.
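To tie the pull flow and the term feedback together, here is a small sketch of one pull round between a syncing server and its sync source. The message shapes, dictionary fields, and the way the higher term is recorded are my own guesses at a minimal model and are not MongoDB’s actual code.

```python
def pull_round(syncing, source):
    """One pull round in a toy pull-based replication model (a sketch).
    Both peers are plain dicts:
      {"id": str, "log": [...], "term": int, "acked_up_to": {server_id: index}}"""
    # 1. The syncing server asks for entries after its last log index.
    start = len(syncing["log"])
    batch = source["log"][start:]

    # 2. Append the batch; a real system would first compare the (term, index)
    #    of the boundary entries to confirm the histories actually match.
    syncing["log"].extend(batch)

    # 3. The syncing server reports back how up-to-date it is and its own term.
    reply = {"server": syncing["id"],
             "applied": len(syncing["log"]),
             "term": syncing["term"]}

    # 4. The source records the progress. A higher reported term acts as a
    #    rejection that propagates toward the stale leader, which then cannot
    #    count this server toward its commit quorum.
    source["acked_up_to"][reply["server"]] = reply["applied"]
    if reply["term"] > source["term"]:
        source["term"] = reply["term"]
    return reply
```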

Now, speaking of performance, the paper does not provide any comparison between push- and pull-based solutions, so we are left wondering about the impact of such a drastic change. However, some experiments illustrate the comparison of star topology and chained replication in a 2-region WAN setup. While chaining does not seem to increase performance (unless the cross-region bandwidth is severely restricted), it predictably lowers the WAN bandwidth requirements. As far as maximum performance goes, the paper mentions that the limiting factor is handling client requests and not replication, and this is why one fewer server pulling from the leader does not impact throughput. I am not sure I am very comfortable with this explanation, to be honest. 

The paper talks about a handful of other optimizations. I will mention just one that seemed the most interesting — speculative execution. With speculative execution, the nodes do not wait for a commitment notification to apply an operation, and speculatively apply it to the store right away. Since the underlying storage engine is multi-version, the system can still return strongly consistent reads by keeping track of the latest committed version. The multi-version store also allows rollbacks in case some operation fails to commit.

You can see my presentation of the paper on YouTube:

Discussion

1) Cost of pull-based replication. The performance cost of the pull-based solution was the first big discussion topic that I raised in my presentation. As I mentioned, there is an extra communication step needed to allow the primary to learn of quorum acceptance. This step either adds an additional message round/RPC call or adds latency if piggybacked onto the next pull request. Another concern is pulling data when there is no new data, as this wastes communication cycles. 

Luckily, we had MongoDB people, including Siyuan Zhou, one of the paper authors, to shed some light on this. To make things a little better and not waste the communication cycles, the pulls have a rather long “shelf life” — if the source has no data to ship, it will hold on to the pull request for up to 5 seconds before replying with a blank batch. 

Another big optimization in the MongoDB system is a gradual shift to a push-based style! This somewhat makes the entire premise of the paper obsolete; however, this new “push-style” replication is still initiated by the syncing server with a pull, and after the initial pull, the source can push the data to the syncing server as it becomes available. This allows building these complicated replication topologies while reducing the latency impact of a purely pull-based solution.

Another aspect of cost is the monetary cost, and this is where chained replication topologies can help a lot. Apparently, this was a big ask from clients initially and shaped a lot of the system architecture. 

2) Evolution vs Revolution. So, since the original unproven replication approach was pull-based to allow chained topologies, the new improved and model-checked solution had to evolve from the original replication. One might think that it would have been easier to slap in a regular push-based Raft, but that would have been a drastic change for all the other components (not to mention the features). This would have required a lot more engineering effort than trying to reuse as much of the existing code as possible. This brings up an interesting point about how production software gradually evolves and almost never goes through drastic revolutions.

3) Evaluation. The evaluation is the biggest problem of the paper. It lacks any comparison with other replication approaches except old MongoDB’s primary backup scheme. This makes it hard to judge the impacts of the changes. Of course, as we have seen from the discussion with the authors, the actual implementation is even more complicated and evolved. It tries to bridge the gap between pull and push replication styles, so a clear comparison based on MongoDB’s implementation may not have been possible at all. 

That being said, I had some questions even about the provided self-comparisons. See, I would have expected to see a bit of a throughput increase from chaining, similar to what I observe in PigPaxos, but there is none. The paper explains this by saying that replication at the sync source takes only 5% of a single core per syncing server, which would amount to just 20% of a core at the star-topology leader. Roughly, given the VMs used, this is around 5% of the total CPU spent on replication, with the paper claiming that all remaining CPU is used to handle client requests. Assuming there is a sync every 2 ms, we have about 2000 syncs per second at the leader for 12000 client requests. Doing some napkin math, we can see that there are 6 times more requests than replications per second, yet requests use 19 times the CPU, making each request roughly 3 times more expensive than each replication. Given that replication messages are not super cheap and require querying the log and serializing the data, the performance cost difference sounds excessive to me, even considering that client requests involve a few writes of their own (i.e., writing to the log and executing the operation).
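For anyone who wants to follow the napkin math, a few lines reproduce it. The 2 ms sync interval, the 12000 requests per second, and the 95%/5% CPU split come from the paragraph above; everything else follows from them.

```python
# Napkin math for the MongoDB star-topology leader (numbers from the text above).
syncing_servers = 4                                  # 5-node replica set, star topology
syncs_per_sec = syncing_servers * (1000 / 2)         # one pull every 2 ms -> 2000/s
requests_per_sec = 12000

cpu_replication = 0.05                               # ~5% of total CPU on replication
cpu_requests = 0.95                                  # the rest handles client requests

request_to_sync_ratio = requests_per_sec / syncs_per_sec        # 6x more requests
cpu_ratio = cpu_requests / cpu_replication                      # 19x more CPU
cost_per_request_vs_sync = cpu_ratio / request_to_sync_ratio    # ~3.2x per operation

print(request_to_sync_ratio, round(cpu_ratio), round(cost_per_request_vs_sync, 1))
# 6.0 19 3.2
```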

Siyuan explained that there is a bit more work on the request path as well. Not only is the log written (and persisted for durability), but there are also some additional writes for session maintenance and indexes. Moreover, the writes in the underlying WiredTiger engine are transactional, costing some performance as well. 

4) PigPaxos? Throwing this out here, but there are push-based solutions that can have more interesting replication topologies than a star. Our PigPaxos, in fact, solves a similar issue of cross-regional replication at a low network cost. 

5) Other pull-based systems. Finally, we try to look at other solutions that may use pull-based replication, but there are not many. Pub-Sub systems fit the pull model naturally as subscribers consume the data by pulling it from the queue/log/topic. Pull-based replication can be handy in disaster recovery when nodes try to catch up to the current state and must ask for updates/changes starting some cursor or point in history. 

6) Reliability/Safety. As the protocol makes the important change of not rejecting data from an old-termed source, it is important to look at the safety of this change. The paper models the protocol in TLA+ and model checks it. Intuitively, however, we know that even though the node takes the data from the old source, it actively rejects it by sending back its higher term. This, intuitively, should be enough to ensure that the old leader does not reach a quorum of accepts (even though a majority of nodes may have copied the data) and does not reply to the client with a commitment. The rest is taken care of by Raft’s commit procedure upon the term change.

Scalable but Wasteful or Why Fast Replication Protocols are Actually Slow

In the last decade or so, quite a few new state machine replication protocols have emerged in the literature and on the internet. I am “guilty” of this myself, with PigPaxos appearing in this year’s SIGMOD and the PQR paper at HotStorage’19. There are better-known examples as well — EPaxos inspired a lot of development in this area. However, there seems to be a big disconnect between literature and production. While many sophisticated protocols, such as EPaxos, Atlas, SDPaxos, and Compartmentalized Paxos, appear in the literature, and some of them are rather popular in the academic community, production systems tend to rely on more conservative approaches based on Raft, Multi-Paxos, or primary-backup.

There are probably a few reasons for this disconnect. For instance, as we discussed in the Paxos vs Raft reading group meeting, explaining protocols in terminology closer to actual code and having good, reliable reference implementations help with adoption. However, the difficulty of implementing some technology should not be a roadblock when this technology offers substantial improvements. 

Well, it appears that there are substantial quantitative improvements. Consider EPaxos. The protocol presents a leaderless solution, where any node can become an opportunistic coordinator for an operation. The exciting part is that EPaxos can detect conflicts, and when operations do not conflict with each other, both of them can succeed in just one round trip to a fast quorum. The original paper claims better throughput than Multi-Paxos, and this is not surprising, as EPaxos avoids the single-leader bottleneck of Multi-Paxos and allows non-conflicting operations to proceed concurrently. This is a big deal! 

Throughput of Multi-Paxos and EPaxos in a 5-node dedicated cluster under low conflict.

I wanted to find the truth here, so a couple of colleagues and I worked on experimentally testing Multi-Paxos and EPaxos. We performed a simple experiment, shown in the figure here, to confirm EPaxos’ performance advantage, and we observed somewhere between 15-25% better throughput for EPaxos than for Multi-Paxos on 5 nodes running on identical VMs (EC2 m5a.large with 2 vCPUs and 8 GiB of RAM). The original EPaxos paper shows a somewhat larger performance difference in some situations, but we have spent significantly more time on our Multi-Paxos implementation and optimizations than on EPaxos. Anyway, we see a substantial improvement that was stable, reproducible, and large enough to make a real-world difference. Not to mention that EPaxos has a few other advantages, such as lower latency in geo-distributed deployments. So what gives? 

I have a theory here. I think many of these “faster” protocols are often slower in real life. This has to do with how the protocols were designed and evaluated in the first place. So bear with me and let me explain. 

Most evaluations of these consensus-based replication protocol papers are conducted in some sort of dedicated environment, be it bare-metal servers in the lab or VMs in the cloud. These environments have fixed allocated resources, and to improve the performance we ideally want to maximize the resource usage of these dedicated machines. Consider Multi-Paxos or Raft. These protocols are skewed towards the leader, causing the leader to do disproportionately more work than the followers. So if we deploy Multi-Paxos on five identical VMs, one will be used a lot more than the remaining four, essentially leaving unused resources on the table. EPaxos, by design, avoids the leader bottleneck and harvests all the resources at all nodes. So naturally, EPaxos outperforms Multi-Paxos by using the resources Multi-Paxos cannot get a hold of due to its design. Such uniform resource usage across nodes is rather desirable, as long as avoiding the leader bottleneck and allowing each node to participate equally comes cheaply. 

Aggregate CPU utilization for Multi-Paxos and EPaxos in 5 nodes

Unfortunately, it is not the case, and EPaxos’ ability to use resources that would have been left idle by Multi-Paxos comes at a steep penalty! Intuitively, EPaxos needs to do a lot more work to maintain safety and remain leader-less — it needs to communicate the dependencies, compute dependency graphs, check for conflicts in these graphs, and resolve the conflicts as needed. This added complexity contrasts with Multi-Paxos that provides most of its safety via a few simple comparisons both at the leader and followers. To try to estimate the cost of EPaxos’ role uniformity, we need to look at resource usage. In the same experimental run as in the absolute performance figure earlier, we have measured CPU usage across all VMs. Expectedly, we were able to observe that EPaxos is very good at using all available resources, and Multi-Paxos consumes the entire CPU only at one node – the leader. However, it does not take long to spot something odd if we start looking at the aggregate resource usage, shown in the figure here. It turns out that Multi-Paxos achieved nearly 14k ops/s of throughput consuming roughly 200% CPU (and leaving 300% unused), while EPaxos did a bit over 18k ops/s with all 500% of aggregate CPU utilization. 
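In other words, Multi-Paxos squeezed roughly 7,000 operations per second out of each fully-used core’s worth of CPU (14,000 ops/s over 200% CPU), while EPaxos got only about 3,600 (18,000 ops/s over 500% CPU), roughly half the efficiency.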

Throughput normalized by the CPU usage

This suggests that EPaxos needs overall more CPU cycles across all nodes to finish each operation. We can see this better if we normalize the throughput by the amount of CPU consumed. As seen in the figure, this normalized throughput presents a big gap in the efficiency of protocols. Of course, this gap may vary and will depend on the implementation. It may even shrink if some better engineers implement EPaxos, but I also think there is a fundamental issue here. EPaxos and many other protocols were designed to take advantage of all allocated resources in the cluster since allocated but unused resources are wasted. This resource usage paradigm is common when we have dedicated servers or VMs. However, technology has moved on, and we rarely deploy replicated services or systems all by themselves in isolation. With the advances in virtualization and containerization, we increasingly deploy replicated systems in resource-shared environments that are designed to pack applications across clusters of servers in a way that avoids idle physical resources. 

Aggregate throughput of 5 Multi-Paxos and 5 EPaxos instances in a task-packed 5-node cluster.

Moreover, most replicated systems are sharded, so we normally have multiple instances of the replication protocol supporting shards of one larger system. This makes task-packing simpler as we have many relatively uniform tasks to schedule between machines. Consider Yugabyte DB for example. Karthik Ranganathan in his talk described how the system schedules raft groups across a cluster such that some server has a few resource-heavy raft leaders and plenty more resource-light raft followers. This type of packing allows Yugabyte (and Cockroach DB, Spanner, Cosmos DB, and so on) to achieve uniform resource usage on VMs or dedicated servers while having non-uniform, but efficient replication protocols. Here I show how 5 instances of Multi-Paxos compare with 5 instances of EPaxos when deployed on larger servers (32 GiB RAM and 8 vCPUs). This drastic difference compared to the dedicated environment is due to the fact that we were able to pack multiple instances of Multi-Paxos to consume all the resources of the cluster, and clock-for-clock, EPaxos has no chance of winning due to its lower efficiency. 

Ok, so I think there are quite a few things to unpack here. 

  • Absolute performance as measured on dedicated VMs or bare-metal servers is not necessarily a good measure of performance in real life, especially in a resource-shared setting (aka the cloud).
  • We need to consider efficiency when evaluating protocols to have a better understanding of how well the protocol may behave in the resource-shared, task-packed settings
  • The efficiency of protocols is largely under-studied, as most evaluations from academic literature simply focus on absolute performance.
  • The efficiency (or the lack of it) may be the reason why protocols popular in academia stay in academia and do not get wide adoption in the industry. 

In conclusion, I want to stress that I am not picking on EPaxos. It is a great protocol that has inspired a lot of innovation in the area of distributed consensus. Moreover, I am sure there are use cases for EPaxos (or more recent extensions, such as Atlas), especially in the WAN setting, where trading off some efficiency for better latency may be acceptable. Many other protocols (mine included) may have similar efficiency problems, as they approach the performance issue similarly – find the bottlenecked node and move its work elsewhere. The fundamental issues at play here are that (1) moving the work elsewhere comes at an efficiency penalty, and (2) in many modern environments there may not be any resources available for such “moved” work due to resource budgets and tight task-packing.

I, Abutalib Aghayev, and Venkata Swaroop Matte discuss the efficiency issues in a bit more detail in our upcoming HotStorage’21 paper: “Scalable but Wasteful: Current State of Replication in the Cloud.”

Reading Group. Protocol-Aware Recovery for Consensus-Based Storage

Our last reading group meeting was about storage faults in state machine replications. We looked at the “Protocol-Aware Recovery for Consensus-Based Storage” paper from FAST’18. 

The paper explores an interesting omission in most state machine replication (SMR) protocols. These protocols, such as (Multi-)Paxos and Raft, are specified with the assumption of having a crash-resistant disk to write the operation log and voting metadata. This disk data allows crashed nodes to restart safely. However, real life gets in the way a bit, as infallible storage is about as real as unicorns. 

Storage may fail in peculiar ways: some data may get corrupted while most other data is correct and the server itself continues working. The problem is handling such failures. The simplest way is to treat the server as crashed. However, the server must then remain crashed, as restarting it may lead to even more severe state corruption when the server replays the operations from a faulty log. The paper talks about a variety of other approaches taken to deal with these data issues. The authors state that all the mechanisms they have explored were faulty and led to liveness or safety issues. I personally do not buy such a blanket statement, but a few of the examples in the paper were really interesting. 

The paper then suggests a solution – Protocol-Aware Recovery (PAR). The main point here is to avoid ad-hoc solutions because they are either slow, unsafe, complicated, or all of the above. This makes sense since such a big omission (potential for data-corrupting disk failures) in protocols should be addressed at the protocol level. The paper draws heavily on the Raft state machine protocol and develops the recovery procedure for it.

The log recovery is leader-based and can be broken down into two sub-protocols: follower recovery and leader recovery. The followers are recovered by restoring the data from the leader, who always knows the entire committed history. Leader recovery is a bit more tricky and occurs as part of a leader election. Of course, if a non-faulty node can be elected leader, then recovering faulty nodes is easy with the follower recovery. However, the leader election requires a node to have the most up-to-date log to become a leader, limiting the selection of nodes for the job. That being said, a node can be elected with a corrupted log, but it then needs to recover the corrupted entries from the followers. If an entry is not available on any of the followers, the state machine becomes stuck (as it should). The protocol only recovers committed log entries and follows Raft logic to discard a non-committed log suffix if it has corrupted entries. 
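To give a flavor of the follower-recovery side, here is a toy sketch of the idea: detect a corrupted committed entry via a checksum and fetch a replacement from the leader, while discarding a corrupted uncommitted suffix. This is just the gist under my own simplifications (checksummed entries, a `fetch_from_leader` callback), not the paper’s actual algorithm.

```python
import zlib

def recover_follower_log(local_log, fetch_from_leader, commit_index):
    """Toy follower recovery in the spirit of protocol-aware recovery (sketch).
    local_log is a list of (entry_bytes, stored_crc) tuples; fetch_from_leader(i)
    returns the leader's copy of the committed entry at index i."""
    for i, (data, stored_crc) in enumerate(local_log):
        if zlib.crc32(data) == stored_crc:
            continue                              # entry is intact
        if i < commit_index:
            # Committed entries can always be restored from the leader, which
            # knows the entire committed history.
            good = fetch_from_leader(i)
            local_log[i] = (good, zlib.crc32(good))
        else:
            # A corrupted entry in the uncommitted suffix: discard the suffix,
            # following the usual Raft treatment of follower logs.
            del local_log[i:]
            break
    return local_log
```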

In addition to log recovery, the paper also talks about snapshot recovery. The idea behind snapshot recovery is to make sure all nodes take the same snapshots at the same index in the log, break them into “chunks,” and recover chunks as needed from other nodes. 

Here is the presentation by Rohan Puri:

Discussion

1) The need for logs? The paper assumes that a state machine takes periodic snapshots to a disk/drive, and such a snapshot in combination with a log can be used for node recovery later. This implies that the actual current state of the state machine can be lost due to a server restart. However, some state machines are directly backed by the disk, in essence representing a rolling snapshot that gets updated every time an operation from the log applies. Recovery of such a disk-backed state machine can be quicker and require only the log entries that happened after the crash/restart. Of course, this does not mean that the disk-backed state machine itself cannot be corrupted. In any case, the log entries are required for recovery and can be garbage collected once all nodes have persisted the state machine to disk (either as part of normal operation or a snapshot), making the time frame in which the log entries remain useful relatively small. 

A more interesting problem may arise in trying to recover a corrupted state machine. If we rely on this “rolling-snapshot” disk-backed state machine, the mechanism the paper uses for snapshot recovery won’t work, since different copies of the state machine may be misaligned ever so slightly. Of course, one can always do the costly node restore procedure — restore to some prior snapshot and replay the log, but this is wasteful and requires keeping an extra snapshot and the log from the snapshot onwards. In the spirit of the paper, we should rely on distributed copies instead and be able to repair the corruption without storing redundant copies on the same server.

2) Persistent memory vs RAM and recovery for in-memory SMR. If we build a state machine replication (SMR) to work purely off RAM, then we do not have the luxury of retaining any state after a restart. As such, in-memory state machines must have different mechanisms to ensure safety. For example, in traditional Multi-Paxos with a disk, a node always remembers the current term/ballot and past votes it has participated in. Without durable memory, a node restart erases the previous voting state, allowing a node to vote on something it has already voted on before, but with a lower term/ballot. This is not safe and may lead to a double-commit on the same log entry when a node promises to some new leader, and then after restart makes a second promise in the same log index to some older leader. 

Allowing for corruption in persistent memory is somewhat similar to not having persistent memory at all, at least when dealing with crashes/restarts. The very piece of data/metadata we need to ensure safety and avoid double voting, as in the example above, may be corrupted and cannot be used after a restart. However, the same precautions used for in-memory replicated state machines will work with corrupted storage as well and allow for safe recovery. For example, to prevent the double-voting example, a recovering node needs to run a “mock” leader election (or a leader election with a term guaranteed not to succeed). Such an election ensures the node gets a proper view of the current ballot/term in the cluster and no longer accepts votes from prior leaders. After such a mock election, the node can start accepting/voting for log entries while recovering any prior log and/or state machine from the other replicas. Of course, the full recovery completes when enough data is shipped from other nodes (i.e., snapshots + missing log entries). 
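As an illustration of the “mock election” idea from the discussion, here is a tiny sketch. The use of a deliberately losing ballot, the `prepare` probe, and the field names are my own interpretation of the idea above, not a published protocol.

```python
def mock_election_recovery(node, peers):
    """Recover the minimum safe state for a rebooted in-memory replica (sketch).
    Each peer answers a Prepare-like probe with its current ballot/term."""
    # Use a ballot that is guaranteed to lose (here, 0 is reserved as
    # "never wins"), so the probe cannot disturb an existing leader.
    probe_ballot = 0
    replies = [peer.prepare(probe_ballot) for peer in peers]
    if len(replies) < len(peers) // 2 + 1:
        raise RuntimeError("cannot recover without a majority of peers")

    # Adopt the highest ballot seen, so the node never again votes for stale leaders.
    node.ballot = max(reply["ballot"] for reply in replies)

    # The node must still look "crashed" for older operations until its log
    # and state machine are fully caught up from the peers.
    node.recovering = True
```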

There are a few differences between RAM and persistent storage when it comes to recovery. First of all, while it seems like both can lose data (one due to a reboot, the other due to some random corruption), persistent storage still leaves a hint that data is missing. This is like not remembering what the node has voted for or who was the leader, but still having a sixth sense that something was voted upon. This extra piece of information may be useful in recovery, and indeed the protocol from the paper takes advantage of it to improve fault tolerance and safety. The recovery protocol preserves safety even when a majority of nodes have a fault at the same log index, as the protocol knows something is missing entirely and will halt for safety. In the RAM setting, a mass reboot (i.e., of a majority of nodes) results in a collective loss of memory without any hint that something may have been agreed upon, potentially leading to a rewrite of the log. 

The second difference is that persistent memory may not lose all the data, so fewer items must be shipped from the followers. 

3) Leader-bound recovery. The paper suggests recovering followers from the leader node. This can put more load on the leader, who is already a bottleneck in the protocol. It seems like it may be possible to recover committed log entries from followers (the paper already does so for leader recovery) to make the recovery procedure less demanding for the leader.

4) Byzantine. The paper touches a bit on this topic. Data corruption on disk can be viewed through the lens of Byzantine fault tolerance. The corruption causes a node to act outside of the protocol specs, and byzantine-tolerant protocols are designed to handle such “out-of-spec” behaviors. The paper is a good example of how we can often solve some specific types of byzantine behaviors without resorting to the full-blown PBFT-style solutions. This is very practical, as we want the state machine to handle data corruptions, but we do not want to pay the performance penalty associated with BFT protocols. 

5) Likelihood of data corruption. Another point of discussion was around the likelihood of such data faults happening. They do not seem to be too frequent, but they do happen. We touched on a few anecdotal occurrences. For example, some firmware issues caused the disk to not write some large buffers of data. 

It is also worth noting error correction. Error correction is standard for server-grade memory, and it comes at a relatively small monetary/performance cost. Similar error-correction technologies are used in disks and drives, allowing for small errors (i.e. a bit-flip) to be fixed by the drive. In fact, NAND flash SSDs rely on error correction in normal operation.

6) Infallible disk. Protocols assume the disk is always correct. Why? Even on the surface, this does not seem like a very tight assumption, especially at the scale of millions of SMR instances deployed across millions of machines.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions, and most importantly manage Zoom invites to paper discussions. Please join the slack group to get involved!