# Reading Group. Rabia: Simplifying State-Machine Replication Through Randomization

We covered yet another state machine replication (SMR) paper in our reading group: “Rabia: Simplifying State-Machine Replication Through Randomization” by Haochen Pan, Jesse Tuglu, Neo Zhou, Tianshu Wang, Yicheng Shen, Xiong Zheng, Joseph Tassarotti, Lewis Tseng, Roberto Palmieri. This paper appeared at SOSP’21.

A traditional SMR approach, based on Raft or Multi-Paxos protocols, involves a stable leader to order operations and drive the replication to the remaining nodes in the cluster. Rabia is very different, as it uses a clever combination of determinism to independently order requests at all nodes and a binary consensus protocol to check whether replicas agree on the next request to commit in the system.

Rabia assumes a standard crash fault tolerance (CFT) model, with up to f node failures in a 2f+1 cluster. Each node maintains a log of requests, and the requests execute in the log’s order. The log may contain a NO-OP instead of a request.

When a client sends a request to some node, that node first forwards the request to the other nodes in the cluster. Upon receiving the request, a node puts it in a min priority queue of pending requests. Rabia uses this priority queue (PQ) to independently and deterministically order pending requests at each node, such that the oldest request is at the head of the queue. The idea is that if all nodes have received the same set of requests, they will have identical PQs.
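Here is a minimal Python sketch of the pending-request queue. It assumes requests are ordered by a (timestamp, client id) key. The client-id tiebreak and the class names are my own assumptions for illustration. The example shows that two nodes receiving the same requests in different orders end up with the same head.

```python
import heapq
from dataclasses import dataclass, field

@dataclass(order=True, frozen=True)
class Request:
    # Assumed ordering key: (timestamp, client_id). The client id breaks
    # timestamp ties so every node orders identical requests identically.
    timestamp: int
    client_id: int
    op: str = field(compare=False)  # payload, not part of the ordering key

class PendingQueue:
    """Min priority queue of pending requests: oldest request at the head."""

    def __init__(self):
        self._heap = []
        self._seen = set()

    def add(self, req):
        # A request may arrive twice (from the client and via forwarding),
        # so keep only one copy. Popped requests stay in _seen, which means
        # a late duplicate of an already-handled request is ignored too.
        if req not in self._seen:
            self._seen.add(req)
            heapq.heappush(self._heap, req)

    def head(self):
        return self._heap[0] if self._heap else None

    def pop(self):
        return heapq.heappop(self._heap)

# Two nodes receive the same requests in different arrival orders.
a = Request(5, 2, "put x")
b = Request(3, 1, "put y")
c = Request(5, 1, "get z")
node1, node2 = PendingQueue(), PendingQueue()
for r in (a, b, c):
    node1.add(r)
for r in (c, a, b, a):  # includes a duplicate of a
    node2.add(r)
print(node1.head() == node2.head() == b)  # True
```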

At some later point in time, the second phase of Rabia begins; the authors call it Weak-MVC (Weak Multi-Valued Consensus). Weak-MVC itself is broken down into two stages: the Propose Stage and the Randomized Consensus Stage. In the propose stage, nodes exchange the request at the head of their PQ along with the log’s next sequence number seq. This stage allows the nodes to see the state of the cluster and prepare for the binary consensus. If a node sees a majority of the cluster proposing the same request for the same sequence number, then the node sets its state to 1. Otherwise, the node assumes a state of 0.
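The state computation at the end of the propose stage can be sketched in a few lines of Python. This is a simplification under stated assumptions. The node is assumed to have already collected (seq, request) proposals from at least a majority of the n = 2f + 1 nodes, and requests are assumed to be plain strings.

```python
from collections import Counter

def propose_state(proposals, n):
    """Simplified propose stage of Weak-MVC.

    proposals: (seq, request) pairs collected from nodes, our own included.
    Returns (1, (seq, request)) if a majority of the n-node cluster proposed
    the same request for the same seq, otherwise (0, None).
    """
    majority = n // 2 + 1  # equals f + 1 when n = 2f + 1
    for (seq, req), count in Counter(proposals).items():
        if count >= majority:
            return 1, (seq, req)
    return 0, None

print(propose_state([(7, "A"), (7, "A"), (7, "B")], n=3))  # (1, (7, 'A'))
```

Note that the same request proposed for two different sequence numbers does not count as agreement, because the (seq, request) pair is the counting key.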

At this point, the binary consensus begins, essentially to certify one of two options. The first option is that a majority of nodes want to put the same request in the same sequence number (i.e., the state of 1). The second option is to certify that no such common request exists in the majority (state of 0). For binary consensus, Rabia uses a modified Ben-Or algorithm. Ben-Or consists of two rounds that may repeat multiple times.

In round-1, nodes exchange their state and compute a vote to be either 0 or 1. The vote corresponds to the majority of state values received, so if a node received enough messages to indicate that the majority of nodes are in state 1, then the node will take on the vote value of 1. Similarly, if a majority has state 0, then the node will vote 0. If no majority is reached for any state, the vote is nil.

Round-2 of Ben-Or exchanges votes between nodes. If a majority of nodes agree on the same non-nil vote, the protocol can terminate. Termination means that the system has agreed either to certify the proposed request if the consensus value is 1 or to create a NO-OP if the value is 0.
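The following minimal Python sketch shows one node's view of a single Ben-Or iteration as described above. It assumes n = 2f + 1 and that each node has already collected the states (round 1) or votes (round 2) it received. `None` stands for a nil vote. The function names are mine. Rabia's modified Ben-Or has more details, such as how many messages to wait for, and this sketch leaves them out.

```python
from collections import Counter

def majority_value(values, n):
    """Return 0 or 1 if at least a majority (n // 2 + 1) of the n nodes
    reported it, ignoring nil (None) entries; otherwise None."""
    counts = Counter(v for v in values if v is not None)
    for value in (0, 1):
        if counts[value] >= n // 2 + 1:
            return value
    return None

def round1_vote(states, n):
    # Vote for the state held by a majority; nil (None) if there is none.
    return majority_value(states, n)

def round2_decide(votes, n):
    # Terminate only if a majority cast the same non-nil vote.
    return majority_value(votes, n)

def outcome(decision, proposed_request):
    # 1 certifies the proposed request for this seq; 0 produces a NO-OP.
    return proposed_request if decision == 1 else "NO-OP"

# Happy case in a 3-node cluster: every node saw the same proposal.
vote = round1_vote([1, 1, 1], n=3)
decision = round2_decide([vote, vote, vote], n=3)
print(outcome(decision, (7, "put x")))  # (7, 'put x')
```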

In an ideal situation, all participating nodes will have the same request at the head of their PQs when the propose phase starts. This means that nodes will have the same state at the end of the propose phase, allowing the binary consensus to certify the proposed request at its sequence number in just one round trip (Round-1 + Round-2 of Ben-Or). So the request distribution + proposal + Ben-Or consensus in this ideal case takes only 4 message exchanges, or 2 RTTs. This is twice as long as Multi-Paxos’ ideal case of 1 RTT between the leader and the followers, but Rabia avoids having a single leader.

A less ideal situation arises when no majority quorum has the same request at the head of the PQ when the proposal starts. Such a case, for example, may happen when the proposal starts before the request has had a chance to replicate from the receiving node to the rest of the cluster. In this case, binary consensus may reach agreement on state 0 to not certify any operation in that particular sequence number, essentially producing a NO-OP. The authors argue that this NO-OP is a good thing, as it gives enough time for the in-flight requests to reach all the nodes in the cluster and get ordered in their respective PQs. As a result, the system will propose the same request in the next sequence number after the NO-OP. Both of these situations constitute a fast path for Ben-Or, as it terminates in just one iteration (of course, the latter situation does not commit a request, at least not until the retry with a higher sequence number).

Now, it is worth pointing out that the fast path of one RTT for binary consensus is not always possible, especially in light of failures. If too many nodes have a nil vote, the protocol will not have enough votes agreeing on either state (1: certify the request, 0: create a NO-OP), and the Ben-Or process must repeat. In fact, the Ben-Or protocol can repeat voting many times with some random coin flips in between the iterations to “jolt” the cluster into a decision. For more information on Ben-Or, Murat’s blog provides ample details. This jolt is the randomized consensus part. The authors, however, replaced the random coin flip at each node with a random but deterministic coin flip, so that each node has the same coin flip value for each iteration. Moreover, a node only needs the coin flip if it received no vote from other nodes in round-1 of Ben-Or; otherwise, the node can assume the state of the vote it received. The whole process can repeat multiple times, so it may not be very fast to terminate.
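Below is a sketch of a shared, deterministic coin. It assumes every node hashes the sequence number and iteration together with a common seed. The seed and the SHA-256 construction are my own assumptions, and the paper's exact mechanism may differ. The key property is that all nodes compute the same bit for the same (seq, iteration).

```python
import hashlib

def common_coin(seq, iteration, seed=b"demo-seed"):
    """Deterministic coin: every node that evaluates it for the same
    (seq, iteration) gets the same bit. The seed and the SHA-256-based
    construction are assumptions made for this sketch."""
    data = seed + seq.to_bytes(8, "big") + iteration.to_bytes(8, "big")
    return hashlib.sha256(data).digest()[0] & 1

def next_state(received_votes, seq, iteration):
    """State to carry into the next Ben-Or iteration."""
    non_nil = [v for v in received_votes if v is not None]
    if non_nil:
        # Adopt a non-nil vote; within one iteration all non-nil votes agree,
        # because each one required a majority of identical states.
        return non_nil[0]
    # No vote to adopt: flip the shared coin.
    return common_coin(seq, iteration)

# Two nodes that both saw only nil votes end up in the same state.
print(next_state([None, None], seq=12, iteration=2)
      == next_state([None], seq=12, iteration=2))  # True
```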

The paper provides more details on the protocol. Additionally, the authors have proved the safety and liveness of the protocol using Coq and Ivy.

The big question is whether we need this more complicated protocol if solutions like Multi-Paxos or Raft work well. The paper argues that Raft and Paxos get more complicated when the leader fails, requiring the leader election, which does not happen in Rabia. Moreover, Paxos/Raft-family solutions also require too many additional things to be bolted on, such as log pruning/compaction, reconfigurations, etc. The argument for Rabia is that all these extra components are easier to implement in Rabia.

And speaking of evaluations, this was the biggest disappointment for me. The authors claim Rabia performs comparably to Multi-Paxos and EPaxos in 3- and 5-node clusters, with 3 nodes in the same availability zone allowing Rabia to outperform EPaxos. In fact, the figure below shows that Rabia beats Multi-Paxos all the time.

But there are a ton of assumptions and tweaks going on to get these results. For example, Rabia needs enough time to replicate client requests before starting the propose phase to have a good chance of completing on the fast path. So the testing is done with two types of batching to provide the needed delay. The figure mentions the client batching. However, there is also much more extensive server-side batching, which is mentioned only in the text. Of course, there is nothing wrong with batching, and it is widely used in systems. In all fairness, the paper provides a table with no-batching results, where Multi-Paxos outperforms Rabia fivefold.

The biggest issue is the lack of testing under less-favorable conditions. There is no evaluation/testing under failures, and no testing when the network is degraded and does not operate within the timing conditions expected by the protocol. These issues impact real performance and may create reliability issues. For example, a network degradation may cause Rabia to not use the fast path and consume more resources, reducing its maximum processing capacity. Such a scenario can act as a powerful trigger for a metastable failure.

As usual, we had a nice presentation of the paper in the reading group. Karolis Petrauskas described the paper in great detail:

## Discussion

1) Evaluation. I have already talked about evaluation concerns, and this was one of the discussion topics I brought up during the meeting.

2) Use of Ben-Or. Ben-Or is an elegant protocol for reaching binary consensus, which is not usually useful for solving state machine replication. Traditionally, Multi-Paxos or Raft agree on a value/command and its sequence number, so they need a bit more than just a yes/no agreement. However, Rabia transforms the problem into a series of such yes/no agreements by removing replication and ordering from consensus and doing them a priori. With deterministic timestamp ordering of requests, Rabia just needs to wait for the operation to exist on all/most nodes and agree to commit it at the next sequence number. So the consensus is no longer reached on a value and its order, but on whether to commit some command at some sequence number.

3) Practicality. The evaluation suggests that the approach can outperform Multi-Paxos and EPaxos, but whether it is practical remains to be seen. For one, it is important to see how the solution behaves under less ideal conditions. Second, it is also important to see how efficient it is in terms of resource consumption. EPaxos is not efficient despite being fast. The additional message exchanges over Multi-Paxos, Raft, and even EPaxos may cost Rabia on the efficiency side.

4) Algorithms. The paper provides some nice algorithms that illustrate how the protocol works. However, some of the conditions are unnecessarily confusing. In the same algorithm, the authors use f+1, n-f, and floor(n/2)+1 to designate the majority in an n=2f+1 cluster. Please proofread your algorithms: a bit of consistency can improve readability a lot!

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

# Reading Group. Exploiting Nil-Externality for Fast Replicated Storage

Our 85th DistSys reading group meeting discussed the “Exploiting Nil-Externality for Fast Replicated Storage” SOSP’21 paper by Aishwarya Ganesan, Ramnatthan Alagappan, Andrea C. Arpaci-Dusseau, and Remzi H. Arpaci-Dusseau. The paper uses an old trick of delaying the execution of some operations to improve the throughput while maintaining strong consistency. Consistency is an externally-observable property, and simple strategies, like batching, can improve the throughput by making all concurrent operations wait a bit and replicate/execute in bulk. Some other solutions may delay some operations in favor of others when conflicts arise. For instance, the PQR approach may delay reads to allow writes to finish first. This paper takes the “delay-some-reads” mechanism one step further to enable clients to directly replicate certain writes/updates to the nodes in the cluster with 1 RTT of client-observable latency. At the same time, the leader orders the operations in the background and potentially delays reads for objects that have pending unordered, unexecuted writes.

A core observation of the paper is that not all operations expose the system’s state upon acking their completion back to the clients. For example, a simple insert or replace command may only acknowledge its success without exposing any state. This behavior is normal, even expected, since we know what state a system should assume (at least for that particular object) after a successful write. The paper calls these operations nil-external. Since a nil-external command does not return any state, we do not really need to execute it right away, at least not on the critical path, as long as we can ensure that the command won’t somehow be forgotten after acking success to the client. The authors claim that nil-external operations are common in many production-grade systems, presenting an opportunity for improvement.

Skyros, the system presented in the paper, decouples the replication of nil-external commands from their execution. The clients write their nil-external operations directly to the replicas, with each replica placing the command in a pending log, called the d-log. Obviously, different nodes may have the same operation in different slots of the d-log due to races with other concurrent clients. From the client’s perspective, once a supermajority of replicas, including the leader replica, has acked the command, the client can consider the nil-external operation completed. This is because the operation is replicated and won’t get lost even under failures. However, the command has not been executed yet; moreover, its execution order has not been set either.
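Here is a minimal sketch of the client-side completion rule. It assumes a supermajority of f + ceil(f/2) + 1 out of n = 2f + 1 replicas, which is 4 of 5 or all 3 of 3. That matches the 5-node and 3-node quorum sizes discussed in this post. The function and replica names are hypothetical.

```python
def supermajority(n):
    # Assumed supermajority size for n = 2f + 1: f + ceil(f/2) + 1.
    f = (n - 1) // 2
    return f + (f + 1) // 2 + 1

def nilext_write_complete(acks, n, leader):
    """Client-side check (sketch): a nil-externalizing write is durable
    once a supermajority of the n replicas, including the leader, acked it."""
    acks = set(acks)
    return leader in acks and len(acks) >= supermajority(n)

print(nilext_write_complete({"L", "A", "B", "C"}, n=5, leader="L"))  # True
print(nilext_write_complete({"A", "B", "C", "D"}, n=5, leader="L"))  # False
```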

In the “happy case,” the command order mismatch in d-logs between replicas is irrelevant. Skyros is a leader-based protocol, and the leader periodically tells the followers to commit the commands from the d-log to a commit log, or c-log, in the leader’s order. This part of the protocol looks like Multi-Paxos or Raft, with the leader essentially replicating only the order of operations and not the operations themselves. Upon committing to the c-log, the nodes remove the commands from the d-log. This process is illustrated in the figure, which I compiled/borrowed from the authors’ presentation of the paper.
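The following toy sketch models the d-log/c-log split, and its class and method names are hypothetical. Replicas accept nil-externalizing writes in whatever order they arrive. Only the leader's ordering message moves those writes into the c-log and executes them. The `needs_sync` helper shows why a read of a key with a pending d-log write cannot be served right away.

```python
class SkyrosReplica:
    """Toy replica: the d-log holds durable but unordered nil-externalizing
    writes; the c-log holds them in the order the leader picked."""

    def __init__(self):
        self.dlog = {}    # op_id -> (key, value); arrival order is irrelevant
        self.clog = []    # op_ids in the leader's order
        self.store = {}   # executed state

    def accept(self, op_id, key, value):
        # Nil-externalizing write: durable (in the d-log) but not executed.
        self.dlog[op_id] = (key, value)

    def commit_order(self, order):
        # The leader ships only op_ids, not payloads. This sketch assumes
        # every listed op is already in the local d-log (fetching a missing
        # op from a peer is omitted).
        for op_id in order:
            key, value = self.dlog.pop(op_id)
            self.clog.append(op_id)
            self.store[key] = value

    def needs_sync(self, key):
        # A read of `key` cannot be served from `store` while a write to
        # `key` is still pending in the d-log; it must wait for ordering.
        return any(k == key for k, _ in self.dlog.values())

r1, r2 = SkyrosReplica(), SkyrosReplica()
r1.accept("w1", "x", 1)
r1.accept("w2", "x", 2)
r2.accept("w2", "x", 2)              # different arrival order
r2.accept("w1", "x", 1)
print(r1.needs_sync("x"))            # True
for r in (r1, r2):
    r.commit_order(["w1", "w2"])     # the leader's order
print(r1.store == r2.store == {"x": 2})  # True
```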

Naturally, to be safe, Skyros needs to recover the real-time ordering. This is the reason for having a larger “supermajority” quorum. This trick has been borrowed from Fast Paxos by several papers/protocols. Essentially, the supermajority quorum allows Skyros to recover the partial order of commands that have a happens-before relationship by simply looking at the majority of nodes in the majority quorum used for recovery. This majority-of-majority setup (e.g., 2 nodes out of a 3-node majority in a 5-node cluster) will have the correct happens-before relations because of quorum intersection. So, if “a” happened before “b”, then this partial order will be captured in at least 2 nodes of any 3-node majority quorum in a 5-node setup.
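The quorum-intersection argument is easy to check by brute force. This sketch assumes a 4-of-5 supermajority and a 3-of-5 recovery quorum, as in the example above. It finds the smallest overlap between any two such quorums.

```python
from itertools import combinations
from math import ceil

def min_overlap(n, q1, q2):
    """Smallest intersection between any q1-sized and any q2-sized quorum
    drawn from n nodes, found by brute force."""
    nodes = range(n)
    return min(len(set(a) & set(b))
               for a in combinations(nodes, q1)
               for b in combinations(nodes, q2))

f = 2
n = 2 * f + 1
supermajority = f + ceil(f / 2) + 1   # 4 of 5
majority = f + 1                      # 3 of 5
print(min_overlap(n, supermajority, majority))  # prints 2
```

An overlap of 2 is a majority of the 3-node recovery quorum. So a write acked by a supermajority is visible to most of any recovery quorum, and that is what lets recovery reconstruct the happens-before order.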

The evaluation is a bit tricky here. When testing with all nil-ext operations, Skyros beats Multi-Paxos by a wide margin in terms of both latency and throughput (left-most figure). However, such a workload consists of only writes/updates, so it is not very realistic. Adding some non-nil-ext operations to the mix changes things a bit (second from the left), with Skyros degrading to Paxos performance when all operations are non-nil-ext. It is also interesting to note that this comparison does not really agree with the nilext-only experiment. There, batched Paxos operated at a maximum throughput of over 100k Ops/s, but in the non-nilext experiment, it does only 40k Ops/s, allowing the authors to claim a 2x throughput advantage for Skyros. I have a feeling that this experiment was done at some fixed number of clients, and with Skyros having lower nil-ext write latency, the same number of clients can potentially push more operations. The three right-most figures show the performance with reads added to the mix.

As usual, we had a presentation in our group:

## Discussion

1) Five nodes. The system is described with a 5-node example. This makes sense when we need a supermajority quorum. Three-node clusters are common and tend to provide better performance for Multi-Paxos/Raft systems. However, a three-node setup requires all nodes for a supermajority quorum, making the system intolerant of any machine slowdowns.

2) Failures? When one node fails, this is not a big problem. A follower failure is masked by the quorum, and a leader failure requires a new leader election; however, the cluster can continue fine afterward with just four nodes. Two of the five nodes failing will prevent any supermajority, so the system needs a mechanism to detect this efficiently and fall back to regular Multi-Paxos. Timing out on every operation before falling back is not a performant option, and without this being discussed, the protocol hardly tolerates f nodes failing out of a 2f+1 node cluster.

3) Orchestrating an overload? Read operations have the potential to cause foreground synchronization when a read tries to access an object with an operation still in the d-log. This means that a workload that writes and immediately reads the same object can cause too many of these transitions to the “foreground.” These explicit syncs not only impact the latency of the reads but are also likely to shrink the ordering batches and cost more processing resources, reducing the system’s maximum capacity. So, with a simple workload, it may be possible to push the system into an overload failure.


# Reading Group Special Session: Fast General Purpose Transactions in Apache Cassandra

Modern distributed databases employ leader-based consensus protocols to achieve consistency, entailing certain trade-offs: typically either a scalability bottleneck or weak isolation. Leaderless protocols have been proposed to address these and other shortcomings of leader-based techniques, but these have not yet materialized into production systems.

This paper outlines compromises entailed by existing leaderless protocols versus leader-based protocols and proposes general techniques for addressing them. A new protocol, called Accord, is proposed with optimal failure tolerance that, under reasonable assumptions, achieves optimal latency of two message delays for transactions initiated by coordinators in any region, under any level of competing transactions and maximal tolerated process failures.

Benedict Elliott Smith will present the Accord protocol in our DistSys Reading Group. Benedict is an Apache Cassandra contributor with an interest in performance, correctness, and algorithm design.

When: February 9th, 2022 at 2 PM EST

# Reading Group. Characterizing and Optimizing Remote Persistent Memory with RDMA and NVM

We have looked at the “Characterizing and Optimizing Remote Persistent Memory with RDMA and NVM” ATC’21 paper. This paper investigates a combination of two promising technologies: Remote Direct Memory Access (RDMA) and Non-Volatile Memory (NVM). We have discussed both of these in our reading group before.

RDMA allows efficient access to the remote server’s memory, often entirely bypassing the remote server’s CPU. NVM is a new non-volatile storage technology. One of the key features of NVM is its ability to be used like DRAM, but with the added benefit of surviving power outages and reboots. Typically, NVM is also faster than traditional storage and gets closer to the latency of DRAM. However, NVM still significantly lags behind DRAM in throughput, especially in write throughput. The cool thing about NVM is its “Memory Mode,” which essentially makes Optane NVM appear like a ton of RAM in the machine. And here is the nice part: if it acts like RAM, then we can use RDMA to access it remotely. Of course, this comes with a catch. After all, NVM is not actual DRAM, so what works well to optimize RDMA under normal circumstances may not work as well here. This paper presents a handful of RDMA+NVM optimizations from the literature and augments them with a few of its own observations.

Below I put the table with the optimizations summarized in the paper. The table contains the optimizations from the literature (H1-H5), along with the authors’ observations (parts of H3, H6-H8). One additional aspect of the table is the applicability of each optimization to one-sided or two-sided RDMA. One-sided RDMA does not involve the remote CPU, while two-sided RDMA uses the remote server’s CPU.

H1: Accessing NVM attached to another socket is slow, so it is better to avoid it. We have discussed NUMA a bit in a previous paper to give a hint at what may be the problem here. The paper raises some concerns on the practicality of avoiding cross-socket access, but it also provides some guidance for implementing it.

H2: For two-sided RDMA that involves the host’s CPU, it is better to spread the writes to multiple NVM DIMMs.

H3: Data Direct I/O technology (DDIO) transfers data from the network card to the CPU’s cache. In one-sided RDMA, that will cause sequential writes to NVM to become random, impacting performance, especially for large payloads, so it is better to turn DDIO off. However, turning this option off has some implications, and the paper discusses them in greater detail.

H4: The ntstore instruction bypasses the CPU caches and stores data directly to the NVM. So it can make things a bit faster in two-sided RDMA, which already involves the CPU.

H5: This one deals with some hardware specs and suggests using writes that fill an entire 256-byte XPLine (the data storage granularity). I am not going to explain this any further. However, the paper mentions that this optimization is not always great, since padding the entire XPLine (and sending it over the network) incurs a lot of overhead.

H6: For one-sided RDMA, it is more efficient to write at the granularity of PCIe data word (64 bytes) for payloads smaller than XPLine. This is the granularity at which NIC operates over the PCIe bus.

H7: Similar to H6, for two-sided RDMA, write at the granularity of a cache line (64 bytes on x86 architecture).

H8: Atomic operations, such as read-modify-write, are expensive, so for best performance, it is better to avoid them.

H9: Doorbell batching helps when checking for persistence. In one-sided RDMA, there is no easy way to check if data has persisted, aside from performing a read. This procedure incurs two round-trips and obviously hurts performance. Doorbell batching allows sending a write and a read at once, while the remote side delays the read until the write completes, avoiding two separate round-trips.
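The granularity hints in H5 to H7 boil down to padding arithmetic. This sketch only computes how many bytes a write occupies once it is rounded up to each granularity. It is a simplification for illustration and does not model what the NIC or the Optane controller actually does.

```python
XPLINE = 256      # bytes, Optane's internal write granularity (H5)
PCIE_WORD = 64    # bytes, PCIe data-word granularity (H6)
CACHE_LINE = 64   # bytes, x86 cache line (H7)

def padded_size(payload, granularity):
    """Round a payload up to the next multiple of `granularity` bytes."""
    return -(-payload // granularity) * granularity

for payload in (40, 100, 300):
    xp = padded_size(payload, XPLINE)
    word = padded_size(payload, PCIE_WORD)
    print(f"{payload:>4} B payload -> XPLine-padded {xp} B, 64 B-aligned {word} B")
```

A 40-byte payload padded to a full XPLine carries 216 bytes of filler over the network. That is the overhead H5 warns about and the reason H6 and H7 prefer 64-byte alignment for small writes.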

The figure below illustrates how the optimization changes the performance when added one at a time. It is worth noting that some optimizations can actually slow the system down, so it is important to understand the specific circumstances of the workload to make the proper decision on using some of the suggestions from this paper.

The paper has a lot more details and discussions of individual optimizations. It is also full of individual evaluations and experiments for different optimizations.

Brian Luger did an awesome presentation of the paper, which is available on YouTube:

## Discussion

1) Only writes. The paper focuses on a bunch of write optimizations and no read ones. The authors explain this by saying that NVM’s read throughput is very high and exceeds the NIC’s bandwidth, so the network will be the bottleneck for reads. On the write side of things, however, the bottleneck is the NVM itself, and it is not held up by other slower components, making write optimizations very desirable. At the same time, if the network catches up with NVM bandwidth, then we may need to play the same optimization game for reads.

One thing to mention here is that read-to-write ratios will play a huge role in the perceived improvement from these optimizations. The paper has not explored this angle, but if the workload is read-dominated (like many database workloads), then the write optimizations will go largely unnoticed in overall performance. In all fairness, the paper does use realistic workloads with writes and reads for its evaluation.

2) Vendor-Specific? Currently, there is pretty much one vendor/product out there for NVM — Intel Optane. So a natural question is how many of these optimizations will transfer to other NVM products and vendors.

3) Vendor Benefit. Our final discussion point was about the exposure of RDMA+NVM technology in the cloud. It is still very hard/expensive to get VMs with RDMA support. Having the two technologies available together in the public cloud is even less likely now. This means that RDMA+NVM will remain the technology for the “big boys” that run their own data centers. Vendors like Microsoft, AWS, Google, etc. can benefit from this tech in their internal products while making it virtually inaccessible to the general open-source competition.
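Amdahl's law gives a rough estimate for the read-to-write-ratio point from the first discussion item. This sketch assumes the write share is measured as a fraction of total time. It also uses a hypothetical 2x write speedup. Neither number comes from the paper.

```python
def overall_speedup(write_fraction, write_speedup):
    """Amdahl's law: only the write share of the time gets faster."""
    return 1.0 / ((1.0 - write_fraction) + write_fraction / write_speedup)

for wf in (0.05, 0.2, 0.5):
    print(f"writes {wf:.0%} of time -> overall {overall_speedup(wf, 2.0):.2f}x")
```

Suppose writes take 5% of the time. Then even doubling write speed yields only about a 2.6% overall gain, which is why read-dominated workloads barely notice write optimizations.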


# Reading Group. Meerkat: Multicore-Scalable Replicated Transactions Following the Zero-Coordination Principle

Our 72nd paper was on avoiding coordination as much as possible. We looked at the “Meerkat: Multicore-Scalable Replicated Transactions Following the Zero-Coordination Principle” EuroSys’20 paper by Adriana Szekeres, Michael Whittaker, Jialin Li, Naveen Kr. Sharma, Arvind Krishnamurthy, Dan R. K. Ports, Irene Zhang. As the name suggests, this paper discusses coordination-free distributed transaction execution. In short, the idea is simple: if two transactions do not conflict, then we should be able to execute them without any kind of coordination. And the authors really mean it when they say “any kind.”

In distributed transaction processing, we often think about avoiding excessive contention/coordination between distributed components. If two transactions are independent, we want to run them concurrently without any locks. Some systems, such as Calvin, require coordination between all transactions by relying on an ordering service to avoid expensive locks. Meerkat’s approach is way more optimistic and tries to avoid any coordination, including ordering coordination. Meerkat is based on Optimistic Concurrency Control (OCC) with timestamp ordering to enable independent transactions to run without locks and ordering services. Another innovation for avoiding coordination is the replication of transactions from clients straight to servers. Instead of relying on some centralized replication scheme, like Multi-Paxos or Raft, Meerkat lets clients directly write transactions to replicas, avoiding replication coordination and leader bottlenecks. The authors explored this “unordered replication” idea in their previous paper. The clients also act as the transaction coordinators in the common case.

Meerkat’s coordination-avoidance efforts do not stop at cross-replica coordination. In fact, this is where the most interesting magic starts to happen: the authors designed a good chunk of the system specifically to avoid cross-core coordination within each server to take advantage of modern multi-core CPUs. The authors call such avoidance of cross-core and cross-replica coordination the Zero-Coordination Principle, or ZCP for short.

As a motivating example for ZCP and the need for cross-core coordination avoidance, the paper illustrates the contention created by a simple counter shared between threads on one machine. It appears that with the help of modern technologies to alleviate networking bottlenecks (kernel bypass), such a shared counter becomes an issue at just 8 threads. In the example, a simple datastore with a shared counter could not scale past 16 threads, while a similar store without a shared resource had no such problems.
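The usual cure for a contended shared counter is to give each thread its own slot and sum the slots on read. The sketch below shows that structure in Python. It only illustrates the idea. CPython's GIL means it cannot reproduce the paper's multicore scaling numbers. A native implementation would also pad each slot to its own cache line to avoid false sharing.

```python
import threading

class ShardedCounter:
    """One slot per worker instead of a single shared counter."""

    def __init__(self, shards):
        self._slots = [0] * shards

    def increment(self, shard):
        # Each worker only touches its own slot, so no lock is needed as
        # long as every shard is owned by exactly one thread.
        self._slots[shard] += 1

    def value(self):
        # Reads aggregate all slots; this is the only cross-thread step.
        return sum(self._slots)

def worker(counter, shard, times):
    for _ in range(times):
        counter.increment(shard)

c = ShardedCounter(4)
threads = [threading.Thread(target=worker, args=(c, i, 10_000)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(c.value())  # 40000
```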

Let’s talk about the protocol now to see how all the coordination-avoidance efforts actually work. The system tolerates $$f$$ failures in a cluster of $$2f+1$$ machines. Each transaction can read and write some set of objects supported by the underlying key-value store. These objects represent the transaction’s read- and write-sets. The replicas maintain two data structures to support transaction processing: a trecord and a vstore.

The trecord is a table containing all transaction information, partitioned by CPU core ID to make each transaction “sticky” to a single core. It manages the transaction state, such as the read- and write-sets, transaction version timestamp, and commit status. The vstore stores versioned key-value pairs. Unlike the trecord, the vstore is shared among all cores at the server. The transaction protocol runs in 3 distinct phases: Execute, Validate, Write. I’m not sure these are the most intuitive names for the phases, but they will do.

In phase-1 (execute), the transaction coordinator (i.e., a client) contacts any replica and reads the keys in its read-set. The replica returns versioned values for each key. The coordinator then buffers any pending writes.

Phase-2 (validate) combines the transaction commit protocol with replication of the transaction outcome. The coordinator starts phase-2 by first selecting a sticky CPU core that will process the transaction. The sticky core ID ties each transaction to a particular CPU core to reduce cross-core coordination. The coordinator then creates a unique transaction id and a unique timestamp version to use for OCC checks. Finally, the coordinator sends all this transaction information to every replica in a validate message.
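Here is a minimal sketch of the phase-2 setup and the core-partitioned trecord. It assumes a client-local nanosecond clock with the client id as a tiebreaker for the unique timestamp. It also assumes the sticky core is picked by hashing the transaction id. Both policies, `NUM_CORES`, and the `VALIDATING` status label are hypothetical, and the paper's actual choices may differ.

```python
import time
import uuid
import zlib

NUM_CORES = 8  # hypothetical core count of each replica

def new_txn(client_id):
    """Coordinator-side setup for phase-2 (sketch)."""
    txn_id = uuid.uuid4().hex
    # Unique version without a sequencer: local clock, client id as tiebreak.
    timestamp = (time.time_ns(), client_id)
    # Sticky core chosen by hashing the txn id (an assumed policy).
    core = zlib.crc32(txn_id.encode()) % NUM_CORES
    return {"txn_id": txn_id, "timestamp": timestamp, "core": core}

class MeerkatReplica:
    def __init__(self, cores=NUM_CORES):
        # trecord partitioned by core: in the design being sketched, only
        # core i ever touches trecord[i], so it needs no cross-core lock.
        self.trecord = [{} for _ in range(cores)]
        # vstore is shared by all cores (versioned key-value pairs).
        self.vstore = {}

    def on_validate(self, msg):
        # Route the message to the sticky core's trecord partition.
        part = self.trecord[msg["core"]]
        part[msg["txn_id"]] = {"timestamp": msg["timestamp"],
                               "status": "VALIDATING"}
        return part[msg["txn_id"]]

txn = new_txn(client_id=42)
replica = MeerkatReplica()
print(replica.on_validate(txn)["status"])  # VALIDATING
```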

Upon receiving the message, each replica creates an entry in its trecord. The entry maintains the transaction’s state and makes this transaction “stick” to the core associated with the core-partitioned trecord. At this point, the replica can validate the transaction using OCC. I will leave the details to the paper, but this is a somewhat standard OCC check. It ensures that both the data read in phase-1 is still current and the data in the write-set has not been replaced yet by a newer transaction. At the end of the check, the replica replies to the coordinator with its local state (OK or ABORT).
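The OCC check can be sketched as follows. This is a simplified version under my own assumptions. The vstore is reduced to a map from each key to the timestamp of its latest committed version. The bookkeeping Meerkat does for validated but not-yet-committed transactions is left out.

```python
def occ_validate(read_set, write_set, txn_ts, vstore_versions):
    """Simplified OCC check (sketch, not Meerkat's exact rules).

    read_set: key -> version timestamp observed in phase-1.
    write_set: keys the transaction wants to write.
    txn_ts: this transaction's (clock, client_id) timestamp.
    vstore_versions: key -> timestamp of the latest committed version.
    """
    for key, seen in read_set.items():
        # Abort if a newer version was committed since we read the key.
        if vstore_versions.get(key) != seen:
            return "ABORT"
    for key in write_set:
        # Abort if a transaction with a later timestamp already wrote the key.
        if vstore_versions.get(key, (0, 0)) > txn_ts:
            return "ABORT"
    return "OK"

versions = {"x": (10, 1), "y": (20, 2)}
print(occ_validate({"x": (10, 1)}, {"y"}, (30, 5), versions))  # OK
```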

The coordinator waits for a supermajority ($$f+\lceil\frac{f}{2}\rceil+1$$) fast-quorum of replies. If it receives enough matching replies, then the transaction can finish right away in the fast path. If the supermajority was in the “OK” state, the transaction commits, and otherwise, it aborts.

Sometimes a supermajority fast quorum does not exist or does not have matching states, forcing the coordinator onto a slow path. On the slow path, the coordinator only needs a majority of replicas to reply. If the majority has replied with an “OK” state, the coordinator instructs the replicas to accept the transaction; otherwise, it instructs them to abort. Once the replicas receive the prescribed transaction state, they mark the transaction accordingly in their trecord and reply to the coordinator. Here, the coordinator again waits for a quorum before finalizing the transaction to commit or abort.
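The coordinator's choice between the fast and slow paths, as described in the two paragraphs above, can be sketched as a function over the replies collected so far (n = 2f + 1). It treats the reply list as final, for example after a timeout. It also leaves out the slow path's second round, in which the coordinator sends the prescribed outcome and waits for a majority of acknowledgements.

```python
from collections import Counter

def fast_quorum(f):
    # f + ceil(f/2) + 1, written with integer arithmetic.
    return f + (f + 1) // 2 + 1

def coordinator_decision(replies, f):
    """replies: "OK"/"ABORT" states from distinct replicas.
    Returns ("fast", outcome), ("slow", outcome), or ("wait", None)."""
    counts = Counter(replies)
    for state, result in (("OK", "COMMIT"), ("ABORT", "ABORT")):
        if counts[state] >= fast_quorum(f):
            return "fast", result
    if len(replies) < f + 1:
        return "wait", None  # not even a majority has replied yet
    # Slow path: commit if a majority said OK, otherwise abort. The chosen
    # outcome must still be replicated to a majority before it is final.
    return "slow", ("COMMIT" if counts["OK"] >= f + 1 else "ABORT")

print(coordinator_decision(["OK"] * 4, f=2))  # ('fast', 'COMMIT')
```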

Finally, in phase-3 (write), replicas mark the transaction as committed or aborted. If the transaction is committed, then each replica can apply the writes against the versioned datastore.

Phew, there is a lot to unpack here before diving deeper into the corner cases and things like replica and coordinator failures and recovery. The important parts relate to how the coordination is handled/avoided. To start, the coordinator uses a timestamp for the version, circumventing the need for a counter or centralized sequencer. The coordinator (who happens to be the client in the normal case) replicates the transaction along with its timestamp directly to the replicas, avoiding the need for a centralized replication leader or primary. At the server level, each transaction never changes its execution core, even as it goes through different phases. All messages get routed to the core assigned to the transaction, and that core has exclusive access to the transaction’s trecord partition. This “core stickiness” avoids coordination between the cores of a server(!) for the same transaction. I speculate here a bit, but this may also be good for cache use, especially if individual transaction records are designed to fit in a cache line. As a result, the only place where coordination happens between transactions is the OCC validation. During the validation, we must fetch current versions of objects from the core-shared vstore, creating the possibility of cross-core contention between transactions accessing the same data.

I do not want to go too deep into failure recovery; however, there are a few important points to mention. The replica recovery process assumes replicas rejoin with no prior state, so they are in-memory replicas. The recovery process is leader-based, so there is a lot of coordination here. Finally, the recovery leader halts transaction processing in the cluster. As a result, the recovery of one replica blocks the entire system, as it needs to reconcile one global state of the trecord that can be pushed to all replicas in a new epoch. I will leave the details of the recovery procedure to the paper. Intuitively, however, this over-coordination in recovery is needed because normal operation avoided coordination in the first place. For example, state machine replication protocols “pre-coordinate” the order of all operations. When a replica needs to recover, it can learn the current term to avoid double voting, then simply grab all committed items from the log available at other nodes and replay them. It can replay the recovered log while also receiving new updates in the proper order. In Meerkat, we have no single history or log to recover, so a pause to reconcile a consistent state may be needed.

Coordinator recovery is handled by keeping backup coordinators and using a Paxos-like consensus protocol to ensure that only one coordinator is active at a time and that the active coordinator is in a proper state.

Now we can talk a bit about performance. Meerkat significantly outperforms TAPIR, the previous system from the same group, and the performance gap between the two is huge. This raises a natural question: just how much of Meerkat’s gain is due to a super-optimized implementation and techniques like kernel bypass? Meerkat-PB in the figure can shed some light on this, as it represents a version of Meerkat with a dedicated primary to which clients submit transactions. Having a primary adds cross-replica coordination, yet Meerkat-PB still significantly outperforms the older systems.

As always, we had a presentation in the group, and it is available on our YouTube channel. This time, Akash Mishra did the presentation:

## Discussion

Quite frankly, I have incorporated quite a bit of discussion into the summary already.

1) Performance. One of the bigger questions was about performance: how much of the “raw” speed is due to the ZCP, and how much is due to the enormous implementation expertise, kernel bypass, and fancy NICs that deliver messages to the proper cores/threads? We speculate that much of the overall performance comes from these other improvements and not from ZCP. That being said, once an implementation is this efficient, even tiny bits of coordination start to hurt substantially, as evidenced by the motivating examples and even the primary-backup version of Meerkat.

2) Replica Recovery. The blocking nature of recovery may present a real problem in production systems, especially if recovery takes a long time. It would have been nice to see an evaluation of recovery.

3) Performance States. Continuing with performance and recovery, it appears that the system can operate in multiple very distinct performance states. On the fast-quorum path, commits come quickly. On the slow path, a whole new round-trip exchange is added (probably after a timeout). This creates distinct latency profiles for the fast and slow paths. It can also create distinct throughput profiles, as the slow path sends and receives more messages, potentially creating more load in the system.

4) Sharded systems? Many systems use sharding to isolate coordination into smaller groups of nodes, or replica-sets. For example, Spanner and CockroachDB do that. Such sharding allows independent transactions to run in separate “coordination pods” without interfering with each other. To scale, these systems just need to create more such coordination pods. Of course, in systems like Spanner, transactions that span multiple replica-sets add another level of coordination on top, but the chance that any two shards need to coordinate is kept low by making lots of tiny shards. I wonder about the differences between the two philosophies: avoiding coordination vs. embracing it, but in small groups. Are there benefits to “coordination pods”? Should we embrace ZCP? Can ZCP survive the scale of these larger sharded systems?

5) The need for a supermajority quorum? Supermajority fast quorums often raise many questions. Why exactly do we need them? The short answer is fault tolerance, or more specifically, the ability to recover operations after failures. In majority-quorum protocols that have a leader, at most one operation can be attempted in a given epoch and log position. This means that if some replicas fail, we can recover the operation as long as we are guaranteed to see its value in at least one replica. Any two majority quorums intersect in at least one replica, making the single operation recoverable as long as it has made it to a majority. Unfortunately, this does not work for leaderless solutions, as illustrated by Fast Paxos, since many different values can be attempted in the same epoch and slot position. However, we still need to survive failures and recover.

Let’s look at an example to illustrate this. Assume we have 5 nodes, 3 of which have accepted value “A” and 2 have value “B.” Let’s assume that we commit “A” at this time, since we clearly have a majority agreeing on “A.” If 2 “A” nodes crash, we will have 3 live nodes remaining: “A, B, B.” By looking at these nodes, we cannot tell that value “A” might have been committed by the coordinator. We need a supermajority quorum to survive the failures and recover. If we commit with a supermajority of size $$f+\lceil\frac{f}{2}\rceil+1$$ (4 out of 5 nodes), “A, A, A, A, B,” then failing 2 “A” nodes leaves us with 3 nodes: “A, A, B.” We see more “A” values here and can recover “A.” In fact, if some value is in a supermajority, then any majority quorum will have a majority of its nodes holding that value: a supermajority of $$f+\lceil\frac{f}{2}\rceil+1$$ nodes and a majority quorum of $$f+1$$ nodes, drawn from only $$2f+1$$ nodes, must share at least $$(f+\lceil\frac{f}{2}\rceil+1)+(f+1)-(2f+1)=\lceil\frac{f}{2}\rceil+1$$ nodes. The converse does not hold: the majority value of a majority quorum does not mean the value was in a supermajority.
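The intersection argument can be checked by brute force for small clusters. This Python sketch (the helper `min_overlap` is my own, not from the paper) enumerates every majority quorum and counts the fewest nodes holding the value that any of them can see, once for a value on a supermajority and once for a value on a plain majority.

```python
import math
from itertools import combinations

def min_overlap(n, holders, quorum_size):
    """Fewest `holders` that any quorum of `quorum_size` out of n nodes sees."""
    return min(len(holders & set(q)) for q in combinations(range(n), quorum_size))

for f in (1, 2, 3):
    n = 2 * f + 1
    majority = f + 1
    supermajority = f + math.ceil(f / 2) + 1
    in_super = min_overlap(n, set(range(supermajority)), majority)
    in_major = min_overlap(n, set(range(majority)), majority)
    print(f"f={f}: value on a supermajority -> every majority sees >= {in_super}; "
          f"value on a plain majority -> some majority sees only {in_major}")
```

For f = 2 this reports 2 and 1, matching the “A, A, B” versus “A, B, B” example: with a supermajority, the surviving majority always has “A” in the majority.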

Now, how does this relate to Meerkat? Each Meerkat transaction has a unique id, so one may think we never have the possibility of committing two or more different transactions for the same id. However, we have to be careful about what Meerkat replicas need to agree upon. It is not the transaction itself, but a transaction status — OK or ABORT. So, we do have two possible values that can exist at different replicas for the same transaction. As a result, Meerkat needs a fast path supermajority quorum to make the transaction status decision recoverable in the replica recovery protocol.

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

# Reading Group. Evolution of Development Priorities in Key-value Stores Serving Large-scale Applications: The RocksDB Experience

On Wednesday, we had our 26th reading group meeting, discussing RocksDB with the help of a recent experience paper: “Evolution of Development Priorities in Key-value Stores Serving Large-scale Applications: The RocksDB Experience.” Single-server key-value storage systems are crucial for so many distributed systems and databases. For distributed folks like myself, these often remain black boxes that you pick up and use. That is, until something in your system starts to crumble peculiarly, and you dive in to investigate…

Anyway, what I want to say is that KV-stores are important. Their performance matters a lot in the world of fast CPUs and fast networks, where every millisecond of slowdown at storage can no longer be “masked” by other “slow” components. This is where this paper takes us — improving the performance, reliability, and feature-set of RocksDB over the years as technology and demands have evolved.

To understand the experiences and lessons of the paper, we first need to look at the underlying technology behind RocksDB. In a nutshell, RocksDB is an LSM-tree (Log-Structured Merge tree) key-value store. LSM trees have been used in storage for quite some time, as they are really good for write-intensive workloads. The basic idea of the LSM tree is that data are stored in files sorted by key. These sorted files are called sorted string tables, or SSTables for short.

Now, maintaining SSTables requires that data be written to storage sequentially in this sorted state. Of course, data do not arrive at a database pre-sorted, so the system needs to do something else before writing these sorted files to disk. The storage system keeps an in-memory buffer, called a memtable, holding some relatively large number of updates. This memtable can be represented as some tree structure to allow for efficient insertions. Before each operation is added to the memtable, it is written to a write-ahead log (WAL) for durability, so the WAL can be replayed to reconstruct the memtable in the event of a failure. Once the memtable reaches a certain size, it is flushed to disk in sorted order, and a new empty memtable starts. An important aspect of writing these sorted files is keeping track of their recency order.
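The write path above can be sketched as a toy Python class. This is a simplification under loud assumptions: a Python list stands in for the WAL file, a dict stands in for the sorted memtable structure, and each “SSTable” is just a sorted list of `(key, value)` pairs. `TinyLSM` and its methods are hypothetical names, not RocksDB APIs.

```python
class TinyLSM:
    """Toy LSM write path: WAL append, memtable insert, sorted flush."""

    def __init__(self, memtable_limit=4):
        self.memtable_limit = memtable_limit
        self.wal = []        # stands in for the on-disk write-ahead log
        self.memtable = {}   # stands in for a sorted in-memory structure
        self.sstables = []   # immutable sorted runs, oldest first

    def put(self, key, value):
        self.wal.append((key, value))   # 1. log first, for durability
        self.memtable[key] = value      # 2. then buffer in memory
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def flush(self):
        # Write the memtable out as one sorted, immutable run. Appending keeps
        # recency order: later entries in self.sstables are newer.
        self.sstables.append(sorted(self.memtable.items()))
        self.memtable = {}
        self.wal = []   # flushed updates no longer need the log

    @staticmethod
    def replay_wal(wal):
        # Crash recovery: replaying the log in order rebuilds the memtable,
        # with later writes to a key overwriting earlier ones.
        memtable = {}
        for key, value in wal:
            memtable[key] = value
        return memtable
```

With `memtable_limit=2`, writing `"b"` then `"a"` produces one SSTable `[("a", 2), ("b", 1)]` already in key order, regardless of arrival order.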

When a read request for a key arrives, the system first looks in the memtable. Memory lookup is relatively cheap, since no disk access is needed. However, if the requested key is not in the memtable, then we must search on disk, starting from the most recent SSTable segment. Looking up data on disk can be slow, since the system may need to scan a good chunk of a file to find the spot where the key might exist in the sorted list. Naturally, we want to take advantage of the sorted nature of the file. For this, the system maintains a sparse index for each file, with offsets that narrow down the search. Then the system only needs to scan the portion of the file between the two offsets where the key may exist. If the data is missing in the most recent file, then the search continues in the next most recent one, and so on. This process results in some peculiar behaviors. For instance, it generally takes less time to find recently written (or frequently updated) data. But it also takes a lot of time to find out that the data is missing entirely. Fishing for non-existent data is a waste of time, so an additional structure, a Bloom filter, can be used to tell whether the key is guaranteed to be missing from a file.
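The read path can be sketched in the same toy style. Assumptions: each `SSTable` keeps its data in memory as a sorted list, the “sparse index” records every `index_every`-th key, and the Bloom filter derives its bit positions from `hashlib.sha256` with an index salt. All names are hypothetical, and real engines index byte offsets in files rather than list positions.

```python
import bisect
import hashlib

class BloomFilter:
    def __init__(self, num_bits=256, num_hashes=3):
        self.num_bits, self.num_hashes = num_bits, num_hashes
        self.bits = 0

    def _positions(self, key):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for p in self._positions(key):
            self.bits |= 1 << p

    def might_contain(self, key):
        # False means "definitely absent"; True may be a false positive.
        return all(self.bits >> p & 1 for p in self._positions(key))

class SSTable:
    def __init__(self, items, index_every=2):
        self.items = sorted(items)          # (key, value) pairs, sorted by key
        self.index_every = index_every
        self.sparse_keys = [k for k, _ in self.items[::index_every]]
        self.bloom = BloomFilter()
        for k, _ in self.items:
            self.bloom.add(k)

    def get(self, key):
        if not self.bloom.might_contain(key):
            return None                     # skip the "disk" scan entirely
        # Sparse index: find the last indexed key <= key, scan one stretch.
        slot = bisect.bisect_right(self.sparse_keys, key) - 1
        if slot < 0:
            return None
        start = slot * self.index_every
        for k, v in self.items[start:start + self.index_every]:
            if k == key:
                return v
        return None

def lsm_get(memtable, sstables, key):
    """Check the memtable, then SSTables from newest to oldest."""
    if key in memtable:
        return memtable[key]
    for table in reversed(sstables):        # sstables are kept oldest first
        value = table.get(key)
        if value is not None:
            return value
    return None
```

Because tables are searched newest first, a key updated in a newer SSTable shadows its older version, and a Bloom filter miss lets a table be skipped without touching its data.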

Another caveat that sequential writes create for us is dealing with old versions of data. When we write an SSTable to disk, it is immutable, so an update or delete to a key that arrives later eventually flushes to a more recent file. This influx of new data creates a situation where old data that is no longer needed keeps occupying space and potentially increases search time. So the system needs to clean up old data regularly. A compaction procedure mitigates this space amplification by cleaning up old data. It essentially takes multiple files and merges them into one bigger file.
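Compaction is essentially a k-way merge of sorted runs. The Python sketch below uses `heapq.merge` and assumes runs are plain sorted lists of `(key, value)` pairs given oldest first, with a `None` value acting as a hypothetical deletion marker (tombstone). It keeps only the newest version of each key; RocksDB’s real compaction styles are far more involved.

```python
import heapq

TOMBSTONE = None  # hypothetical: a None value marks a deleted key

def compact(sstables):
    """Merge sorted runs (given oldest first) into one sorted run.

    Keeps only the newest version of each key and drops deleted keys. Dropping
    tombstones is only safe when no older run outside this merge holds the key.
    """
    streams = [
        [(key, -age, value) for key, value in run]   # -age: newer sorts first
        for age, run in enumerate(sstables)
    ]
    merged, last_key = [], object()
    for key, _, value in heapq.merge(*streams):
        if key == last_key:
            continue            # an older version of a key already handled
        last_key = key
        if value is not TOMBSTONE:
            merged.append((key, value))
    return merged
```

Merging `[("a", 1), ("b", 1), ("c", 1)]` with a newer `[("b", 2), ("c", None), ("d", 2)]` yields `[("a", 1), ("b", 2), ("d", 2)]`: the stale `b`, the deleted `c`, and its tombstone all disappear.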

So, my oversimplified description of LSM storage is not necessarily how RocksDB operates, but it should give enough intuition for us to proceed to the lessons and experiences of Facebook engineers working with RocksDB.

## Resources: IOPS vs Space vs CPU

The paper starts by exploring resource efficiency and how optimization priorities changed over time. RocksDB runs best on SSDs, and these storage devices have a limited lifespan bound by the number of write cycles. Naturally, engineers focused on issues of write amplification (the same data rewritten multiple times) to make sure SSDs do not die prematurely. Interestingly enough, the paper almost makes it sound like write-amplification mitigation efforts were largely wasted. The authors state that the workloads used at Facebook are not too IOPS-heavy (does it mean they are not very write-heavy for write-optimized storage?), and storage space was a more pressing concern. Because of this, the engineers have shifted their efforts to the space-amplification problem (a key occupies more space than it needs to, for example, due to having multiple old versions of it).

Another issue brought up is CPU utilization. Here, again, the paper states that CPUs are rarely a bottleneck. However, to me, these seem to represent a delicate resource trade-off. For example, to reduce space consumption, we may need more aggressive compression, which uses more CPU cycles, and more aggressive compaction, which needs both CPU and IOPS (and increases write amplification). So I am not sure it is correct to say that some resource here is or is not a bottleneck. They all can be a problem, and it seems to be more about reaching some balance for a given workload and infrastructure. I believe the need to find such a balance in different applications is part of the reason behind the multiple compaction strategies mentioned in the paper.

A significant portion of the paper then focuses on managing resources at scale. For example, many instances of RocksDB may coexist on one server, requiring resource management to prevent one instance from hogging all the resources. Other resource-related aspects involve the treatment of write-ahead logs (WALs). For example, it is possible to turn off RocksDB’s internal WAL completely to conserve resources. Of course, this leaves the system vulnerable to data loss in the event of a crash, but this may not be a problem if an application using RocksDB has its own WAL for things like transactions or replication. An interesting mention in resource management is rate-limiting file deletion. This issue seems a bit specific, but the authors explain how file deletion can be costly and impact other tenants using the same SSD.
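One generic way to rate-limit deletions is a token bucket over bytes deleted. The Python sketch below is my own illustration of that idea, not RocksDB’s implementation: `DeletionRateLimiter` is a hypothetical name, time is passed in explicitly so the behavior is deterministic, and the budget is allowed to go negative so that a file larger than one second’s budget can still be deleted, at the cost of delaying later deletions.

```python
from collections import deque

class DeletionRateLimiter:
    """Token bucket capping the bytes of files deleted per second (a sketch)."""

    def __init__(self, bytes_per_sec):
        self.rate = bytes_per_sec
        self.tokens = 0.0       # byte budget; may go negative ("debt")
        self.last = 0.0         # time of the previous tick, in seconds
        self.pending = deque()  # (path, size) waiting to be deleted

    def schedule(self, path, size):
        self.pending.append((path, size))

    def tick(self, now):
        """Return the paths that may be deleted at time `now`."""
        # Refill the budget, allowing at most one second's worth of burst.
        self.tokens = min(self.rate, self.tokens + (now - self.last) * self.rate)
        self.last = now
        released = []
        # Release files while the budget is positive; a large file may push it
        # negative, which delays later deletions until the debt is repaid.
        while self.pending and self.tokens > 0:
            path, size = self.pending.popleft()
            self.tokens -= size
            released.append(path)
        return released
```

Spreading deletions out this way trades slower space reclamation for smoother I/O for the other tenants sharing the device.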

## Features

The paper also extensively talks about new features and their significance. Similar to how the authors approached resource efficiency, these features largely stem from operating at scale. Many of the points simply make sense when I read them, but I suspect that these realizations were not as easy in practice and carry some production pain points. For example, we usually expect backward compatibility, but designing for forward compatibility, where an older version must be able to read data written by a newer one, is surely the result of sleepless nights after rolling back from some newer but buggy version and realizing that the data files had changed to the point that the old version no longer understood them.

The flexibility of RocksDB is another recurring theme of the paper. Since the storage system is used in a variety of applications with a variety of needs, this again makes total sense. It appears that the main goal of many features is to make the system more extensible and to fit it into many different contexts without deliberately creating any roadblocks. One such example is the improvement of configuration management, which went from “in-code” configuration to configuration files. However, one big configuration problem stems directly from the flexibility goal: there are too many different parameters to tune, and there seems to be no good solution for this.

The paper presents a few other examples of flexibility features meant to help build apps on top of RocksDB. If implemented, native versioned storage can greatly help systems relying on multi-version concurrency control (MVCC). This, however, may come with a performance penalty. At the same time, MVCC systems have already been relying on RocksDB for storage, since the “no roadblocks” principle provides great flexibility in how keys and values are encoded, allowing versioning information to be a part of the key.
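Here is a sketch of what “versioning information as part of the key” can look like. The encoding is a hypothetical illustration, not a format prescribed by RocksDB: the user key, a `0x00` separator, then the bit-inverted big-endian 64-bit timestamp, so that under plain bytewise ordering the newest version of a key sorts first. It assumes user keys never contain `0x00`. A snapshot read then becomes a single sorted seek.

```python
import bisect
import struct

MAX_TS = 2**64 - 1

def encode(user_key: bytes, ts: int) -> bytes:
    # Inverting the timestamp makes newer versions sort *before* older ones.
    return user_key + b"\x00" + struct.pack(">Q", MAX_TS - ts)

def decode(encoded: bytes):
    return encoded[:-9], MAX_TS - struct.unpack(">Q", encoded[-8:])[0]

def read_at(sorted_keys, store, user_key, snapshot_ts):
    """Return the newest value of user_key with version <= snapshot_ts."""
    # Seek to the first encoded key at or after (user_key, snapshot_ts); the
    # first match is the largest version not newer than the snapshot.
    i = bisect.bisect_left(sorted_keys, encode(user_key, snapshot_ts))
    if i < len(sorted_keys):
        key, _ = decode(sorted_keys[i])
        if key == user_key:
            return store[sorted_keys[i]]
    return None
```

With versions 5 and 9 of `b"x"` stored, a read at snapshot 7 returns version 5 and a read at snapshot 4 finds nothing, all without the storage engine knowing anything about versions.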

Replication and backup support got their own subsection in the paper, but it boils down to a trivial “you can copy the files to another machine to start a new replica” approach. This is hardly a feature, but again, it plays nicely with the idea of designing a system with as few roadblocks as possible and letting users and engineers be creative in using it.

## Reliability

Reliability is a big topic in the paper. We want the data stored in the database to remain correct and intact. Luckily, there is a very concise summary for this: use checksums! The authors point out that their checksum procedures only work for data already in storage and that they are still working on checking the integrity of data in memtables. Memory corruption may not be that big of a problem, though. Thankfully, unlike our personal computers, servers rely on ECC memory that can handle some memory errors all by itself.
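The block-checksum idea is simple to sketch: store a checksum next to each block when writing it and verify it on every read. In this Python sketch, `zlib.crc32` stands in for whatever checksum function the storage engine actually uses, and `write_block`/`read_block` are hypothetical helpers with a made-up 4-byte trailer layout.

```python
import struct
import zlib

def write_block(payload: bytes) -> bytes:
    # Append a 4-byte big-endian CRC32 trailer to the block.
    return payload + struct.pack(">I", zlib.crc32(payload))

def read_block(block: bytes) -> bytes:
    payload, trailer = block[:-4], block[-4:]
    if zlib.crc32(payload) != struct.unpack(">I", trailer)[0]:
        raise IOError("block checksum mismatch: data corrupted")
    return payload
```

Verifying on read catches corruption that happened anywhere after the checksum was computed, which is exactly why it cannot protect data that is corrupted in memory before it is ever written out.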

I will finish my summary with a large table of features and changes to RocksDB straight from the paper.

And as always, we have our group’s presentation by Rohan Puri available on YouTube:

## Discussion

We had a very long discussion after the presentation. I think it lasted almost an hour, just talking about KV-stores in general and RocksDB in particular. There is no way I can possibly summarize every discussion point, but I will try to pick the important ones (by my judgment of their importance).

1) Scratching the surface. This point started in our pre-presentation discussion. While the paper talks about many different features and issues and tries to explain the reasons for the decisions taken, some explanations barely scratch the surface. Of course, it would be rather difficult to cover eight years of development and go into deep technical discussions. However, what interested the group the most were some rather odd talking points throughout the paper. For example, talking about rate-limiting file deletions is oddly specific. Why not rate-limit all tasks that may have a high impact on IOPS? These oddly specific examples scream of rather interesting back-stories that are conspicuously missing from the paper.

2) Checksums. The checksum discussion was rather interesting. There are multiple layers of checksums. For example, block checksums make a lot of sense, as they are written when an SSTable block is flushed to disk. One observation made in the reading group is that file checksums were added late in the RocksDB lifecycle. A plausible explanation is that file checksums are rarely needed: they come in handy when, for example, copying an entire SSTable file from one machine to another to start a new replica. And on this hopefully rare occasion, we can check the integrity of the data the long way: open the file, go block by block, and verify the block checksums.

3) Replication. Obviously, RocksDB is a single-server system, but it serves as a store for many replicated systems. In the group, we found it interesting that the paper still talks about replication. However, the replication discussion in the paper boils down to designing a permissive system that allows building replicated solutions on top.

4) Too flexible? One of the bigger goals of RocksDB is its flexibility to fit into different applications with different requirements. This creates a system that has too many features, with any given application using only a handful of them. However, the ability to tune all these features complicates the configuration and management of the system. One notable example is CockroachDB, which developed its own in-house replacement for RocksDB with fewer features, and having fewer features seems to be a big bragging point for the Cockroach folks.

5) Impact of Facebook hardware infrastructure. One concern raised during the discussion was the impact of Facebook’s hardware infrastructure on the overall design trajectory described in the paper. Naturally, Facebook deploys RocksDB in its own systems and on its own infrastructure. But this also means that non-Facebook users have to adjust to decisions made with Facebook-grade infrastructure in mind.

One such example is the write-amplification vs. space-amplification discussion in the paper. While Facebook engineers have concluded that on their SSDs (and with their workloads), write amplification does not pose a serious risk of premature SSD failures, the same may not be the case for other users who may have lower-quality SSDs or more write-demanding workloads. It is a serious enough concern that the authors acknowledge the existence of LSM-tree solutions with better write-amplification mitigation strategies. Moreover, at least some of these solutions have already been put into production use.

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

# Modeling Paxos Performance in Wide Area – Part 3

Earlier, I looked at modeling paxos performance in local networks; however, nowadays people (companies) use paxos and its flavors in the wide area as well. Take Google Spanner and CockroachDB as examples. I was naturally curious to expand my performance model to wide area networks. Since our lab has worked on WAN coordination for quite some time, I knew what to expect, but I nevertheless got a few small surprises along the way.

In this post, I will look at Paxos over WAN, EPaxos, and our wPaxos protocol. I am going to skip most of the explanation of how I arrived at the models, since they are very similar in spirit to the one I created for local area performance. They all rely on queuing theory approximations for processing overheads and k-order statistics for the impact of quorum size.

Despite using similar methods, modeling protocols designed for WAN operation proved more difficult than building the local area models. This difficulty arises mainly from the myriad of additional parameters I need to account for. For instance, for Paxos in WAN I need to look at the latencies between each pair of nodes in the cluster, since inter-region latencies in WANs are not uniform. Going up to EPaxos, I have multiple leaders to model, which means I must also take into consideration the processing overheads each node incurs in its role of following other nodes for some slots. wPaxos takes this even further: to model its performance, I need to consider access locality and “object stealing”, among other things.

Today I will focus only on 5-region models. In particular, I obtained average latencies between 5 AWS regions: Japan (JP), California (CA), Oregon (OR), Virginia (VA) and Ireland (IR). I show these regions and the latencies between them in Figure 1 below.

## Paxos in WAN

Converting the paxos model from LAN to WAN is rather straightforward; all I need to do is modify my paxos model to account for non-uniform distances between nodes. I also need the ability to set which node is going to be the leader for my multi-paxos rounds. With these small changes, I can play around with paxos and see how WAN affects it.

Figure 2 (above) shows a model run for 5 nodes in 5 regions (1 node per region). From my previous post, I knew that the maximum throughput of the system does not depend on network latency, so it is reasonable for paxos in WAN to be similar to paxos in local networks in this regard. However, I was a bit surprised to see how flat the latency stays in a WAN deployment almost all the way until reaching the saturation point. This makes perfect sense, however, since the WAN RTT dominates the latency, and small latency increases due to queuing costs are largely masked by the large network latency. This may also explain why Spanner, CockroachDB, and others use paxos in databases: predictable performance across the entire range of load conditions makes it desirable for delivering stable performance to clients and makes load balancing easier.

However, not everything is so peachy here. The geographical placement of the leader node plays a crucial role in determining the latency of the paxos cluster. If the leader node is too far from the nodes in its majority quorum, it incurs a high latency penalty. We see this with the Japan and Ireland regions, as they are far from all other nodes in the system and result in very high operation latency.
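The leader-placement effect falls out of the same order-statistics view: ignoring processing and queuing, a leader’s commit latency is its RTT to the (q-1)-th closest follower, since the leader counts toward its own quorum. The Python sketch below uses a made-up symmetric RTT matrix (toy values in ms, not the AWS measurements from Figure 1), where node 4 is deliberately placed far from everyone else.

```python
def commit_latency(rtt, leader):
    """Network-only phase-2 latency at `leader` (processing ignored).

    The leader counts toward the majority quorum q, so it waits for the
    (q-1)-th fastest follower reply.
    """
    n = len(rtt)
    q = n // 2 + 1
    follower_rtts = sorted(rtt[leader][j] for j in range(n) if j != leader)
    return follower_rtts[q - 2]   # (q-1)-th smallest, 0-indexed

# Toy symmetric RTT matrix in ms: illustrative values only.
rtt = [
    [0, 20, 70, 80, 150],
    [20, 0, 60, 75, 140],
    [70, 60, 0, 30, 90],
    [80, 75, 30, 0, 100],
    [150, 140, 90, 100, 0],
]
print([commit_latency(rtt, leader) for leader in range(len(rtt))])
```

For this toy matrix, the centrally placed nodes 1 and 2 commit in 60 ms, while the remote node 4 pays 100 ms even though the cluster and its throughput are otherwise identical.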

## EPaxos

EPaxos tries to address a few shortcomings of paxos. In particular, EPaxos no longer has a single leader node, and any node can lead some commands. If commands are independent, then EPaxos can commit them quickly in one phase using a fast quorum. However, if the commands have dependencies, EPaxos needs to run another phase on a majority quorum (at which point it pretty much becomes Paxos with two phases for leader election and operation commit). The fast quorum in some cases may be larger than the majority quorum, but in the 5-node model I describe today, the fast quorum is the same as the majority quorum (3 nodes).

Naturally, conflict between commands greatly impacts performance: with no conflict, all operations can be decided in one phase, while with 100% conflict, all operations need two phases. Since running two phases requires more messages, I had to change the model to factor in the probability of running two phases. Additionally, the model now looks at the performance of every node separately and accounts for each node leading some slots and following on others.
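A back-of-the-envelope version of “factor in the probability of running two phases” looks like this. It is a deliberately crude, hypothetical cost model, not the queuing model behind Figures 3 to 5: it assumes the command leader broadcasts to all N-1 peers in each phase, counts every message as equal work, ignores follower-side work, and treats capacity as inversely proportional to the expected messages per command.

```python
def leader_msgs_per_command(n, conflict):
    """Expected messages the command leader handles per command.

    Hypothetical cost model: every message costs the same, and the leader
    talks to all n - 1 peers in each phase.
    """
    peers = n - 1
    fast = 3 * peers   # PreAccept out, PreAcceptOK in, Commit out
    slow = 2 * peers   # extra Accept out, AcceptOK in when commands conflict
    return fast + conflict * slow

def relative_capacity(n, conflict):
    # Max throughput relative to a conflict-free workload.
    return leader_msgs_per_command(n, 0.0) / leader_msgs_per_command(n, conflict)
```

Under these assumptions, a 5-node cluster at 100% conflict retains only 60% of its conflict-free capacity, and 50% conflict leaves 75%, which shows the shape of the trend even if the exact numbers differ from the full model.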

Figures 3 and 4 (above) show EPaxos performance at every node for 2% and 50% conflict. Note that the aggregate throughput of the cluster is the sum over all 5 nodes. For 2% conflict, the max throughput was 2.7 times larger than that of Paxos. As conflict between commands increases, EPaxos loses capacity and its maximum throughput decreases, as I illustrate in Figure 5 (below). This changing capacity may make EPaxos more difficult to use in production environments. After all, workload characteristics may fluctuate throughout the system’s lifespan, and an EPaxos cluster may or may not withstand workloads of identical intensity (the same number of requests/sec) but different conflict rates.

## wPaxos

wPaxos is our recent flavor of WAN-optimized paxos. Its main premise is to assign commands for different entities (objects) to different leaders and to process these commands geographically close to where clients need the entities. Unlike most Paxos flavors, wPaxos needs a large cluster; however, thanks to flexible quorums, each operation only uses a subset of the nodes in the cluster. This allows us to achieve both multi-leader capability and low average latency.
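The flexible-quorum rule wPaxos builds on only requires phase-1 quorums to intersect phase-2 quorums, not every quorum to be a majority. For quorums defined purely by size, that means |Q1| + |Q2| > N. This Python sketch checks the rule by brute force on small clusters; `quorums_always_intersect` is a hypothetical helper, and wPaxos’s actual quorums are region-aware, which a size-only check does not capture.

```python
from itertools import combinations

def quorums_always_intersect(n, q1, q2):
    """True if every phase-1 quorum of size q1 shares a node with every
    phase-2 quorum of size q2 in an n-node cluster (brute force)."""
    nodes = range(n)
    return all(
        set(a) & set(b)
        for a in combinations(nodes, q1)
        for b in combinations(nodes, q2)
    )
```

For example, with 5 nodes, phase-2 quorums of 2 are safe only if phase-1 quorums have 4 nodes: the small, frequent phase-2 gets cheaper, and the rare phase-1 pays for it.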

wPaxos, however, has lots of configurable parameters that all affect performance. For instance, fault tolerance may be reduced to the point where the system does not tolerate a region failure but can still tolerate failures of nodes within a region. In this scenario (Figure 6, below), wPaxos achieves the best performance, with an aggregate throughput across all regions (3 nodes per region, for a total of 15 nodes) of 153,000 requests per second.

We still observe big differences in latency due to geography, as some requests originating in one region must go through a stealing phase or be resolved in another region. However, the average latency for a request is smaller than that of EPaxos or Paxos. Of course, a direct comparison between wPaxos and EPaxos is difficult, as wPaxos (at least in this model configuration) is not as fault tolerant as EPaxos. Also, unlike my FPaxos model from last time, the wPaxos model reduces phase-2 communication to the phase-2 quorum only. This allows it to take much bigger advantage of flexible quorums than a “talk-to-all-nodes” approach. As a result, having more nodes helps wPaxos provide higher throughput than EPaxos.

Some EPaxos problems still show up in wPaxos. For instance, as access locality decreases and the rate of object migration grows, the maximum throughput a cluster can provide decreases. Figure 7 (below) shows the wPaxos model with locality reduced to 50% and object migration increased to 3% of all requests.

## How Good Are the Models?

I strove for the best model accuracy without going overboard trying to account for all possible variables. The models for both LAN and WAN seem to agree fairly well with the results we observe in Paxi, our framework for studying various flavors of consensus.

However, there is always room for improvement, as more parameters can be accounted for to make the models more accurate. For instance, WAN RTTs do not really follow a single normal distribution, as a packet can take one of many routes from one region to another (Figure 8, below). This may make real performance fluctuate and “jitter” more than a rather idealistic model would suggest.
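One way a model could capture multi-route RTTs is to sample from a mixture of normal distributions, one per route, instead of a single normal. The route probabilities, means, and standard deviations in this Python sketch are made-up placeholders, not measurements, and `sample_wan_rtt` is a hypothetical helper.

```python
import random

def sample_wan_rtt(routes, rng):
    """Draw one RTT (ms) from a mixture of per-route normal distributions.

    routes: list of (probability, mean_ms, stddev_ms); probabilities sum to 1.
    """
    weights = [p for p, _, _ in routes]
    _, mean, std = rng.choices(routes, weights=weights, k=1)[0]
    return max(0.0, rng.gauss(mean, std))   # clamp: RTTs cannot be negative

# Hypothetical: 80% of packets take a fast route, 20% a slower one.
routes = [(0.8, 100.0, 2.0), (0.2, 130.0, 2.0)]
rng = random.Random(1)
samples = [sample_wan_rtt(routes, rng) for _ in range(10_000)]
```

The mean of such samples can sit close to that of a single normal distribution, yet the distribution is bimodal, and the slow mode is what shows up in tail latencies and quorum order statistics.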

I did not account for some processing overheads either. In EPaxos, a node must figure out the dependency graph for each request, and for high-conflict workloads these graphs may get large, requiring more processing power. My model is simple and assumes this overhead to be negligible.

## Few Concluding Remarks

Over this series of paxos performance modeling posts, I looked at various algorithms and the parameters that affect their performance. I think it truly helped me understand Paxos a bit better than before. I showed that network fluctuations have little impact on paxos performance (k-order statistics help figure this one out). I showed how a node’s processing capacity limits performance (I know this is trivial and obvious), but what is still a bit interesting is that a paxos leader processes roughly half of its reply messages after they no longer make a difference. Once the majority quorum is reached, all other messages for a round carry dead processing weight on the system.
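The “roughly half” figure follows from simple arithmetic. With N = 2f + 1 nodes, the leader counts itself toward the majority quorum of f + 1, so it needs f of the 2f follower replies, and the remaining f replies arrive after the decision. Here is a tiny Python check (the helper name is mine):

```python
def wasted_reply_fraction(n):
    """Fraction of follower replies the leader processes after it already
    has a majority quorum (the leader counts itself toward the quorum)."""
    q = n // 2 + 1
    needed = q - 1   # replies that complete the quorum
    total = n - 1    # replies the leader eventually processes
    return (total - needed) / total
```

For any odd cluster size, this is exactly one half; even-sized clusters waste a bit less (one third for 4 nodes) but gain no extra fault tolerance for it.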

The stability of Paxos compared to more complicated flavors (EPaxos, wPaxos) also seems interesting and probably explains why production-grade systems use paxos a lot. Despite having lower capacity, paxos is very stable, as its latency changes little across load levels until saturation. Additionally, the maximum throughput of paxos is not affected by workload characteristics, such as conflict or locality. This predictability is important for production systems that must plan and allocate resources. It is simply easier to plan for a system that delivers stable performance regardless of workload characteristics.

Geography plays a big role in WAN paxos performance. Even though the cluster has the same maximum throughput, clients will observe very different performance depending on the leader region. The same goes for EPaxos and wPaxos, as different regions have different costs of communicating with quorums, meaning that clients in one region may observe very different latency than their peers in other regions. I think this may make it more difficult to provide the same strong latency guarantees (SLAs?) to all clients in production systems.

There are still many things one can study with the models, but I will let it be for now. Anyone who is interested in playing around may get the models on GitHub.

# Modeling Paxos Performance – Part 2

In the previous posts, I started to explore the node scalability of paxos-style protocols. In this post, I look at processing overheads, which I estimate with the help of a queue, or processing pipeline. I show how these overheads cap performance and affect latency at different cluster loads.

I look at scalability for a few reasons. For one, in the age of the cloud, a 3- or 5-node cluster may not be enough to provide good resilience, especially in environments with limited control over node placement. After all, a good cluster needs to avoid nodes that share common points of failure, such as switches or power supplies. Second, I think it helps me learn more about paxos and its flavors and why certain applications chose to use it. And third, I want to look at more exotic paxos variants and how their performance may be impacted by different factors, such as WAN or flexible quorums. For instance, flexible quorums present the opportunity to make trade-offs between performance and resilience by adjusting the sizes of the phase-1 and phase-2 quorums. This is where modeling comes in handy, as we can check whether a particular quorum or deployment makes a difference from the performance standpoint.

Last time, I looked at how local network variations affect the performance when scaling the cluster up in the number of servers. What I realized is that the fluctuations in message round-trip-time (RTT) can only explain roughly 3% performance degradation going from 3 nodes to 5, compared to 30-35% degradation in our implementation of paxos. We also see that this degradation depends on the quorum size, and for some majority quorum deployments there may even be no difference due to the network. In this post I improve the model further to account for processing bottlenecks.

As a refresher from last time, here are some of the parameters and variables I have been using:

• $$l$$ – some local message in a round
• $$r_l$$ – message RTT in local area network
• $$\mu_l$$ – average message RTT in local area network
• $$\sigma_l$$ – standard deviation of message RTT in local area network
• $$N$$ – number of nodes participating in a paxos phase
• $$q$$ – Quorum size. For a majority quorum $$q=\left \lfloor{\frac{N}{2}}\right \rfloor +1$$
• $$m_s$$ – time to serialize a message
• $$m_d$$ – time to deserialize and process a message
• $$\mu_{ms}$$ – average serialization time for a single message
• $$\mu_{md}$$ – average message deserialization time
• $$\sigma_{ms}$$ – standard deviation of message serialization time
• $$\sigma_{md}$$ – standard deviation of message deserialization time

The round latency $$L_r$$ was estimated by $$L_r = m_s + r_{lq-1} + m_d$$, where $$r_{lq-1}$$ is the RTT plus replica processing time for the $$(q-1)$$th fastest message $$l_{q-1}$$.

## Message Processing Queue

Most of the performance difference in the above model comes from network performance fluctuations, given that $$m_s$$, $$m_d$$ and their variances are small compared to network latency. However, handling each message creates significant overheads at the nodes that I did not account for earlier. I visualize message processing as a queue or a pipeline: if enough compute resources are available, then the message can be processed immediately; otherwise it has to wait until earlier messages are through and the resources become available. I say that the pipeline is clogged when messages cannot start processing instantaneously.

The round leader is more prone to clogging, since it needs to process $$N-1$$ replies arriving at roughly the same time for each round. For modeling purposes, I consider queuing/pipeline costs only at the leader. The pipeline is shared by incoming and outgoing messages.

Let's consider a common FIFO pipeline handling messages from all concurrent rounds and clients. When a message $$l_i$$ enters the pipeline at some time $$t_{ei}$$, it can either be processed immediately if the pipeline is empty or experience some delay while waiting for its turn to process.

In the case of an empty pipeline, the message exits the queue at time $$t_{fi} = t_{ei} + o$$, where $$o$$ is the message processing overhead, $$m_s$$ or $$m_d$$ depending on whether the message is outgoing or incoming. However, if there is a message in the queue already, then the processing of $$l_i$$ will stall, or clog, for some queue waiting time $$w_i$$, thus it will exit the pipeline at time $$t_{fi} = t_{ei} + w_i + o$$. To compute $$w_i$$ we need to know when message $$l_{i-1}$$ is going to leave the queue: $$w_i = t_{fi-1} - t_{ei}$$. In turn, the exit time $$t_{fi-1}$$ depends on $$w_{i-1}$$, so we need to compute it first. We can continue to “unroll” the pipeline until we reach a message $$l_{i-n}$$ without any queue waiting time ($$w_{i-n} = 0$$). We can compute the exit time for that message $$l_{i-n}$$, which in turn allows us to compute the exit times of all following messages. Figure 1 shows different ways a pipeline can get clogged, along with the effects of clog accumulating over time.
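The backward unrolling above can equivalently be computed forward, one message at a time: each message starts at the later of its own entry time and the previous message's exit time. Below is a minimal Python sketch of a single FIFO pipeline under that reading; the function name `pipeline_exit_times` and the toy timings are hypothetical and not taken from the post's actual script.

```python
def pipeline_exit_times(enter_times, overheads):
    """Exit times t_f and waits w for messages in a single FIFO pipeline.

    enter_times: entry times t_e, in FIFO (non-decreasing) order.
    overheads:   per-message processing cost o (m_s if outgoing, m_d if incoming).
    """
    exits, waits = [], []
    prev_exit = float("-inf")            # nothing is in the pipeline before the first message
    for t_e, o in zip(enter_times, overheads):
        w = max(0.0, prev_exit - t_e)    # w_i = t_f(i-1) - t_e(i) if still busy, else 0
        t_f = t_e + w + o                # t_f(i) = t_e(i) + w_i + o
        exits.append(t_f)
        waits.append(w)
        prev_exit = t_f
    return exits, waits


# A burst of three replies, then one message after the pipeline drains (arbitrary time units).
exits, waits = pipeline_exit_times([0, 10, 15, 100], [20, 20, 20, 20])
print(exits, waits)
```

With replies entering at times 0, 10 and 15 and a cost of 20 each, the waits grow from 0 to 10 to 25, showing the clog accumulating, before the message at time 100 finds the pipeline empty again.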

Unlike earlier, today I also want to model the overheads of communicating with the clients, since in practice we tend to measure the performance as observed by the clients. This requires the round model to account for client communication latency $$r_c$$ which is one network RTT. Each round also adds a single message deserialization (client’s request) and a message serialization (reply to a client) to the queue.

Let me summarize the parameters and variables we need to model the queuing costs:

• $$r_c$$ – RTT time to communicate with the client
• $$n_p$$ – the number of parallel queues/pipelines. You can roughly think of this as number of cores you wish to give the node.
• $$s_p$$ – pipeline’s service rate (messages per unit of time). $$s_p = \frac{N+2}{N\mu_{md} + 2 \mu_{ms}}$$
• $$w_i$$ – pipeline waiting time for message $$l_i$$
• $$R$$ – throughput in rounds per unit of time.
• $$\mu_{r}$$ – mean delay between rounds. $$\mu_{r} = \frac{1}{R}$$
• $$\sigma_{r}$$ – standard deviation of inter-round delay.

Now let's talk about some of these parameters a bit more and how they relate to the model.

Pipeline service rate $$s_p$$ tells how fast a pipeline can process messages. We can get this metric by looking at the average latencies of message serialization $$\mu_{ms}$$ and deserialization/processing $$\mu_{md}$$. With $$N$$ nodes in the cluster, we can find the average message overhead of the round $$\mu_{msg}$$. For a given round, the leader node needs to handle 2 message serializations (one to start the round and one to reply back to the client) and $$N$$ deserializations ($$N-1$$ from followers and one from the client). This communication pattern gives us $$\mu_{msg} = \frac{N\mu_{md}+2\mu_{ms}}{N+2}$$. The reciprocal of $$\mu_{msg}$$ gives us how many messages the pipeline can handle per unit of time: $$s_p = \frac{N+2}{N\mu_{md} + 2\mu_{ms}}$$.

Variable $$w_i$$ tells how backed up the pipeline is at the time of message $$l_i$$. For instance, $$w_i = 0.002 s$$ means that a message $$l_i$$ can start processing only after 0.002 seconds delay. Figure 2 illustrates the round execution model with queue wait overheads.

To properly simulate multi-paxos, I need to look at multiple rounds. Variable $$R$$ defines the throughput I try to push through the cluster, as higher throughput is likely to lead to longer queue wait times. I also need to take into consideration how rounds are distributed in time. On one side of the spectrum, we can perform bursty rounds, where all $$R$$ rounds start at roughly the same time. This will give us the worst round latency, as the pipelines will likely clog more. On the other side, the rounds can be evenly dispersed in time, greatly reducing the competition for pipeline between messages of different rounds. This approach will lead to the best round latency. I have illustrated both of these extremes in round distribution in Figure 3.

However, the maximum throughput $$R_{max}$$ is the same no matter how rounds are spread out, and it is governed only by when the node reaches the pipeline saturation point: $$R_{max}(N+2) = n_p s_p$$, or $$R_{max}(N+2) = \frac{n_p(N+2)}{N\mu_{md} + 2\mu_{ms}}$$. As such, $$R_{max} = \frac{n_p}{N\mu_{md} + 2\mu_{ms}}$$. In the actual model simulation, the latency is likely to spike up a bit before this theoretical maximum throughput point, as the pipeline gets very congested and keeps delaying messages more and more.
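As a quick sanity check of the service-rate and saturation formulas, here is a small Python sketch that computes $$\mu_{msg}$$, $$s_p$$ and $$R_{max}$$ from the per-message overheads. The helper names are hypothetical, and the example overheads of 0.01 ms per message only echo the $$m_s = m_d = 0.01 ms$$ setting used in Part 1; they are not measurements.

```python
def service_rate(n, mu_md, mu_ms):
    """Pipeline service rate s_p (messages per unit time) at the round leader.

    Per round the leader deserializes N messages (N-1 follower replies plus the
    client request) and serializes 2 (the phase-2 broadcast and the client reply).
    """
    mu_msg = (n * mu_md + 2 * mu_ms) / (n + 2)  # average overhead per message
    return 1.0 / mu_msg


def max_throughput(n, mu_md, mu_ms, n_p=1):
    """R_max in rounds per unit time, from R_max * (N + 2) = n_p * s_p."""
    return n_p * service_rate(n, mu_md, mu_ms) / (n + 2)


# Hypothetical per-message overheads in milliseconds, so R_max is in rounds/ms.
for n in (3, 5, 7, 9):
    print(n, round(max_throughput(n, mu_md=0.01, mu_ms=0.01), 2), "rounds/ms")
```

The $$N+2$$ terms cancel, so the sketch reduces to $$R_{max} = \frac{n_p}{N\mu_{md} + 2\mu_{ms}}$$; doubling $$n_p$$ doubles the saturation throughput, assuming the pipelines are perfectly balanced.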

The real round distribution is probably something more random, as different clients interact with the protocol independently of each other, making such perfect round synchronization impossible. For the simulation, I take the uniform separation approach and add some variability to it by drawing the round separation times from a normal distribution $$\mathcal{N}(\mu_r, \sigma_r^2)$$. This solution may not be perfect, but the normal distribution tends to do fine in modeling many natural random phenomena. I can also control how much different rounds affect each other by changing the variance $$\sigma_r^2$$. When $$\sigma_r$$ is close to 0, this becomes similar to uniformly spaced rounds, while large values of $$\sigma_r$$ create more “chaos” and conflict between rounds by spreading them more randomly.
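A minimal sketch of generating round start times this way, assuming Python's `random.gauss`; the helper `round_start_times` is hypothetical. A normal draw can be negative, so this sketch clips gaps at zero. That clipping is my assumption (how the original script treats negative draws is not stated), and it raises the mean gap above $$\mu_r$$, noticeably so for large $$\sigma_r$$ such as $$2\mu_r$$.

```python
import random


def round_start_times(num_rounds, rate, sigma_factor, seed=None):
    """Start times of num_rounds rounds at roughly `rate` rounds per unit time.

    Gaps are drawn from N(mu_r, sigma_r^2) with mu_r = 1/rate and
    sigma_r = sigma_factor * mu_r; negative draws are clipped to 0.
    """
    rng = random.Random(seed)
    mu_r = 1.0 / rate
    sigma_r = sigma_factor * mu_r
    starts, t = [], 0.0
    for _ in range(num_rounds):
        starts.append(t)
        t += max(0.0, rng.gauss(mu_r, sigma_r))  # a gap of 0 means two rounds start together
    return starts


uniform = round_start_times(5, rate=1000.0, sigma_factor=0.0)          # evenly spaced rounds
chaotic = round_start_times(5, rate=1000.0, sigma_factor=2.0, seed=1)  # more conflict
print(uniform)
print(chaotic)
```

Setting `sigma_factor=0` reproduces the evenly dispersed extreme, while the bursty extreme corresponds to every gap being zero.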

Now I will put all the pieces together. To model the round latency $$L_r$$, I modify the old formula to include the queuing costs and client communication delays. Since the round latency is driven by the time it takes to process message $$l_{q-1}$$, I only concern myself with the queue waiting time $$w_{q-1}$$ for the quorum message. As such, the new formula for round latency is $$L_r = (m_s + r_{lq-1} + w_{q-1} + m_d) + (m_{cd} + m_{cs} + r_c)$$. In this formula, $$m_{cd}$$ is the deserialization overhead for the client request, and $$m_{cs}$$ is the serialization overhead for the server’s reply back to the client.

## Simulation Results

As before, I have a python script that puts the pieces together and simulates multi-paxos runs. There are quite a few parameters to consider in the model, so I will show just a few, but you can grab the code and tinker with it to see how it behaves with different settings. Figure 4 shows the simulation with my default parameters: network settings taken from AWS measurements, and pipeline performance taken from the early Paxi implementation (now it is much faster). Only one pipeline/queue is used. The distribution of rounds in time is controlled by the inter-round spacing $$\mu_r = \frac{1}{R}$$ with $$\sigma_{r} = 2\mu_{r}$$.
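For readers who want to experiment without the original script, here is a compact discrete-event sketch of the same idea in Python. It is not the author's code: it models only the leader's single FIFO pipeline, uses constant per-message overheads, clips negative normal draws at zero, and every default (in milliseconds) is a hypothetical placeholder rather than the AWS or Paxi measurements mentioned above. Each round pushes a client request, a phase-2 broadcast, $$N-1$$ replies and a client reply through the pipeline, and replies arriving after the quorum still occupy it.

```python
import heapq
import random
import statistics


def simulate(n, rate, num_rounds=20_000, mu_l=0.4, sigma_l=0.05, mu_ms=0.01,
             mu_md=0.01, r_c=0.4, sigma_factor=2.0, seed=1):
    """Mean client-observed round latency (ms) at `rate` rounds/ms.

    Models one FIFO pipeline at the leader; all defaults are hypothetical.
    """
    rng = random.Random(seed)
    q = n // 2 + 1                                    # majority quorum, self-voting leader
    cost = {"req": mu_md, "bcast": mu_ms, "reply": mu_md, "resp": mu_ms}
    events, seq = [], 0                               # heap of (arrival, tie-breaker, kind, round)

    def push(t, kind, i):
        nonlocal seq
        heapq.heappush(events, (t, seq, kind, i))
        seq += 1

    start, t = [], 0.0
    for i in range(num_rounds):                       # client requests, normal inter-round gaps
        start.append(t)
        push(t, "req", i)
        t += max(0.0, rng.gauss(1.0 / rate, sigma_factor / rate))

    votes, latency = [0] * num_rounds, [0.0] * num_rounds
    free_at = 0.0                                     # time the pipeline becomes idle
    while events:
        arrival, _, kind, i = heapq.heappop(events)   # served in arrival (FIFO) order
        free_at = max(arrival, free_at) + cost[kind]  # wait if clogged, then process
        if kind == "req":                             # client request deserialized (m_cd)
            push(free_at, "bcast", i)
        elif kind == "bcast":                         # phase-2 message serialized (m_s)
            for _ in range(n - 1):                    # follower RTT incl. its own processing
                rtt = max(0.0, rng.gauss(mu_l + mu_ms + mu_md, sigma_l))
                push(free_at + rtt, "reply", i)
        elif kind == "reply":                         # replies after the quorum still cost time
            votes[i] += 1
            if votes[i] == q - 1:                     # quorum reached, answer the client
                push(free_at, "resp", i)
        else:                                         # client reply serialized (m_cs)
            latency[i] = free_at - start[i] + r_c
    return statistics.fmean(latency)


for rate in (1.0, 4.0, 8.0, 12.0):                    # nominal R_max is about 14.3 rounds/ms here
    print(rate, round(simulate(5, rate, num_rounds=5_000), 3))
```

Because every new message enters the heap no earlier than the message that produced it finishes, popping by arrival time keeps the pipeline strictly FIFO across interleaved rounds.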

Figure 5 shows how latency changes for different inter-round delay variances. The runs with higher standard deviation $$\sigma_r$$ appear more “curvy”, while the runs with more uniform delay do not seem to degrade as quickly until almost reaching the saturation point. High-$$\sigma_r$$ runs represent more random, uncoordinated interaction with the cluster, which in my opinion is a better representation of what happens in the real world.

## Do I Need to Simulate Paxos Rounds?

The results above simulate many individual rounds by filling the pipeline with messages and computing the queue wait time for each round. Averaging the latencies across all simulated rounds produces the average latency for some given throughput. However, if I can compute the average queue waiting time and the average latency for the quorum message, then I no longer need to simulate individual rounds to essentially obtain these parameters. This will allow me to find the average round latency much quicker without having to repeat round formula computations over and over again.

Let’s start with computing the average latency for a quorum message $$r_{lq-1}$$. Since $$l_{q-1}$$ represents the last message needed to make up the quorum, I can model this message’s latency as the $$k$$th order statistic of a sample of size $$N-1$$ drawn from the normal distribution $$\mathcal{N}(\mu_l+\mu_{ms}+\mu_{md}, \sigma_l^2 + \sigma_{ms}^2 + \sigma_{md}^2)$$, where $$k=q-1$$. To make things simple, I use the Monte Carlo method to approximate this number $$r_{lq-1}$$ fairly quickly and accurately.
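A minimal Monte Carlo sketch of this estimate in Python, assuming independent normal RTTs. The helper `quorum_rtt` is hypothetical, and `mu`/`sigma` stand for the combined mean $$\mu_l+\mu_{ms}+\mu_{md}$$ and standard deviation $$\sqrt{\sigma_l^2+\sigma_{ms}^2+\sigma_{md}^2}$$; the numeric values are placeholders.

```python
import math
import random
import statistics


def quorum_rtt(n, mu, sigma, q=None, trials=100_000, seed=42):
    """Monte Carlo estimate of the mean r_{l,q-1}: the (q-1)-th smallest of N-1 normal RTTs."""
    if q is None:
        q = n // 2 + 1           # majority quorum
    rng = random.Random(seed)
    k = q - 1                    # replies needed, since the leader votes for itself
    samples = []
    for _ in range(trials):
        rtts = sorted(rng.gauss(mu, sigma) for _ in range(n - 1))
        samples.append(rtts[k - 1])   # k-th fastest reply (0-based index k-1)
    return statistics.fmean(samples)


# Hypothetical parameters (ms): combined mean and combined standard deviation.
mu = 0.42
sigma = math.sqrt(0.05**2 + 0.002**2 + 0.002**2)
for n in (3, 5, 7, 9):
    print(n, round(quorum_rtt(n, mu, sigma, trials=20_000), 4))
```

Passing `q=3` with `n=9` estimates the quorum-message RTT for the 9-node FPaxos setup from the Flexible Quorums section; in this model, though, the queue costs still scale with all $$N$$ messages.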

Now on to approximating the queue wait time $$w_{q-1}$$. This is a bit more involved, but luckily queuing theory provides some easy ways to compute or estimate various parameters for simple queues. I used Marchal’s average waiting time approximation for a single queue with generally distributed inter-arrival and service intervals (G/G/1). This approximation allows me to incorporate the inter-round interval and variance from my simulation into the queuing theory model computation.

I will spare the details of deriving the formula for the average round queue wait time (it is a pretty straightforward adaptation from here, with service and arrival rates expressed as rounds per second) and just give you the result for a single queue and single worker:

• $$p = R(N\mu_{md} + 2\mu_{ms})$$, where $$p$$ is the queue utilization, or the probability that the queue is busy.
• $$C_s^2 = \frac{N^2\sigma_{md}^2 + 2^2\sigma_{ms}^2}{(N\mu_{md} + 2\mu_{ms})^2}$$
• $$C_a^2 = \frac{\sigma_r^2}{\mu_r^2}$$
• $$w=\frac{p^2(1+C_s^2)(C_a^2+C_s^2p^2)}{2R(1-p)(1+C_s^2p^2)}$$

With the ability to compute average queue waiting time and average time for message $$l_{q-1}$$ turn around, I can compute the average round latency time for a given throughput quickly without having to simulate multiple rounds to get the average for these parameters. $$L = 2\mu_{ms} + 2\mu_{md} + r_{lq-1} + w + \mu_l$$, where $$r_{lq-1}$$ is the mean RTT for quorum message $$l_{q-1}$$ and $$w$$ is the average queue wait time for given throughput parameters and $$\mu_l$$ is the network RTT for a message exchange with the client.

As such, the average round latency becomes:

$$L = 2\mu_{ms} + 2\mu_{md} + r_{lq-1} + \frac{p^2(1+C_s^2)(C_a^2+C_s^2p^2)}{2R(1-p)(1+C_s^2p^2)} + \mu_l$$
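Putting the queue-wait formulas and the latency sum together, a minimal Python sketch might look like the following. It transcribes the formulas above as written (including the $$C_s^2$$ expression), assumes $$\sigma_r$$ is a fixed multiple of $$\mu_r$$ so that $$C_a^2$$ is simply that multiple squared, and takes $$r_{lq-1}$$ as an input (for example from a Monte Carlo estimate). The function names and all parameter values are hypothetical.

```python
import math


def marchal_wait(rate, n, mu_md, mu_ms, sigma_md, sigma_ms, sigma_factor=2.0):
    """Marchal's G/G/1 approximation of the average round queue wait w."""
    service = n * mu_md + 2 * mu_ms        # mean per-round service time at the leader
    p = rate * service                     # utilization
    if p >= 1.0:
        return math.inf                    # at or beyond R_max the queue grows without bound
    c_s2 = (n**2 * sigma_md**2 + 2**2 * sigma_ms**2) / service**2
    c_a2 = sigma_factor**2                 # sigma_r^2 / mu_r^2 with sigma_r = sigma_factor * mu_r
    return (p**2 * (1 + c_s2) * (c_a2 + c_s2 * p**2)) / (
        2 * rate * (1 - p) * (1 + c_s2 * p**2))


def avg_round_latency(rate, n, r_q, mu_l, mu_md, mu_ms, sigma_md, sigma_ms, sigma_factor=2.0):
    """L = 2*mu_ms + 2*mu_md + r_{l,q-1} + w + mu_l."""
    w = marchal_wait(rate, n, mu_md, mu_ms, sigma_md, sigma_ms, sigma_factor)
    return 2 * mu_ms + 2 * mu_md + r_q + w + mu_l


# Hypothetical inputs in ms; rates in rounds/ms (R_max is about 14.3 for these values).
params = dict(n=5, r_q=0.405, mu_l=0.4, mu_md=0.01, mu_ms=0.01, sigma_md=0.002, sigma_ms=0.002)
for rate in (1, 5, 10, 13, 14):
    print(rate, round(avg_round_latency(rate, **params), 4))
```

One way to sanity-check the transcription: with $$C_a^2 = C_s^2 = 1$$ the approximation collapses to the exact M/M/1 wait $$\frac{p^2}{R(1-p)}$$.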

Figure 6 shows the model’s results for latency at various throughputs. The queuing theory model exhibits very similar patterns to the simulation, although the simulation seems to degrade more quickly at higher throughputs than the model, especially for the 3-node cluster. This may be due to the fact that the simulation captures the message distribution within each round, while the model looks at the round as one whole.

## Flexible Quorums

I can use both the simulation and the model to show the difference between paxos and flexible paxos (FPaxos) by adjusting the quorums. For instance, I modeled a 9-node deployment of flexible paxos with a phase-2 quorum $$q2$$ of 3 nodes. In my setup, flexible paxos must still communicate with all 9 nodes, but it needs to wait for only 2 replies, thus it can finish the phase more quickly than the majority quorum. However, as seen in Figure 7, the advantage of the smaller quorum is tiny compared to the normal majority quorum of 9-node paxos. Despite FPaxos requiring the same number of quorum replies as a 5-node paxos setup, the costs of communicating with all 9 nodes do not allow it to get closer in performance even to a 7-machine paxos cluster.

## Conclusion and Next steps

So far I have modeled single-leader paxos variants in the local area network. I showed that network variations have a negligible impact on majority quorum paxos. I also illustrated that it is hard to reap the performance benefits of flexible quorums, since the queuing costs of communicating with a large cluster become overwhelming. However, not everything is lost for FPaxos, as it can reduce the number of nodes involved in phase-2 communication from the full cluster size to as few as $$|q2|$$ nodes and greatly mitigate the effects of queue waiting time for large clusters.

The simulation and model are available on GitHub, so you can check it out and tinker with parameters to see how the performance may change in response.

There are still quite a few other aspects of paxos that I find interesting and want to model in the future. In particular, I want to look at WAN deployments, multi-leader paxos variants and, of course, our WPaxos protocol that combines multi-leader, WAN and flexible quorums.

# Paxos Performance Modeling – Part 1.5

This post is a quick update/conclusion to Part 1. So, do network variations make any impact at all? In the earlier simulation I showed some small performance degradation going from 3 to 5 nodes.

The reality is that for paxos, network behavior makes very little difference on scalability, and in some cases no difference at all. To see what I mean, look at the figure below:

See how the 4-, 6- and 36-node clusters perform the same in my simulation? And how the 5-, 7- and even 35-node clusters slightly outperform the 4-node one?

The intuitive high-level explanation for the even-numbered simulation results is quite simple. For even-numbered cluster sizes, a round leader receives an odd number of replies, assuming a self-voting leader. The leader decides the round after reaching a majority quorum $$q=\left \lfloor{\frac{N}{2}}\right \rfloor +1$$, meaning that it needs to receive $$q-1$$, or $$\left \lfloor{\frac{N}{2}}\right \rfloor$$, messages (with self-voting). As it happens, for even clusters, this message is exactly the median fastest message of the round. For instance, the leader of a 6-node cluster receives 5 replies, but the round reaches the majority at the 3rd (or median) reply.

Since the simulation draws message RTTs from a normal distribution, the median (50th percentile) RTT is also the mean. After repeating this for a sufficient number of rounds, any fluctuations are averaged out, resulting in an average round decided by a message with the average RTT for the network.

A cluster with an odd number of servers, however, decides the round at a message with an RTT slightly less than the median RTT. This is because the leader receives an even number of replies, and the median is computed by averaging the two middle RTTs. The smaller of the two values used for computing the median is actually the quorum message for the round. For instance, in a 7-node deployment, the leader reaches quorum after receiving the 3rd message $$l_3$$, with the median being $$\frac{l_3+l_4}{2}$$.
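The counting argument can be checked mechanically. This small Python sketch (the helper name `quorum_vs_median` is hypothetical) returns which sorted reply completes a majority quorum and where the median of the $$N-1$$ replies sits, assuming a self-voting leader.

```python
def quorum_vs_median(n):
    """Position of the quorum-completing reply vs. the median position(s) among
    the N-1 sorted replies of an N-node majority-quorum round (1-indexed)."""
    q = n // 2 + 1                 # majority quorum
    replies = n - 1                # the self-voting leader does not message itself
    k = q - 1                      # reply that completes the quorum
    if replies % 2 == 1:           # odd count: a single middle element
        median = ((replies + 1) // 2,)
    else:                          # even count: average of the two middle elements
        median = (replies // 2, replies // 2 + 1)
    return k, median


for n in (4, 5, 6, 7, 35, 36):
    print(n, quorum_vs_median(n))
```

For N = 6 it reports the 3rd reply for both the quorum and the median, while for N = 7 the quorum reply is the 3rd and the median averages the 3rd and 4th, matching the examples in the text.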

As a result, after many round repeats (I do ~125000 rounds), the simulation still ends up with an average quorum-message RTT a tiny bit less than the median/mean RTT, and the more nodes I add, the closer it gets to the actual 50th percentile and the mean.

So, what do we have after all of this? I think it is safe to assume the effects of network variance on paxos performance are very small and sometimes non-existent. We should not worry about the network as much, as long as it is stable and delivers predictable performance. However, if you have a system with non-majority quorums, you may get a slight benefit from quicker replies.

Update (3/10/2018):

• Part 2 – Queuing/Processing overheads

# Do not Blame (only) Network for Your Paxos Scalability Issues. (PPM Part 1)

In the past few months our lab has been doing a lot of work with different flavors of the paxos consensus algorithm. Paxos and its numerous flavors are widely used in today’s cloud infrastructure. Distributed systems rely on it for many different tasks to ensure safe operation. For instance, coordination services use some consensus protocol flavor to provide services like leader election, cluster membership, service discovery and metadata management. Databases, such as Spanner or CockroachDB, use paxos to provide fault-tolerant replication of data across nodes or even datacenters.

If you work with Paxos, or had to deal with it at some point, you have probably heard that it does not scale well to a large number of nodes. The “Five nodes is ideal, do not try more” rhetoric has been repeated many times by many people, and it gets engraved into one’s mind. ZooKeeper’s Administrator’s Guide mentions 3- and 5-server deployments. The fairly recent EPaxos paper provides an evaluation on 3- and 5-node deployments. The Paxos Made Live paper also mentions five nodes as a typical Chubby deployment.

But why is five servers such a magical number in the Paxos world? The most common answer is along the lines of “Increasing the number of servers increases the quorum size, making the paxos round leader wait for more messages to come to reach consensus”. This explanation is straightforward on the surface; after all, waiting for n nodes to reply should take longer than waiting for n − 1. The question then becomes: why is waiting for more replies expensive? I thought the answer would be largely network-related, but it appears to be more complex.

## Modeling Paxos Performance: First Attempt

To answer this, I tried to model the non-faulty paxos execution time by looking at the message exchange between nodes in the cluster. In a nutshell, running a phase of paxos requires one round-trip time (RTT) between a node and its peers: the node broadcasts the message and waits for a quorum to reply. With the network arguably being the slowest part, I can try to express the paxos phase performance through the communication delays.

First, let me define some variables to model a phase of paxos:

• $$r_l$$ – message RTT in local area network
• $$\mu_l$$ – average message RTT in local area network
• $$\sigma_l$$  – standard deviation of message RTT in local area network
• $$N$$ – number of nodes participating in a paxos phase
• $$q$$ – quorum size. For a majority quorum $$q=\left \lfloor{\frac{N}{2}}\right \rfloor +1$$
• $$m_s$$ – time to serialize a message
• $$m_d$$ – time to deserialize and process a message. This involves various message-related round tasks, such as ballot comparisons, log maintenance/updates, etc.

I assume that the network performance, at least in the local area, is normally distributed. Figure 1 shows a normalized histogram (shaded area is equal to 1) of latencies of approximately 2,000 ping requests within an AWS region.

RTT $$r_l$$ for every message is drawn from a normal distribution $$\mathcal{N}(\mu_l+m_s+m_d, \sigma_l^2)$$. This distribution simulates RTTs with an additional static, non-variable delay for serialization and deserialization. As such, the only variability so far is due to the network behavior.

In order to run a round of paxos, a node needs to send $$N-1$$ messages. That is, a node sends a message to every other node except itself. I assume a “leader” is also an acceptor with a short-circuit behavior, where it does not need to send a vote to itself over the network.

Out of the $$N-1$$ messages sent, only $$q-1$$ messages actually matter. Upon receiving $$q-1$$ successful (once again, assuming non-faulty execution) replies, a node has achieved a quorum and can finish the round (Figure 2). We can simulate this by drawing $$N-1$$ random RTTs $$r_{l1}, r_{l2}, \ldots, r_{lN-1}$$ from $$\mathcal{N}(\mu_l+m_s+m_d, \sigma_l^2)$$ and sorting them. The $$(q-1)$$th fastest message $$r_{lq-1}$$ is the one carrying the last vote to make up a quorum, thus after processing this message, the node no longer needs to wait for other messages to come.

Assuming that a node broadcasts all messages at the same time, message RTT $$r_{lq-1}$$ can then be used to express the latency $$L_r$$ for the entire round: $$L_r = m_s + r_{lq-1} + m_d$$. Figure 3 visually represents the paxos round expressed in the formula.
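The round model above can be sketched in a few lines of Python. This is a minimal reconstruction under the stated assumptions (normal RTTs, constant $$m_s$$ and $$m_d$$, one synchronous client), not the original script; the function name and the network parameters are hypothetical, while $$m_s = m_d = 0.01 ms$$ and the ~125000 rounds mirror numbers mentioned in these posts.

```python
import random
import statistics


def round_latency(n, mu_l, sigma_l, m_s=0.01, m_d=0.01, rounds=125_000, seed=7):
    """Mean L_r = m_s + r_{l,q-1} + m_d over many simulated rounds (milliseconds)."""
    rng = random.Random(seed)
    q = n // 2 + 1                       # majority quorum, leader votes for itself
    latencies = []
    for _ in range(rounds):
        # N-1 reply RTTs drawn from N(mu_l + m_s + m_d, sigma_l^2), fastest first.
        rtts = sorted(rng.gauss(mu_l + m_s + m_d, sigma_l) for _ in range(n - 1))
        latencies.append(m_s + rtts[q - 2] + m_d)   # rtts[q-2] is the (q-1)-th fastest reply
    return statistics.fmean(latencies)


# One synchronous client: throughput is the reciprocal of the mean round latency.
mu_l, sigma_l = 0.4, 0.05                # hypothetical LAN RTT mean/std dev (ms)
base = 1.0 / round_latency(3, mu_l, sigma_l, rounds=20_000)
for n in (3, 5, 7, 9):
    tput = 1.0 / round_latency(n, mu_l, sigma_l, rounds=20_000)
    print(f"N={n}: {tput:.3f} rounds/ms ({100 * (1 - tput / base):.1f}% below N=3)")
```

Since only the network term varies here, any throughput drop between cluster sizes comes purely from waiting for a later order statistic among the replies.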

## Does the Model Make Sense?

With this simple model, I can simulate paxos execution. In multi-paxos optimization where phase-1 is used to primarily pick a stable leader and phase-2 repeats a number of times, the performance of paxos is approximated by just looking at phase-2. If $$m_s$$ and $$m_d$$ are constant, the variability in performance is only due to the network fluctuations.

I created a small python script to run such a simulation, as if a single client were interacting with paxos synchronously, one command at a time. Figure 4 illustrates the results with $$m_s = m_d = 0.01 ms$$ and network parameters taken from the AWS ping latency figure.

At first glance, we observe degradation in throughput and an increase in latency. However, the performance decreases very slowly, and once I reach 9 nodes, the performance stays almost flat. This is very different from the data we have observed on our actual paxos implementation. Figure 5 shows the throughput and latency from a few concurrent clients interacting with the protocol.

The first thing that catches my attention is the performance degradation between the 3- and 5-node deployments. In the simulation, the difference in throughput was only 2.8%, while real paxos degraded by an astonishing 30%. As the cluster grows beyond 5 nodes, real paxos also appears to degrade more quickly.

Clearly, the network variability alone cannot explain the performance hit from increasing the number of nodes. So what is missing from the simple model that greatly impacts the performance as the cluster grows?

## Towards an Improved Model

Starting from the obvious, the per-message serialization $$m_s$$ and processing $$m_d$$ overheads are not static constant parameters; instead, they introduce some variance as well. However, $$m_s$$ and $$m_d$$ are small to start with, so letting them add variance to the model will make it more realistic, but it will not change the overall simulation results much.

Assuming that $$m_s$$ is drawn from a normal distribution $$\mathcal{N}(\mu_{ms}, \sigma_{ms}^2)$$ and $$m_d$$ is similarly drawn from $$\mathcal{N}(\mu_{md}, \sigma_{md}^2)$$, and that both are independent of the network delay, the message RTT $$r_{l}$$ must be drawn from $$\mathcal{N}(\mu_l+\mu_{ms}+\mu_{md}, \sigma_l^2 + \sigma_{ms}^2 + \sigma_{md}^2)$$. Making these changes to the model grew the difference between the 3- and 5-node simulations from 2.8% to 2.9% with $$\mu_{ms} = \mu_{md} = 0.01$$ and $$\sigma_{ms} = \sigma_{md} = 0.002$$ (some arbitrary values that make little impact unless they are of the same magnitude as the network mean and standard deviation).

Introducing variability into the message overheads $$m_s$$ and $$m_d$$ still does not account for these overheads being dependent on the number of nodes in the cluster. The dependence on $$N$$ is not direct; instead, the per-message costs increase as the number of messages that a server needs to handle grows. This puts a “leader” node repeating phase-2 of paxos in a more vulnerable position, as it experiences the increased traffic.

To understand how message serialization and processing depend on the number of messages exchanged, we need to consider the implementation of the application’s network layers. Often, as an application receives a message, it uses one or more processing queues or pipelines to deserialize the message and dispatch it to the appropriate handler for processing.

Let’s consider an example with just one such pipeline. If the queue is empty upon message arrival, the message gets deserialized with no further delay. However, when there are other messages in the queue, the new message has to wait for its turn to be processed.

A single Paxos round has bursty network utilization, especially at the leader. This is because the leader node first broadcasts a message and then receives multiple replies at roughly the same time. If the message deserialization has not finished before the next message arrives, then the pipeline gets clogged. Further messages potentially make the issue worse by growing the queue, as illustrated in the clogged-pipeline figure.

Obviously, having multiple parallel pipelines can help, given that they are balanced and there are enough compute resources to run them. However, the problem can also get worse when we start running rounds concurrently (i.e. multiple clients interact with paxos). In such scenario, messages from different rounds will compete for the fixed number of processing queues.

Where does this leave me with trying to model paxos round performance? I need to account for a number of additional factors, such as the number of message-processing queues/pipelines/threads, the rate of message deserialization at each queue, and the number of concurrent paxos rounds. Oh, and since the messages arriving after the quorum has been reached still need to get deserialized, they will contribute to how clogged the pipelines are.

I will pick up from this point onwards in some future post. I will also try to look at flexible quorums and our WPaxos protocol.

Update (3/10/2018):

• Part 1.5 – Does network matter at all?
• Part 2 – Queuing/Processing overheads