Tag Archives: replication

Reading Group. Log-structured Protocols in Delos

For the 87th DistSys paper, we looked at “Log-structured Protocols in Delos” by Mahesh Balakrishnan, Chen Shen, Ahmed Jafri, Suyog Mapara, David Geraghty, Jason Flinn, Vidhya Venkat, Ivailo Nedelchev, Santosh Ghosh, Mihir Dharamshi, Jingming Liu, Filip Gruszczynski, Jun Li, Rounak Tibrewal, Ali Zaveri, Rajeev Nagar, Ahmed Yossef, Francois Richard, Yee Jiun Song. The paper appeared at SOSP’21. This is the second Delos paper; the first one was “Virtual Consensus in Delos,” and we covered it in the 40th DistSys meeting.

The first Delos paper looked at building the replication/consensus layer to support strongly consistent use-cases on top. The system created a virtual log made from many log pieces or Loglets. Each Loglet is independent of other Loglets and may have a different configuration or even a different implementation. Needless to say, all these differences between Loglets are transparent to the applications relying on the virtual log. 

With the replication/consensus covered by the virtual log and Loglets, this new paper focuses on creating a modular architecture on top of the virtual log to support different replicated applications with different user requirements. In my mind, the idea of the Log-structured protocol expressed in the paper is all about using this universal log in the most modular and clean way possible. One can build a large application interacting with the log (reading and writing) without putting much thought into the design and code reusability. After all, we have been building ad-hoc log-based systems all the time! At Facebook scale, things are different: it is not ideal for every team/product to reinvent the wheel. Instead, having a smarter reusable architecture can go a long way in saving time and money to build better systems.

Anyway, imagine a system that largely communicates through the shared log. Such a system can create new log items or read the existing ones. In a sense, each log item is like a message delivered from one place to one or several other locations. With message transmission handled by the virtual log, the Delos application simply needs to handle encoding and decoding these “log-item-messages.”

Fortunately, we already have a good idea about encoding and decoding messages while providing services along the encoding/decoding path. Of course, I am thinking of common network stacks, such as TCP, or even more broadly, the OSI model. Delos operates in a very similar way, but also with a great focus on the reusability and composability of layers. When a client needs to write some data to the log, it can form its client-specific message and pass it down the stack. Lower layers can do some additional services, such as ensuring session guarantees or batching the messages. Each layer of the stack wraps the message it received with its own headers and information needed for decoding on the consumer side. Delos calls each layer in the stack an engine. The propagation down through layers continues until the message hits the lowest layer in the stack — the base engine. The job of the base engine is to interact with the log system.   

Similarly, when a system reads the message from the log, the log item travels up the stack through all of the same layers/engines, with each engine decoding it and ensuring the properties specific to that engine before passing it up. An imperfect real-world analogy for this process is sending a paper mail. First, you write the letter; this is the highest layer/engine close to the application. Then you put it in the envelope — now the letter is protected from others seeing it. Then goes the stamp — it is ready to be sent, then goes the mailbox — client batching, then post office — server-side batching, then transmission, and then the process starts to get undone from the bottom up. 
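To make the wrapping and unwrapping a bit more concrete, here is a minimal Python sketch of such a stack. It is only an illustration of the layering idea and not the actual Delos API: the class names (BaseEngine, Engine, SessionEngine, KeyValueApp), the propose/apply/play methods, and the JSON encoding are all my assumptions, and a plain list stands in for the virtual log.

```python
import json


class BaseEngine:
    """Lowest layer: the only engine that touches the (stand-in) shared log."""

    def __init__(self, log):
        self.log = log      # a plain list standing in for the virtual log
        self.upper = None   # the engine directly above, set when it is stacked

    def propose(self, entry):
        self.log.append(json.dumps(entry))

    def play(self, position):
        # Read one log item and push it up through the stack.
        return self.upper.apply(json.loads(self.log[position]))


class Engine:
    """A middle layer: wraps entries on the way down, unwraps on the way up."""

    name = "engine"

    def __init__(self, lower):
        self.lower = lower
        self.upper = None
        lower.upper = self

    def header(self):
        return {}

    def check(self, header):
        pass

    def propose(self, entry):
        wrapped = {"engine": self.name, "header": self.header(), "payload": entry}
        self.lower.propose(wrapped)

    def apply(self, entry):
        assert entry["engine"] == self.name
        self.check(entry["header"])
        return self.upper.apply(entry["payload"])


class SessionEngine(Engine):
    """Hypothetical layer that stamps entries with a sequence number."""

    name = "session"

    def __init__(self, lower):
        super().__init__(lower)
        self.next_seq = 0
        self.expected_seq = 0

    def header(self):
        seq = self.next_seq
        self.next_seq += 1
        return {"seq": seq}

    def check(self, header):
        if header["seq"] != self.expected_seq:
            raise ValueError("entry applied out of session order")
        self.expected_seq += 1


class KeyValueApp:
    """Top of the stack: the application-specific state machine."""

    def __init__(self, lower):
        self.lower = lower
        lower.upper = self
        self.state = {}

    def put(self, key, value):
        self.lower.propose({"op": "put", "key": key, "value": value})

    def apply(self, entry):
        self.state[entry["key"]] = entry["value"]
        return entry["key"]
```

Stacking is just construction order, e.g. `app = KeyValueApp(SessionEngine(BaseEngine(log)))`. A write goes down through `put`, and replaying a log position with `play` pushes the item back up until it reaches the application state.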

Of course, I oversimplified things a bit here, but such message encapsulation is a pretty straightforward abstraction to use. Delos uses it to implement custom Replicated State Machines (RSMs) with different functionality. Building these RSMs requires a bit more functionality than just pushing messages up and down the engines. Luckily, Delos provides a more extensive API with required features. For example, all engines have access to the shared local storage. Also, moving items up or down the stack is not done in a fire-and-forget manner, as responses can flow between engines to know when the call/request gets completed. Furthermore, it is possible to have more than one engine sitting at the same layer. For instance, one engine can be responsible for serializing the data and pushing it down, and another engine can receive the item from an engine below, deserialize and apply it to RSM. The figure illustrates these capabilities.  

This tiered modular approach allows Delos to reuse layers across applications or even compose some layers in different order. So when one application or use-case needs batching, that engineering team does not need to reinvent the batching from scratch. Instead, they can take an existing batching layer and add it to their stack in the appropriate location. The flexibility and reusability allowed Facebook engineers to implement two different control-plane databases with Delos. One datastore, called DelosTable, uses a table API, while another system, called Zelos, implements a ZooKeeper compatible service. 

I think I will stop my summary here. The paper goes into more detail about the history of the project and the rationale for making certain decisions. It also describes many practical aspects of using Delos. The main lesson I learned from this paper is about the overall modular layered design of large RSM-based systems. I think we all know the benefits but may often step aside from following a clean, modular design as the projects get bigger and pressure builds up to deliver faster. But then, what do I know about production-grade systems in academia? Nevertheless, I’d love to see a follow-up paper when more systems are built using Delos. 

As usual, we had our presentation. This time Micah Lerner delivered a concise but very nice explanation of the paper:


1) Architecture. This paper presents a clean and modular architecture. I do not think there is anything majorly new & novel here, so I view this paper more like an experience report on the benefits of good design at a large company. I think there is quite a bit of educational value in this paper. 

In the group, we also discussed the possibility of applying similar modular approaches to more traditional systems. For instance, we looked at MongoDB Raft in the group before. Nothing should preclude a similar design based on a Raft-provided log in a distributed database. In fact, similar benefits can be achieved: multiple client APIs, optional and configurable batching functionality, etc. That being said, for a system designed with one purpose, it is easy to start designing layers/modules that are more coupled/interconnected and dependent on each other. 

We had another similar observation in the group that mentioned a somewhat similarly designed internal application a while back, but again with a less clear separation between modules/layers.

2) Performance. A performance impact is a natural question to wonder about in such a layered/modular system. The paper spends a bit of time in the evaluation explaining and showing how layers and propagation between them add very little overhead. What is not clear is whether a less generic, more purpose-built solution could have been faster. This is a tough question to answer, as comparing different architectures is not trivial; sometimes it can be hard to tell whether the difference comes from design choices or implementation differences and nuances.

3) Read cost & Virtual Log. This part of the discussion goes back quite a bit to the first Delos paper. With a native Loglet, Delos assumes a quorum-based operation for Loglets, which may have less than ideal read performance. This is because NativeLoglet uses a sequencer to write but relies on quorum reads and waits for reads with the checkTail operation. So a client will read from the quorum, and assuming the Loglet is not sealed (i.e., closed for writes), the client must wait for its knowledge of the globalTail (i.e., globally committed slot) to catch up with the highest locally committed slot it observed earlier. This process is similar to a PQR read! Naturally, it may have higher read latency, which will largely depend on how fast the client’s knowledge of the globally committed slot catches up. In the PQR paper, we also describe a few optimizations to cut down on latency, and I wonder if they can apply here. 
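The waiting part of this read can be sketched in a few lines of Python. This follows only the description above, not the actual NativeLoglet code: the function name mirrors checkTail, but its parameters (a list of tails reported by a quorum and a sequence of globalTail values the client learns over time) are hypothetical, and sealing is left out.

```python
def check_tail(quorum_local_tails, global_tail_updates):
    """Return the tail a read must wait for, plus how many updates it waited.

    quorum_local_tails: local tail positions reported by a quorum of servers.
    global_tail_updates: successive globalTail values the client learns about.
    """
    # The highest locally committed slot observed in the quorum is the target.
    target = max(quorum_local_tails)
    waited = 0
    for known_global_tail in global_tail_updates:
        if known_global_tail >= target:
            return target, waited
        waited += 1  # still behind, keep waiting for the next update
    raise TimeoutError("globalTail never caught up with the observed tail")
```

The read latency in this sketch is exactly the number of updates the client sits through, which is the cost the PQR-style optimizations would try to shrink.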

Moreover, a client does not need to perform an expensive operation for all reads: if a client is reading something in the past known to exist, it can use a cheaper readNext API, practically allowing a local read from its collocated LogServer. 

4) Engineering costs. This discussion stemmed from the performance discussion. While large companies care a lot about performance and efficiency (even a fraction of % of CPU usage means a lot of money!), the engineering costs matter a lot too. Not having to redo the same things for different products in different teams can translate into a lot of engineering savings! Not to mention this can allow engineers to focus on new features instead of re-writing the same things all over again. Another point is maintenance — cleaner and better-designed systems will likely be cheaper to maintain as well. 

5) Non-control plane applications? The paper talks about using Delos in two control-plane applications. These often have some more specific and unique requirements, such as zero external dependencies, and stronger consistency. The paper also mentions other possible control plane use cases, so it does not appear like Delos is done here. 

At the same time, we were wondering if/how/when Delos can be used outside of the control plane. For Facebook, there may not be too much need for strongly consistent replication for many of their user-facing apps. In fact, it seems like read-your-write consistency is enough for Facebook, so deploying Delos may not be needed. At the same time, user-facing apps can take on more external dependencies, achieving some code reuse this way.

Another point made during the discussion is about making a more general and flexible replication framework that can support strongly-consistent cases and higher-throughput weaker consistency applications. We would not be surprised if Delos or its successors will one day support at least some stronger-consistency user-facing applications. 

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions, and most importantly manage Zoom invites to paper discussions. Please join the slack group to get involved!

Reading Group. NrOS: Effective Replication and Sharing in an Operating System

The 77th paper discussion in our reading group was “NrOS: Effective Replication and Sharing in an Operating System” from OSDI’21. While not a distributed systems paper, it borrows high-level distributed systems ideas (namely, state machine replication) to create a new NUMA-optimized sequential kernel.

See, all modern machines have many CPU cores. OS kernels must be able to utilize that multi-core hardware to achieve good performance. Doing so implies synchronization between threads/cores for access to shared kernel data structures. Typically, OSes go for fine-grained synchronization to unlock as much concurrency as possible, making kernels very complex and prone to concurrency bugs. To complicate things further, servers often feature multiple CPUs in multiple different sockets. Each CPU has its own memory controller and can “talk” faster and more efficiently to its share of memory connected to that controller, while accessing another CPU’s memory requires communication and extra latency. This phenomenon exists in certain single-socket systems as well. The nonuniformity in memory access (hence Non-Uniform Memory Access or NUMA) creates additional problems and slowdowns when managing locks.

NrOS, with its NRkernel, differs from other kernels in the way it manages concurrency. Instead of a fine-grained approach, NrOS relies on more coarse-grained concurrency access, as it is a simple sequential kernel that protects the data structures with a simple coarse lock. Such coarse-grained locking can be rather bad for high-core systems with lots of contention. However, for scalability reasons, not all threads/cores compete for this lock. To start, NRkernel replicates the shared data structures across NUMA nodes. So now cores on different CPUs/NUMA nodes can access a copy of the shared structure from their local memory. There is one little problem, though. We need to make sure all these copies are in sync so that no core/thread can read a stale data structure. 

NRkernel solves this using a shared log. All of the copies share a single log structure from which they get their updates. When some thread needs to update a data structure, it tries to acquire a lock to the log to write an update. To reduce the contention of accessing the log, the NRkernel ensures that only one thread per NUMA node can acquire a lock. The kernel achieves that using a flat combining technique where multiple threads can try to access the shared structure, but only one can get the lock. This “lucky” thread becomes the combiner, and in addition to its own access, carries out the operations on behalf of other “not-so-lucky” threads in need of the shared object. Flat combining reduces the lock contention and acts as a batching mechanism where one thread applies operations of all other threads. 
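For readers who have not seen flat combining before, below is a small Python sketch of the general technique. It is not the NRkernel implementation: the FlatCombiner class, its execute method, and the polling with a short timeout are my own simplifications, and Python threads are used purely for illustration.

```python
import threading


class FlatCombiner:
    """Threads publish requests; whoever wins the lock applies the whole batch."""

    def __init__(self, apply_batch):
        self._apply_batch = apply_batch          # applies a list of ops, returns results
        self._combiner_lock = threading.Lock()   # the single coarse lock
        self._pending = []                       # published, not yet applied requests
        self._pending_guard = threading.Lock()   # protects the pending list only
        self.batches = 0                         # number of combined batches applied

    def execute(self, op):
        request = {"op": op, "done": threading.Event(), "result": None}
        with self._pending_guard:
            self._pending.append(request)
        while not request["done"].is_set():
            if self._combiner_lock.acquire(blocking=False):
                # "Lucky" thread: become the combiner for everything pending.
                try:
                    self._combine()
                finally:
                    self._combiner_lock.release()
            else:
                # "Not-so-lucky" thread: wait for a combiner to serve the request.
                request["done"].wait(timeout=0.001)
        return request["result"]

    def _combine(self):
        with self._pending_guard:
            batch, self._pending = self._pending, []
        if not batch:
            return
        results = self._apply_batch([request["op"] for request in batch])
        for request, result in zip(batch, results):
            request["result"] = result
            request["done"].set()
        self.batches += 1
```

With a sequential `apply_batch` (say, one that adds each operation to a shared counter), the data structure itself needs no internal locking, since only the current combiner ever touches it.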

The log helps order all updates to the shared data structures, but we still need to make sure reading the replicas is linearizable. The flat combining approach helps us here as well. If a combiner exists for a replica, then the read-thread looks at the current tail of the log and waits for the existing combiner to finish updating the replica up to that tail position. In a sense, such waiting ensures that the read operation is carried out with the freshest update at the time of the read initiation. If no combiner currently exists, the reading thread gets a lock and becomes the combiner, ensuring the local replica is up-to-date before performing the read. 
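The read path can be sketched in a single-threaded form as follows. This is a simplification under my own assumptions (hypothetical SharedLog and Replica classes, key-value operations, no combiner or locking), meant only to show why catching up to the tail observed at the start of the read is enough.

```python
class SharedLog:
    """Stand-in for the shared operation log."""

    def __init__(self):
        self.entries = []

    def append(self, op):
        self.entries.append(op)
        return len(self.entries)   # new tail position

    def tail(self):
        return len(self.entries)


class Replica:
    """One per-NUMA-node copy of the data structure."""

    def __init__(self, log):
        self.log = log
        self.state = {}
        self.applied = 0   # how much of the log this replica has applied

    def _sync(self, up_to):
        while self.applied < up_to:
            key, value = self.log.entries[self.applied]
            self.state[key] = value
            self.applied += 1

    def update(self, key, value):
        tail = self.log.append((key, value))
        self._sync(tail)

    def read(self, key):
        tail = self.log.tail()   # tail observed when the read starts
        self._sync(tail)         # catch up before answering
        return self.state.get(key)
```

A write through one replica becomes visible to a read on another replica because the reader first applies everything up to the tail it saw, even though its local copy was stale a moment earlier.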

The paper goes on to explain some optimizations to this basic approach. For instance, in some cases, despite the batching and relatively few threads competing for exclusive access, the log becomes overwhelmed. In an Apache Kafka topic-esque way, the paper proposes to use multiple parallel logs for certain data structures to reduce the contention. NRkernel uses operation commutativity to place operations that can be reordered into the parallel logs and operations that have dependencies in the same log. This is similar to handling conflicts in numerous distributed systems, where we allow non-conflicting operations to run in parallel but ensure the partial order for dependent commands. 
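One simple way to picture the routing is to hash a conflict key to a log, as in the Python sketch below. The assumption that operations on different keys commute, and the pick_log and route helpers themselves, are mine and not taken from the paper.

```python
import zlib


def pick_log(key, num_logs):
    """Map a conflict key to a log; operations on the same key share a log."""
    return zlib.crc32(key.encode("utf-8")) % num_logs


def route(ops, num_logs):
    """Distribute (key, value) operations over parallel logs, keeping per-key order."""
    logs = [[] for _ in range(num_logs)]
    for key, value in ops:
        logs[pick_log(key, num_logs)].append((key, value))
    return logs
```

Operations that may conflict (same key here) always land in the same log in their original order, while operations on unrelated keys can be appended and applied independently.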

NrOS uses this style of replication across NUMA nodes to support a variety of subsystems: a virtual memory, an in-memory file system, and a scheduler. The paper evaluates these components against Linux, where it shows better performance, and a handful of other research-level operating systems, like Barrelfish, where it shows comparable or better levels of performance.

This is the second botched presentation in a row where I had to scramble and improvise: 


I will leave this one without a discussion summary. We are not OS experts in this reading group (at least among the people in attendance), and the improvised presentation did not facilitate an in-depth exploration of the paper. However, this seemed like a good paper for distributed systems folks to learn about bits of the OS kernel design and make connections. It was also nice to see some real systems (LevelDB) used in the benchmark, although these were tested against an in-memory filesystem, so it is far from a normal use case. A few natural questions arise in terms of the feasibility of the approach in more mainstream OSes. It seems that a huge “philosophical” gap between the NrOS’s solution and how mainstream kernels are designed means that NRkernel is likely to remain an academic exercise.

Reading Group. Avocado: A Secure In-Memory Distributed Storage System

Our 76th reading group meeting covered “Avocado: A Secure In-Memory Distributed Storage System” ATC’21 paper. Unfortunately, the original presenter of the paper could not make it to the discussion, and I had to improvise the presentation on the fly:

So, the Avocado paper builds a distributed in-memory key-value database with a traditional complement of operations: Put, Get, Delete. There is also nothing special on the replication side – the system uses ABD, a very well-known algorithm. What makes this paper special is where in memory this database resides. 

Unlike the traditional systems, Avocado takes advantage of Trusted Execution Environments (TEE) or enclaves. In particular, the system relies on Intel’s version of the TEE. TEEs have some cool tricks up their sleeves: the memory is so protected that even privileged software, such as an OS or a hypervisor, cannot access it. This protection means that both the data and the code inside the enclave are secured from malicious corruption. 

The paper suggests that these secure environments were not really designed for distributed systems. The problem lies in trust or, more precisely, ensuring that the replicas can trust each other to be correct. Normally, the attestation method would rely on a CPU-vendor service; in this case, it is the Intel attestation service (IAS). IAS would ensure the integrity of code and data in an enclave to the remote party. IAS, however, has some limitations, such as high latency, but it must be used every time a system creates a secure enclave. So Avocado builds its own trust service called configuration and attestation service (CAS). CAS uses IAS for attesting Avocado’s own local attestation service (LAS), and once it is attested, LAS can be used to attest all other enclaves. This kind of builds a longer chain of trust that uses a slow service very infrequently.

So what do we have so far? We can run code and store some information in secure enclaves. Now we need to ensure our code and our data reside in this secure memory. But things are not so easy here, as the enclaves are relatively small — 128 MB in size and have very slow paging capability if more memory is needed. So our goal is to fit the database in 128 MB of RAM. To make things more complicated, the network is still insecure. Avocado runs most of the code from within the enclave to ensure it is tamper-proof. This code includes CAS, replication protocol (ABD), and the networking stack implemented with kernel bypass, allowing Avocado to pretty much place the encrypted and signed messages straight into the network. 

The final piece is a storage component. Naturally, it is hard to store a lot of stuff in 128 MB. Instead of using secure memory for storage, Avocado uses insecure regular host memory for the actual payloads, allowing it to pack more data into one 128 MB enclave. With paging, the system can store even more data, although at some performance cost. The secure tamper-proof enclave only stores metadata about each value, such as a version, checksum, and the pointer to regular host memory with an encrypted value. Naturally, the encryption algorithm is running from within the enclave. Even if someone tries to mess with the value in host memory (i.e., try to replace it), it will be evident from the checksum in the enclave. 
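The split between enclave metadata and host-memory payloads can be sketched in Python as below. This is my own illustration, not Avocado’s code: the EnclaveStore class is hypothetical, two dictionaries stand in for enclave and host memory, an HMAC plays the role of the checksum, and the XOR keystream is a toy stand-in for a real cipher that should never be used for actual encryption.

```python
import hashlib
import hmac
import os


class EnclaveStore:
    def __init__(self):
        self._key = os.urandom(32)   # secret that never leaves the "enclave"
        self.metadata = {}           # "enclave": key -> (version, nonce, tag, pointer)
        self.host_memory = {}        # untrusted host memory: pointer -> ciphertext

    def _keystream(self, nonce, length):
        # Toy keystream for illustration only; a real system would use a vetted cipher.
        out = b""
        counter = 0
        while len(out) < length:
            block = self._key + nonce + counter.to_bytes(8, "big")
            out += hashlib.sha256(block).digest()
            counter += 1
        return out[:length]

    def _xor(self, data, nonce):
        stream = self._keystream(nonce, len(data))
        return bytes(a ^ b for a, b in zip(data, stream))

    def put(self, key, value):
        old = self.metadata.get(key)
        version = old[0] + 1 if old else 1
        if old:
            self.host_memory.pop(old[3], None)   # drop the previous payload
        nonce = os.urandom(16)
        ciphertext = self._xor(value, nonce)
        tag = hmac.new(self._key, ciphertext, hashlib.sha256).digest()
        pointer = (key, version)
        self.host_memory[pointer] = ciphertext
        self.metadata[key] = (version, nonce, tag, pointer)

    def get(self, key):
        version, nonce, tag, pointer = self.metadata[key]
        ciphertext = self.host_memory[pointer]
        expected = hmac.new(self._key, ciphertext, hashlib.sha256).digest()
        if not hmac.compare_digest(tag, expected):
            raise ValueError("value in host memory was tampered with")
        return self._xor(ciphertext, nonce)
```

Only the small fixed-size tuple per key lives in the scarce protected memory, while the payload size is bounded by regular host memory instead.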

Avocado is a complicated system driven by many limitations of TEEs. Moreover, TEEs significantly hinder the performance, as Avocado running in the secure enclaves is about half as fast as the version running in regular memory. 


1) Attestation service. I think the biggest challenge for our group was understanding the attestation model. The paper lacks some details on this, and the description is a bit confusing at times. For example, when discussing CAS, the authors bring up the LAS abbreviation without first introducing the term. I think it means Local Attestation Service, but I still cannot be sure. Anyway, our group’s confusion may also come from the lack of prior knowledge, and some googling can resolve it.

2) Fault model. The authors motivate and compare Avocado against Byzantine systems. I purposefully did not mention BFT in my summary, as I think the two solve very different problems. While BFT can tolerate several nodes having corrupt data and code, Avocado operates in a different environment. See, Avocado operates in an environment where we do not trust cloud vendors that host our applications. And then, if a cloud vendor can tamper with one node’s data or code, what can prevent them from changing/corrupting all the nodes? If all nodes are corrupt unknown to the user, then the BFT solutions become rather useless. Avocado security lies in the trust that enclaves are unhackable even to cloud vendors/hosts. With the enclave, we are not building a BFT system (i.e., a system that operates even when some nodes are “out of spec”), and instead, we have a system that cannot go out of spec as long as we trust the enclaves. Needless to say, we also need to trust our own code. 

3) API model. The system operates on a simple key-value store that can only put, get, and delete data. While key-value stores are useful as building blocks for many larger systems, these stores often have a more sophisticated API that allows ranged scans. Moreover, an ABD-based system cannot support transactional operations, such as compare-and-set, reducing the utility of this simple kv-store. I wonder, what would it take to make this thing run Multi-Paxos or Raft and add range operations? Is the single-node store a limitation? It uses a skip list, which should be sorted, so I’d suspect range queries should be possible. 

Reading Group. Viewstamped Replication Revisited

Our 74th paper was a foundational one: we looked at the Viewstamped Replication protocol through the lens of the “Viewstamped Replication Revisited” paper. Joran Dirk Greef presented the protocol along with bits of his engineering experience using the protocol in practice.

Viewstamped Replication (VR) solves the problem of state machine replication in a crash fault tolerance setting with up to f nodes failing in the cluster of 2f+1 machines. In fact, the protocol (between its two versions, the original and revisited) is super similar to Raft and Multi-Paxos. Of course, the original VR paper was published before both of these other solutions. An important aspect in the narrative of the VR paper is that the basic protocol makes no assumptions about persisting any data to disk. 


In the common case when replicating data, VR has a primary node that prescribes the operations and their order to the followers. The primary needs a majority quorum of followers to ack each operation to consider it committed. Once it reaches the quorum, the primary can tell the followers to commit as well. In VR, followers do not accept out-of-order commands and will not ack an operation without receiving its predecessor. With a few differences, this is how typical consensus-based replication works in both Raft and Multi-Paxos. For example, Multi-Paxos complicates things a bit by allowing out-of-order commits (but not execution to the state machine).
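The two rules in this paragraph (in-order acceptance at the followers and a majority at the primary) fit in a few lines of Python. The Backup class, its on_prepare method, and the is_committed helper are hypothetical names for this sketch; I also assume the primary counts itself toward the majority, so f follower acks are enough in a cluster of 2f+1.

```python
class Backup:
    """A follower that only accepts the operation right after its log end."""

    def __init__(self):
        self.log = []

    def on_prepare(self, op_number, op):
        if op_number != len(self.log) + 1:
            # Gap (or duplicate) in this sketch: no ack without the predecessor.
            return None
        self.log.append(op)
        return op_number   # the ack carries the accepted op number


def is_committed(follower_acks, f):
    """Primary plus f followers form a majority of 2f+1 replicas."""
    return 1 + follower_acks >= f + 1
```

Because a follower never acks past a hole, an ack for operation n implicitly vouches for everything before n, which is what later lets the view change compare logs by length only.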

View Change and Replicas Recovery

As with everything in distributed systems, the interesting parts begin when things start to crumble and fail. Like other protocols, VR can mask follower crashes, so the real challenge is handling primary failures. The VR protocol orchestrates the primary failover with a view-change procedure. Here I will focus on the “revisited” version of the view-change. 

One interesting piece of the revisited view-change is its deterministic nature, as each view is assigned/computed to belong to a particular node. When the current primary in view v fails, the node for view v+1 will try to become the new primary. This promotion to primary can be started by any node suspecting the current primary’s demise. The node for view v+1 does not need to directly observe the crash of the primary for v, which allows for unreliable failure detectors. 
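A round-robin mapping is the usual way to get this determinism, and it can be written as a one-liner. The sketch below assumes the replicas are identified by sortable names and that views rotate over the sorted configuration; the primary_for_view helper is a name I made up.

```python
def primary_for_view(view, replicas):
    """Deterministically map a view number to the replica that leads it."""
    ordered = sorted(replicas)
    return ordered[view % len(ordered)]
```

Every node can evaluate this locally, so no election messages are needed to agree on who should lead view v+1.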

So, any replica can start a view change if it detects a faulty primary. The replica will advance to the next view v+1 and start the view change by sending the StartViewChange message with the new view number to all the nodes. Each replica that receives the StartViewChange will compare the view to its own, and if its view is less than the one in the received StartViewChange message, it will also start the view change and send its own StartViewChange message to every replica. Also note that two or more replicas may independently start sending the initial StartViewChange messages for the new view v+1 if they independently detect primary failure. The bottom line of this StartViewChange communication pattern is that now any replica that received at least f StartViewChange messages will know that the majority of the cluster is on board to advance with the view change. 
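The StartViewChange handling described above could look roughly like the following Python sketch. The Replica class, its method names, and the tuple-shaped messages are assumptions for illustration; networking, timers, and the rest of the protocol state are omitted.

```python
class Replica:
    def __init__(self, name, f):
        self.name = name
        self.f = f
        self.view = 0
        self.status = "normal"
        self.votes = {}   # view -> set of other replicas that sent StartViewChange

    def _start_view_change(self, new_view):
        self.view = new_view
        self.status = "view-change"
        # Message to broadcast to every other replica.
        return ("StartViewChange", new_view, self.name)

    def suspect_primary(self):
        """Called when the local failure detector gives up on the primary."""
        return self._start_view_change(self.view + 1)

    def on_start_view_change(self, view, sender):
        """Return (messages to broadcast, whether f matching messages arrived)."""
        outgoing = []
        if view > self.view:
            # We are behind: join the view change and tell everyone.
            outgoing.append(self._start_view_change(view))
        if view == self.view and sender != self.name:
            self.votes.setdefault(view, set()).add(sender)
        ready = view == self.view and len(self.votes.get(view, ())) >= self.f
        return outgoing, ready
```

Counting f messages from other replicas plus the replica itself gives f+1 participants, which is the majority the paragraph refers to.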

So, after receiving f StartViewChange messages for the new view v+1 from other nodes, each replica sends a DoViewChange message to the primary for that new view v+1. The DoViewChange message contains the replica’s last known view v’ along with the log. The new primary must receive a majority (i.e., f+1) of DoViewChange messages. The new primary then selects the most up-to-date log (i.e., the one with the highest v’ or longest log if multiple DoViewChange messages have the same highest v’). This up-to-date log ensures the new primary can recover any operations that might have been majority-accepted or committed in previous views. Such “learning from the followers and recover” part of the view change procedure is somewhat similar to Multi-Paxos, as the new leader needs to recover any missing commands before proceeding with the new ones. The VR leader recovery only cares about the view and the length of the log because it does not allow out-of-order prepares/commits. In Multi-Paxos, with its out-of-order commitment approach, we kind of need to recover individual log entries instead of the log itself. 
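The log selection rule is compact enough to write down directly. In the Python sketch below, the DoViewChange dataclass and its field names are hypothetical, and the log length stands in for the op number of the last entry.

```python
from dataclasses import dataclass


@dataclass
class DoViewChange:
    sender: str
    last_view: int   # the replica's last known view v'
    log: list


def choose_log(messages, f):
    """Pick the most up-to-date log once a majority (f+1) has reported."""
    if len(messages) < f + 1:
        return None   # keep waiting for more DoViewChange messages
    # Highest v' wins; among equal v', the longest log wins.
    best = max(messages, key=lambda m: (m.last_view, len(m.log)))
    return best.log
```

Comparing a pair (view, length) is sufficient only because followers never accept out of order; with holes in the log, as in Multi-Paxos, each slot would need its own comparison.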

Anyway, after receiving enough (f+1) DoViewChange messages and learning the most up-to-date log, the new primary can start the new view by sending the StartView message containing the new view v+1 and the recovered log. The followers will treat any uncommitted entries in the recovered log as “prepares” and ack the primary. After all entries have been recovered, the protocol can transition into the regular mode and start replicating new operations.

Schematic depiction of a View Change procedure initiated by one node observing primary failure. Other machines send StartViewChange as a response. Eventually, a majority of nodes receive f StartViewChange messages and proceed to DoViewChange, at which point the new primary recovers the log.

There is a bit more stuff going on in the view change procedure. For example, VR ensures the idempotence of client requests. If some client request times out, the client can resend the operation to the cluster, and the new leader will either perform the operation if it has not been done before or return the outcome if the operation was successful at the old primary. 
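A common way to get this behavior is a per-client table of the latest request and its result. The Python sketch below is written under that assumption; the ClientTable class and its handle method are illustrative names, and in a real deployment the table would have to be part of the replicated state so that a new primary has it too.

```python
class ClientTable:
    """Remembers the latest request number and result for each client."""

    def __init__(self, apply_op):
        self.apply_op = apply_op   # executes an operation on the state machine
        self.latest = {}           # client id -> (request number, result)

    def handle(self, client_id, request_number, op):
        seen = self.latest.get(client_id)
        if seen is not None:
            last_number, last_result = seen
            if request_number < last_number:
                return None          # stale request, drop it
            if request_number == last_number:
                return last_result   # resend: answer without re-executing
        result = self.apply_op(op)
        self.latest[client_id] = (request_number, result)
        return result
```

A client that timed out can simply resend the same request number and will get the stored outcome instead of a second execution.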

Another important part of the paper’s recovery and view change discussion involves practical optimizations. For example, recovering a crashed in-memory replica requires learning an entire log, which can be slow. The proposed solution involves asynchronously persisting log and checkpoint to disk to optimize the recovery process. Similarly, the view change as described requires exchanging entire logs, which is not practical. An easy solution would involve sending a small log suffix instead since it is likely that the new primary is mostly up-to-date. 


1) Comparison With Other Protocols. I think the comparison topic is where we spent most of the discussion time. It is easy to draw parallels between Multi-Paxos and VR. Similarly, Raft and VR share many things in common, including the similarity of the original VR’s leader election to that of Raft. 

One aspect of the discussion is whether Multi-Paxos, Raft, and VR are really the same algorithms with slightly different “default” implementations. We can make an argument that all these protocols are the same thing. They operate in two phases (the leader election and replication), require quorum intersection between the phases, and have the leader commit operation first before the followers. The differences are how they elect a leader to ensure that the committed or majority-accepted operations from the prior leader do not get lost. Multi-Paxos and VR revisited recover the leader, while Raft ensures the new leader is up to date to eliminate the need for recovery. 

2) Optimizations. Academia and Industry have churned out quite a substantial number of optimizations for Multi-Paxos and Raft. There has even been a study on whether the optimizations are interchangeable across protocols (the short answer is yes). But that study did not include VR. Interestingly, Joran mentioned that they applied Protocol-Aware-Recovery to VR, and (if I am not mistaken) they also used flexible quorums. Of course, I am not surprised here, given how similar these solutions are. 

3) In-memory Protocol. Joran stressed the importance of in-memory protocol description to engineers. In their systems, the in-memory consensus allows reducing the reliance on fault-tolerance/reliability of the storage subsystem. It appears that VR is the only major SMR protocol specified without the need for a durable infallible disk. And while people in academia do a lot of in-memory consensuses (including Multi-Paxos and Raft), the aspects of what is needed to make these systems work without the disk are not clearly described. 

The VR paper briefly describes the problem that the lack of durable log introduces. The issue is that a node can forget its entire state. So, it can accept some value, create a majority and then reboot. Upon rebooting, if the node is allowed to participate right away, we no longer have the forgotten value in the majority quorum. Similarly, a node can vote on something in one view, reboot and forget and then potentially revote in a lesser view by some slow old primary. 

To avoid these problems, the node needs to recover before it can participate. At least a partial recovery is needed to avoid the double voting problem — the node needs to learn the current view before accepting new operations. At the same time, such a partially recovered node must still appear “crashed” for any older operations until it has fully recovered. 

Reading Group. Meerkat: Multicore-Scalable Replicated Transactions Following the Zero-Coordination Principle

Our 72nd paper was on avoiding coordination as much as possible. We looked at the “Meerkat: Multicore-Scalable Replicated Transactions Following the Zero-Coordination Principle” EuroSys’20 paper by Adriana Szekeres, Michael Whittaker, Jialin Li, Naveen Kr. Sharma, Arvind Krishnamurthy, Dan R. K. Ports, Irene Zhang. As the name suggests, this paper discusses coordination-free distributed transaction execution. In short, the idea is simple — if two transactions do not conflict, then we need to execute them without any kind of coordination. And the authors really mean it when they say “any kind.”  

In distributed transaction processing, we often think about avoiding excessive contention/coordination between distributed components. If two transactions are independent, we want to run them concurrently without any locks. Some systems, such as Calvin, require coordination between all transactions by relying on an ordering service to avoid expensive locks. Meerkat’s approach is way more optimistic and tries to avoid any coordination, including ordering coordination. Meerkat is based on Optimistic Concurrency Control (OCC) with timestamp ordering to enable independent transactions to run without locks and ordering services. Another innovation for avoiding coordination is the replication of transactions from clients straight to servers. Instead of relying on some centralized replication scheme, like Multi-Paxos or Raft, Meerkat lets clients directly write transactions to replicas, avoiding replication coordination and leader bottlenecks. The authors explored this “unordered replication” idea in their previous paper. The clients also act as the transaction coordinators in the common case. 

Meerkat does not stop its coordination avoidance efforts here at the cross-replica coordination. In fact, this is where the most interesting magic starts to happen — the authors designed a good chunk of a system specifically to avoid the cross-core coordination within each server to take advantage of modern multi-core CPUs. The authors call such avoidance of cross-core and cross-replica coordination a Zero Coordination Principle or ZCP for short.

As a motivating example for ZCP and the need for cross-core coordination avoidance, the paper illustrates the contention created by a simple counter shared between threads on one machine. It appears that with the help of modern technologies to alleviate networking bottlenecks (kernel bypass), such a shared counter becomes an issue at just 8 threads. In the example, a simple datastore with a shared counter could not scale past 16 threads, while a similar store without a shared resource had no such problems. 

Let’s talk about the protocol now to see how all the coordination-avoidance efforts actually work. The system tolerates \(f\) failures in the cluster of \(2f+1\) machines. Each transaction can read and write some set of objects supported by the underlying key-value store. These objects represent the transaction’s read- and write-sets. The replicas maintain two data structures to support transaction processing: a trecord and a vstore. 

The trecord is a table containing all transaction information partitioned by the CPU core ID to make each transaction “sticky” to a single core. It manages the transaction state, such as the read- and write-sets, transaction version timestamp, and commit status. The vstore stores versioned key-value pairs. Unlike the trecord, the vstore is shared among all cores at the server. The transaction protocol runs in 3 distinct phases: Execute, Validate, Write. I’m not sure these are the most intuitive names for the phases, but that will do.
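A rough sketch of how these two structures could be laid out, assuming plain Python dictionaries and hypothetical names (`TxnEntry`, `Replica`, `record`, `lookup`). The real system is multi-threaded and far more careful about memory layout; this only shows the per-core partitioning of the trecord next to the shared vstore.

```python
from dataclasses import dataclass


@dataclass
class TxnEntry:
    read_set: dict            # key -> version read in the execute phase
    write_set: dict           # key -> value the transaction wants to write
    timestamp: int            # version timestamp chosen by the coordinator
    status: str = "PENDING"   # later OK / ABORT / COMMITTED


class Replica:
    def __init__(self, num_cores):
        # trecord: one private partition per core, so a transaction
        # only ever touches the partition of its sticky core.
        self.trecord = [dict() for _ in range(num_cores)]
        # vstore: a single versioned key-value map shared by all cores.
        self.vstore = {}      # key -> (version, value)

    def record(self, core_id, txn_id, entry):
        self.trecord[core_id][txn_id] = entry

    def lookup(self, core_id, txn_id):
        return self.trecord[core_id].get(txn_id)
```

Because each partition belongs to one core, `record` and `lookup` need no locking in this model, while any access to `vstore` is where cores can still collide.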

In phase-1 (execute), the transaction coordinator (i.e., a client) contacts any replica and reads the keys in its read-set. The replica returns versioned values for each key. The coordinator then buffers any pending writes.

Phase-2 (validate) combines the transaction commit protocol with replication of the transaction outcome. The coordinator starts phase-2 by first selecting a sticky CPU core that will process the transaction. The sticky core ID ties each transaction to a particular CPU core to reduce cross-core coordination. The coordinator then creates a unique transaction id and a unique timestamp version to use for OCC checks. Finally, the coordinator sends all this transaction information to every replica in a validate message. 

Upon receiving the message, each replica creates an entry in its trecord. The entry maintains the transaction’s state and makes this transaction “stick” to the core associated with the core-partitioned trecord. At this point, the replica can validate the transaction using OCC. I will leave the details to the paper, but this is a somewhat standard OCC check. It ensures both that the data read in phase-1 is still current and that the data in the write-set has not been replaced yet by a newer transaction. At the end of the check, the replica replies to the coordinator with its local state (OK or ABORT). 
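The two conditions of the check can be sketched as follows. This is a simplified illustration, not the paper's algorithm: the function name `occ_validate` and the representation of the vstore as a map from key to its latest committed version are my assumptions, and the sketch ignores transactions that are still pending validation.

```python
def occ_validate(vstore, read_set, write_set, ts):
    """Return "OK" or "ABORT" for a transaction with timestamp `ts`.

    vstore:    key -> version (timestamp) of the latest committed write
    read_set:  key -> version the transaction read in phase-1
    write_set: iterable of keys the transaction wants to write
    """
    # Condition 1: everything read in phase-1 must still be current.
    for key, version_read in read_set.items():
        if vstore.get(key, 0) != version_read:
            return "ABORT"
    # Condition 2: nothing in the write-set may already carry a newer version.
    for key in write_set:
        if vstore.get(key, 0) > ts:
            return "ABORT"
    return "OK"
```

For example, with `vstore = {"x": 5, "y": 9}`, a transaction at timestamp 10 that read `x` at version 5 validates, while one at timestamp 8 that wants to write `y` does not.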

The coordinator waits for a supermajority (\(f+\lceil\frac{f}{2}\rceil+1\)) fast-quorum of replies. If it receives enough matching replies, then the transaction can finish right away in the fast path. If the supermajority was in the “OK” state, the transaction commits, and otherwise, it aborts.

Sometimes a supermajority fast-quorum does not exist or does not have matching states, forcing the coordinator into a slow path. In a slow path, the coordinator only needs a majority of replicas to actually reply. If the majority has replied with an “OK” state, the coordinator can prescribe the replicas to accept the transaction, and otherwise, it prescribes the abort action. Once the replicas receive the prescribed transaction state, they mark the transaction accordingly in their trecord and reply to the coordinator. Here, the coordinator again waits for a quorum before finalizing the transaction to commit or abort. 
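Here is a small sketch of the coordinator's choice between the two paths, assuming it is invoked once the coordinator has stopped waiting for more validate replies. The function names are hypothetical, and the sketch covers only the decision itself; the extra accept round of the slow path is left out.

```python
import math


def fast_quorum(f):
    return f + math.ceil(f / 2) + 1


def majority(f):
    return f + 1


def decide(replies, f):
    """replies: list of "OK"/"ABORT" votes from distinct replicas.

    Returns (path, outcome), or None if there are too few replies to act.
    """
    ok = replies.count("OK")
    abort = replies.count("ABORT")
    if ok >= fast_quorum(f):
        return ("fast", "COMMIT")
    if abort >= fast_quorum(f):
        return ("fast", "ABORT")
    if len(replies) >= majority(f):
        # Slow path: the outcome still has to be accepted by a quorum
        # of replicas in a second round before it is final.
        return ("slow", "COMMIT" if ok >= majority(f) else "ABORT")
    return None
```

With \(f=2\), four matching "OK" replies finish on the fast path, while a 3-2 split between "OK" and "ABORT" falls to the slow path with a proposed commit.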

Finally, in phase-3 (write), replicas mark the transaction as committed or aborted. If the transaction is committed, then each replica can apply the writes against the versioned datastore. 

Phew, there is a lot to unpack here before diving deeper into the corner cases and things like replica and coordinator failures and recovery. The important parts relate to how the coordination is handled/avoided. To start, the coordinator uses a timestamp for the version, circumventing the need for a counter or centralized sequencer. The coordinator (who happens to be the client in the normal case) replicates the transaction, along with its timestamp, directly to the replicas, avoiding the need for a centralized replication leader or primary. At the server level, each transaction never changes its execution core even as it goes through different phases. All messages get routed to the core assigned to the transaction, and that core has exclusive access to the transaction’s trecord partition. This “core stickiness” avoids coordination between the cores of a server(!) for the same transaction. I speculate here a bit, but this may also be good for cache use, especially if individual transaction records are designed to fit in a cache line. As a result, the only place where coordination happens between transactions is the OCC validation. During the validation, we must fetch current versions of objects from the core-shared vstore, creating the possibility for cross-core contention between transactions accessing the same data.

I do not want to go too deep into the failure recovery; however, there are a few important points to mention. The replica recovery process assumes replicas rejoin with no prior state, so they are in-memory replicas. The recovery process is leader-full, so we are coordinating a lot here. And finally, the recovery leader halts transaction processing in the cluster. As a result, the recovery of one replica blocks the entire system as it needs to reconcile one global state of the trecord that can be pushed to all replicas in a new epoch. I will leave the details of the recovery procedure to the paper. However, intuitively, this over-coordination in recovery is needed because the normal operation avoided the coordination in the first place. For example, state machine replication protocols “pre-coordinate” the order of all operations. When a replica needs to recover, it can learn the current term to avoid double voting before simply grabbing all committed items from the log available at other nodes and replaying them. It can replay the recovered log while also receiving new updates in the proper order. In Meerkat, we have no single history or log to recover, so a pause to reconcile a consistent state may be needed.  

The coordinator recovery is handled by keeping backup coordinators and using a Paxos-like consensus protocol to ensure only one coordinator is active at a time and that the active coordinator is in a proper state. 

Now we can talk a bit about the performance. Meerkat significantly outperforms TAPIR, which is the previous system from the same group. The performance gap between the systems is huge. This begs a natural question about the performance. Just how much of Meerkat’s gain is due to a super-optimized implementation and utilization of techniques like kernel-bypass? Meerkat-PB in the figure can shed some light on this, as it represents a version of Meerkat with a dedicated primary for clients to submit transactions. Having a primary adds cross-replica coordination, and despite that, it still significantly outperforms the older systems.

As always, we had a presentation in the group, and it is available on our YouTube channel. This time, Akash Mishra did the presentation:


Quite frankly, I have incorporated quite a bit of discussion into the summary already. 

1) Performance. One of the bigger questions was about performance. How much of the “raw” speed is due to the ZCP, and how much of it is due to the enormous implementation expertise and use of kernel bypass and fancy NICs to deliver messages to proper cores/threads? We speculate that a lot of the overall performance is due to these other improvements and not ZCP. That being said, once you have an implementation this efficient/fast, even tiny bits of coordination start to hurt substantially, as evidenced by the motivation examples and even the primary-backup version of Meerkat. 

2) Replica Recovery. The blocking nature of recovery may present a real problem in production systems, especially if the recovery time is substantial. It would have been nice to see the recovery evaluations.

3) Performance States. To continue with performance/recovery topics, it appears that the system can operate in multiple very distinct performance states. In the fast-quorum operation, commits come quickly. In the slow path, a whole new round-trip exchange is added (probably after the timeout). This creates distinct latency profiles for fast and slow paths. This can also create distinct throughput profiles, as a slow path sends and receives more messages, potentially creating more load in the system. 

4) Sharded systems? Many systems use sharding to isolate coordination into smaller buckets of nodes or replica-sets. For example, Spanner and Cockroach DB do that. Such sharding allows independent transactions to run in separate “coordination pods” without interfering with each other. To scale, these systems just need to create more such coordination pods. Of course, in systems like Spanner, transactions that span multiple replica-sets add another level of coordination on top, but the chance that any two shards need to coordinate is kept low by making lots of tiny shards. I wonder about the differences between the two philosophies: avoiding coordination vs. embracing it, but in small groups. Are there benefits to “coordination pods”? Should we embrace ZCP? Can ZCP survive the scale of these larger sharded systems?

5) The need for supermajority quorum? Supermajority fast quorums often raise many questions. Just exactly why do we need them? The short answer here is fault-tolerance, or more specifically the ability to recover operations after failures. See, in majority quorum protocols that have a leader, we have at most one operation that can be attempted in the given epoch and log position. This means that if some replicas fail, we can recover the operation if we can guarantee to see the value in at least one replica. Any two majority quorums intersect in at least one replica, making the single operation recoverable as long as it has made it to the majority. Unfortunately, this does not work with leaderless solutions as illustrated by Fast Paxos, as many different values can be attempted in the same epoch and slot position. However, we still need to survive the failures and recover. 

Let’s look at the example to illustrate this. Assume we have 5 nodes, 3 of which have accepted value “A” and 2 have value “B.” Let’s assume that we commit “A” at this time since we clearly have a majority agreeing on “A.” If 2 “A” nodes crash, we will have 3 live nodes remaining: “A, B, B.” By looking at these nodes we do not know the fact that value “A” might have been committed by the coordinator. We need a supermajority quorum to survive the failures and recover. If we commit with a supermajority of size \(f+\lceil\frac{f}{2}\rceil+1\) (4 out of 5 nodes) “A, A, A, A, B”, then failing 2 “A” nodes leaves us with 3 nodes: “A, A, B.” We see more “A” operations here and can recover “A.” In fact, if some value is in a supermajority, then any majority quorum will have the majority of its nodes (i.e. \(\lceil\frac{f}{2}\rceil+1\) nodes) having that value (but not the vice-versa — the majority value of a majority quorum does not mean the value was in the supermajority).
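The 5-node example can be checked mechanically. The sketch below is my own illustration (the helper names are hypothetical): it enumerates every possible set of \(f+1\) surviving nodes and asks whether the committed value is still held by at least \(\lceil\frac{f}{2}\rceil+1\) of them.

```python
import math
from collections import Counter
from itertools import combinations


def supermajority(f):
    return f + math.ceil(f / 2) + 1


def recoverable(values, f, committed):
    """True if `committed` dominates every possible majority quorum of survivors.

    values: the value accepted at each of the 2f+1 nodes, e.g. list("AAABB").
    """
    needed = math.ceil(f / 2) + 1
    for survivors in combinations(values, f + 1):
        if Counter(survivors)[committed] < needed:
            return False
    return True
```

For \(f=2\), `recoverable(list("AAABB"), 2, "A")` is false because the survivors may be “A, B, B”, while `recoverable(list("AAAAB"), 2, "A")` is true since the worst case is “A, A, B”.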

Now, how does this relate to Meerkat? Each Meerkat transaction has a unique id, so one may think we never have the possibility of committing two or more different transactions for the same id. However, we have to be careful about what Meerkat replicas need to agree upon. It is not the transaction itself, but a transaction status — OK or ABORT. So, we do have two possible values that can exist at different replicas for the same transaction. As a result, Meerkat needs a fast path supermajority quorum to make the transaction status decision recoverable in the replica recovery protocol.


Reading Group. Fault-Tolerant Replication with Pull-Based Consensus in MongoDB

In the last reading group meeting, we discussed MongoDB’s replication protocol, as described in the “Fault-Tolerant Replication with Pull-Based Consensus in MongoDB” NSDI’21 paper. Our reading group has a few regular members from MongoDB, and this time around, Siyuan Zhou, one of the paper authors, attended the discussion, so we had a perfect opportunity to ask questions and get answers from people directly involved.

Historically, MongoDB had a primary backup replication scheme, which worked fine most of the time, but had a few unresolved corner-cases, such as having more than one active primary at a time due to the failures and primary churn. Having a split-brain in systems is never a good idea, so engineers looked for a better solution. Luckily, there are many solutions for this problem — an entire family of consensus-based replication protocols, like Multi-Paxos and Raft. MongoDB adopted the latter protocol to drive MongoDB’s update replication scheme. 

At this time, I might have finished the summary, congratulated MongoDB on using Raft, and moved on. After all, Raft is a very well-known protocol, and using it poses very few surprises. However, MongoDB had a couple of twists in its requirements that prevented using vanilla Raft. See, the original MongoDB primary backup scheme had a pull-based replication approach. Usually, the primary node (i.e., the leader) is active and directly prescribes operations/commands to the followers. This active-leader replication happens in traditional primary backup, in Multi-Paxos, Raft, and pretty much any other replication protocol aside from pub-sub systems that rely on subscribers pulling the data. In MongoDB, the primary is passive, and followers must request the data at some repeated intervals. In the figure below, I illustrate the difference between the push-based (active leader) and pull-based (active follower) approaches.  

In both cases, the followers can learn new data from the leader. The pull-based approach, however, requires a bit more work. The leader has to compute the data batch for each follower individually, because followers may all have slightly different progress due to the differences in polling schedule. Also, there is another problem with the pull solution: the primary has no way of knowing whether the followers have received the operations during the pull. And this is important since the primary needs to count replicas accepting each operation to figure out the quorum and ultimately commit the operations. In traditional Raft, the result of the AppendEntries RPC carries an ack for a successful operation, allowing the primary to count successful nodes and figure out when the quorum is satisfied. With flipped pull-style communication, there is no reply to the primary, and so we need to add some additional mechanism to propagate this information, as shown in the figure here. Adding more communication is costly, but the extra message can be piggybacked onto the subsequent pull request at the expense of latency.

Pull-based Raft. Note that the leader must know when a quorum has accepted an operation.
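To see why piggybacking the ack costs latency, consider this toy model of a passive leader. It is a sketch under my own assumptions (the `Leader` class, integer log positions, and the idea that a pull request carries the number of entries the follower already holds); it is not MongoDB's actual wire protocol and it ignores terms entirely.

```python
class Leader:
    def __init__(self, cluster_size):
        self.log = []
        self.cluster_size = cluster_size
        self.acked = {}        # follower id -> entries known to be stored there
        self.commit_index = 0  # number of entries known to be on a majority

    def append(self, op):
        self.log.append(op)

    def on_pull(self, follower_id, have):
        """Serve a pull. `have` doubles as the ack for the previous batch."""
        self.acked[follower_id] = have
        self._advance_commit()
        return self.log[have:]

    def _advance_commit(self):
        # The leader counts its own log as one of the replicas.
        counts = sorted(list(self.acked.values()) + [len(self.log)], reverse=True)
        majority = self.cluster_size // 2 + 1
        if len(counts) >= majority:
            self.commit_index = max(self.commit_index, counts[majority - 1])
```

In a 3-node cluster, the first pull ships the entries but commits nothing, because the leader only learns that the follower stored them when the next pull arrives with a larger `have`.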

Now let’s digress a bit and discuss why MongoDB picked a pull-based approach despite the added complexity/cost. The traditional push-based solutions are centered around the primary or leader node, creating a star topology. In fact, unless you use something like the Pig primitive from our PigPaxos paper, star topology is pretty much all you can have. This is not very good in some scenarios where links may be congested or cost a lot of money. Consider a WAN scenario where two nodes may exist in another region for geo-redundancy to minimize data loss in the event of regional failure. With star topology, the same payload will travel across WAN twice, once for each of the two replicas, consequently costing twice as much to replicate. With a pull-based approach, it is easier to orchestrate other replication topologies. All we have to do is to instruct one node to pull data from its regional peer instead of the leader across WAN. Naturally, this creates a longer replication chain but saves the WAN link bandwidth, cost, and potentially some resources at the primary.
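The WAN saving is easy to count. In the sketch below, the node names, the region assignment, and the `wan_transfers` helper are all made up for illustration; a topology is just a map from each syncing node to its sync source.

```python
def wan_transfers(sync_source, region):
    """Count how many copies of each payload cross a region boundary."""
    return sum(1 for node, src in sync_source.items()
               if region[node] != region[src])


region = {"a": "east", "b": "east", "c": "east", "d": "west", "e": "west"}

# Star: everyone syncs from the leader "a".
star = {"b": "a", "c": "a", "d": "a", "e": "a"}

# Chained: "e" pulls from its regional peer "d" instead of the leader.
chained = {"b": "a", "c": "a", "d": "a", "e": "d"}
```

Here `wan_transfers(star, region)` counts two cross-region copies and `wan_transfers(chained, region)` counts one, at the price of one more hop before "e" sees the data.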

MongoDB’s pull-based solution enables such chaining by allowing some non-primary nodes to act as sync sources. A sync source is a node that provides data to syncing servers in response to a pull request. Of course, the primary is always a sync source. The high-level overview of node interactions is as follows:

  • Syncing server asks for new log entries from its sync source.
  • Sync source replies with a sequence of log entries.
  • Syncing server appends the log items if items follow the same history. 
  • If source and server logs diverge, the syncing server may have uncommitted data and must clean the divergent suffix. The detection of log divergence is based on matching the last log entry of syncing server with the first entry of the log segment sent by the source. If they match, then the source has sent a continuation of the same history.
  • Syncing server notifies its sync source of how up-to-date it is, including syncing server’s term.
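The append-or-diverge decision from the steps above can be sketched like this. It is a simplified illustration with a hypothetical `sync_step` function and log entries modeled as `(term, op)` tuples; the actual cleanup of a divergent suffix and the progress notification back to the sync source are left out.

```python
def sync_step(log, segment):
    """Apply a segment received from the sync source to the local log.

    The first entry of `segment` is expected to equal the syncing
    server's last entry; if it does, the rest continues the same history.
    """
    if not segment:
        return "no-op"
    if not log:
        log.extend(segment)          # nothing local to compare against
        return "appended"
    if segment[0] != log[-1]:
        return "diverged"            # caller must clean the divergent suffix
    log.extend(segment[1:])
    return "appended"
```

A server holding `[(1, "a"), (1, "b")]` that receives `[(1, "b"), (2, "c")]` appends `(2, "c")`, while a segment starting with any other entry signals divergence and leaves the log untouched.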

Unlike vanilla Raft, the syncing server does not check the term of the source, so it may be possible for a server to accept log entries from a source in some previous term. Of course, this can only happen if the log entries follow the same history. I think MongoDB Raft does so to make sure that the syncing server learns about legit updates from the previous leader even if it has already participated in the new leader’s election round. What is important here is that the syncing server sends its higher term back to the source (which should propagate to the source’s source, etc., until it reaches the leader for the source’s term). These term messages act as a rejection for the old leader, so it won’t count the syncing server as accepting the message and being a part of the quorum. As a result, if the data from the old-termed sync source was committed, the syncing server has received it and will eventually receive the commit notification from the new leader. If that data is uncommitted by the old leader (i.e., a split-brain situation), then no harm is done since the syncing server does not contribute to the quorum. The syncing server will eventually learn of proper operations instead.

Now speaking of performance, the paper does not provide any comparison between push- and pull-based solutions, so we are left wondering about the impact of such a drastic change. However, some experiments illustrate the comparison of star topology and chained replication in a two-region WAN setup. While chaining does not seem to increase the performance (unless the cross-region bandwidth is severely restricted), it predictably lowers the WAN bandwidth requirements. As for maximum performance, the paper mentions that the limiting factor is handling client requests and not replication, and this is why one fewer server pulling from the leader does not impact throughput. I am not sure I am very comfortable with this explanation, to be honest. 

The paper talks about a handful of other optimizations. I will mention just one that seemed the most interesting — speculative execution. With speculative execution, the nodes do not wait for a commitment notification to apply an operation, and speculatively apply it to the store right away. Since the underlying storage engine is multi-version, the system can still return strongly consistent reads by keeping track of the latest committed version. The multi-version store also allows rollbacks in case some operation fails to commit.
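A toy version of this idea, assuming a store that keeps a full snapshot per applied operation. That is purely for readability and says nothing about how the real storage engine keeps versions; the `SpeculativeStore` class and its methods are hypothetical.

```python
class SpeculativeStore:
    def __init__(self):
        self.history = [{}]   # history[i] = state after applying i operations
        self.committed = 0    # number of operations known to be committed

    def apply(self, key, value):
        """Apply an operation right away, before it is known to be committed."""
        snapshot = dict(self.history[-1])
        snapshot[key] = value
        self.history.append(snapshot)

    def commit(self, count):
        """Learn that the first `count` operations are committed."""
        self.committed = max(self.committed, min(count, len(self.history) - 1))

    def read_committed(self, key):
        """Strongly consistent read: only the latest committed version is visible."""
        return self.history[self.committed].get(key)

    def rollback_uncommitted(self):
        """Drop speculative versions, e.g. after learning they will not commit."""
        del self.history[self.committed + 1:]
```

Reads through `read_committed` never observe speculative writes, and a failed operation is undone by discarding the versions past the commit point.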

You can see my presentation of the paper on YouTube:


1) Cost of pull-based replication. The performance cost of the pull-based solutions was the first big discussion topic that I raised in my presentation. As I mentioned, there is an extra communication step needed to allow the primary to learn of quorum acceptance. This step either adds additional message round/RPC call or adds latency if piggybacked to the next pull request. Another concern is pulling data when there is no new data, as this will be wasted communication cycles. 

Luckily, we had MongoDB people, including Siyuan Zhou, one of the paper authors, to shed some light on this. To make things a little better and not waste the communication cycles, the pulls have a rather long “shelf life” — if the source has no data to ship, it will hold on to the pull request for up to 5 seconds before replying with a blank batch. 

Another big optimization in the MongoDB system is a gradual shift to the push-based style! This somewhat makes the entire premise of the paper obsolete. However, this new “push-style” replication is still initiated by the syncing server with a pull; after the initial pull, the source can push the data to the syncing server as it becomes available. So this allows building these complicated replication topologies while reducing the latency impact of a purely pull-based solution.

Another aspect of cost is the monetary cost, and this is where chained replication topologies can help a lot. Apparently, this was a big ask from clients initially and shaped a lot of the system architecture. 

2) Evolution vs Revolution. So, since the original unproven replication approach was pull-based to allow chained topologies, the new improved and model-checked solution had to evolve from the original replication. One might think that it would have been easier to slap a regular push-based Raft, but that would have been a drastic change for all other components (not to mention the features). This would have required a lot more engineering effort than trying to reuse as much of the existing code as possible. This brings an interesting point on how production software gradually evolves and almost never drastically revolves.

3) Evaluation. The evaluation is the biggest problem of the paper. It lacks any comparison with other replication approaches except old MongoDB’s primary backup scheme. This makes it hard to judge the impacts of the changes. Of course, as we have seen from the discussion with the authors, the actual implementation is even more complicated and evolved. It tries to bridge the gap between pull and push replication styles, so a clear comparison based on MongoDB’s implementation may not have been possible at all. 

That being said, I had some questions even about the provided self-comparisons. See, I would have expected to see a bit of throughput increase from chaining, similar to what I observe in PigPaxos, but there is none. The paper explains it by saying that replication at the sync source takes only 5% of a single core per syncing server, which would amount to just 20% of a core in star topology leader. Roughly, given the VMs used, this is around 5% of the total CPU used on replication, with the paper claiming that all remaining CPU is used to handle client requests. Assuming there is sync every 2 ms, we have about 2000 syncs per second at the leader for 12000 client requests. Doing some napkin math, we can see that there are 6 times more requests than replications per second, yet requests use 19 times the CPU, making each request roughly 3 times more expensive than each replication. Given that replication messages are not super cheap and require querying the log, and serializing the data, the performance cost difference sounds excessive to me, despite considering that client requests have a few writes in them (i.e., write to the log, and operation execution).
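The napkin math above, spelled out. The count of 4 syncing servers is inferred from the 5%-per-server and 20%-at-the-leader figures, and the 2 ms sync interval is the assumption stated in the text; all values are rough.

```python
syncing_servers = 4              # inferred: 20% of a core / 5% per server
sync_interval_ms = 2             # assumed sync period per syncing server
client_requests_per_s = 12_000
replication_cpu_share = 0.05     # share of total CPU spent on replication

# 500 syncs/s per server, 2000 syncs/s at a star-topology leader.
syncs_per_s = syncing_servers * (1000 // sync_interval_ms)

# How many client requests arrive per replication sync.
request_to_sync_ratio = client_requests_per_s / syncs_per_s

# How much more CPU goes to requests than to replication (95% vs 5%).
cpu_ratio = (1 - replication_cpu_share) / replication_cpu_share

# Relative CPU cost of one request compared to one sync.
relative_cost = cpu_ratio / request_to_sync_ratio
```

This gives 2000 syncs per second, a request-to-sync ratio of 6, a CPU ratio of about 19, and a per-request cost of a little over 3 times that of a sync.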

Siyuan explained that there is a bit more work on the request path as well. Not only the log is written (and persisted for durability), but there are also some additional writes for session maintenance and indexes. Moreover, the writes in the underlying WiredTiger engine are transactional, costing some performance as well. 

4) PigPaxos? Throwing this out here, but there are push-based solutions that can have more interesting replication topologies than a star. Our PigPaxos, in fact, solves a similar issue of cross-regional replication at a low network cost. 

5) Other pull-based systems. Finally, we tried to look at other solutions that may use pull-based replication, but there are not many. Pub-Sub systems fit the pull model naturally as subscribers consume the data by pulling it from the queue/log/topic. Pull-based replication can be handy in disaster recovery when nodes try to catch up to the current state and must ask for updates/changes starting from some cursor or point in history. 

6) Reliability/Safety. As the protocol makes one important change of not rejecting the data from the old-termed source, it is important to look at the safety of this change. The paper claims to model the protocol with TLA+ and model-check it. Intuitively, however, we know that even though the node takes the data from the old source, it actively rejects it by sending its higher term. This, intuitively, should be enough to ensure that the old leader does not reach a quorum of accepts (even though there can be a majority of nodes that copied the data) and does not acknowledge the commit to a client. The rest is taken care of by Raft’s commit procedure upon the term change.


Scalable but Wasteful or Why Fast Replication Protocols are Actually Slow

In the last decade or so, quite a few new state machine replication protocols emerged in the literature and the internet. I am “guilty” of this myself, with the PigPaxos appearing in this year’s SIGMOD and the PQR paper at HotStorage’19. There are better-known examples as well: EPaxos inspired a lot of development in this area. However, there seems to be a big disconnect between literature and production. While many sophisticated protocols, such as EPaxos, Atlas, SDPaxos, and Compartmentalized Paxos, appear in the literature, and some of them are rather popular in the academic community, the production systems tend to rely on more conservative approaches based on Raft, Multi-Paxos, or primary-backup.

There are probably a few reasons for this disconnect. For instance, as we discussed in Paxos vs Raft reading group meeting, explaining protocols in terminology closer to actual code and having good reliable reference implementations help with adoption. However, the difficulty of implementing some technology should not be a roadblock when this technology offers substantial improvements. 

Well, it appears that there are substantial quantitative improvements. Consider EPaxos. The protocol presents a leader-less solution, where any node can become an opportunistic coordinator for an operation. The exciting part is that EPaxos can detect conflicts, and when operations do not conflict with each other, both of them can succeed in just one round trip time to a fast quorum. The original paper claims better throughput than Multi-Paxos, and this is not surprising, as EPaxos avoids a single-leader bottleneck of Multi-Paxos and allows non-conflicting operations to proceed concurrently. This is a big deal! 

Throughput of Multi-Paxos and EPaxos in 5 dedicated nodes cluster under low conflict.

I wanted to find the truth here, so a couple of colleagues and I worked on experimentally testing Multi-Paxos and EPaxos. We performed a simple experiment, shown in the figure here, to confirm EPaxos’ performance advantage, and we achieved somewhere between 15-25% better throughput in 5-node EPaxos than Multi-Paxos when running on identical VMs (EC2 m5a.large with 2 vCPUs and 8 GiB of RAM). The original EPaxos paper has a bit larger performance difference in some situations, but we have spent significantly more time on our Multi-Paxos implementation and optimizations than EPaxos. Anyway, we see a substantial improvement that was stable, reproducible, and large enough to make a real-world difference. Not to mention that EPaxos has a few other advantages, such as lower latency in geo-distributed deployments. So what gives? 

I have a theory here. I think many of these “faster” protocols are often slower in real life. This has to do with how the protocols were designed and evaluated in the first place. So bear with me and let me explain. 

Most evaluations of these consensus-based replication protocol papers are conducted in some sort of dedicated environment, be it bare-metal servers in the lab or VMs in the cloud. These environments have fixed allocated resources, and to improve the performance we ideally want to maximize the resource usage of these dedicated machines. Consider Multi-Paxos or Raft. These protocols are skewed towards the leader, causing the leader to do disproportionately more work than the followers. So if we deploy Multi-Paxos in five identical VMs, one will be used a lot more than the remaining four, essentially leaving unused resources on the table. EPaxos, by design, avoids the leader bottleneck and harvests all the resources at all nodes. So naturally, EPaxos outperforms Multi-Paxos by using the resources Multi-Paxos cannot get a hold of due to its design. Such uniform resource usage across nodes is rather desirable, as long as avoiding the leader bottleneck and allowing each node to participate equally comes cheaply. 

Aggregate CPU utilization for Multi-Paxos and EPaxos in 5 nodes

Unfortunately, it is not the case, and EPaxos’ ability to use resources that would have been left idle by Multi-Paxos comes at a steep penalty! Intuitively, EPaxos needs to do a lot more work to maintain safety and remain leader-less — it needs to communicate the dependencies, compute dependency graphs, check for conflicts in these graphs, and resolve the conflicts as needed. This added complexity contrasts with Multi-Paxos that provides most of its safety via a few simple comparisons both at the leader and followers. To try to estimate the cost of EPaxos’ role uniformity, we need to look at resource usage. In the same experimental run as in the absolute performance figure earlier, we have measured CPU usage across all VMs. Expectedly, we were able to observe that EPaxos is very good at using all available resources, and Multi-Paxos consumes the entire CPU only at one node – the leader. However, it does not take long to spot something odd if we start looking at the aggregate resource usage, shown in the figure here. It turns out that Multi-Paxos achieved nearly 14k ops/s of throughput consuming roughly 200% CPU (and leaving 300% unused), while EPaxos did a bit over 18k ops/s with all 500% of aggregate CPU utilization. 

Throughput normalized by the CPU usage

This suggests that EPaxos needs overall more CPU cycles across all nodes to finish each operation. We can see this better if we normalize the throughput per the amount of CPU consumed. As seen in the figure, this normalized throughput presents a big gap in the efficiency of protocols. Of course, this gap may vary and will depend on the implementation. It may even shrink if some better engineers implement EPaxos, but I also think there is a fundamental issue here. EPaxos and many other protocols were designed to take advantage of all allocated resources in the cluster since allocated but unused resources are wasted. This resource usage paradigm is common when we have dedicated servers or VMs. However, technology has moved on, and we rarely deploy replicated services or systems all by themselves in isolation. With the advances in virtualization and containerization, we increasingly deploy replicated systems in resource-shared environments that are designed to pack applications across clusters of servers in a way to avoid having idle physical resources. 
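To make the normalization concrete, here is a back-of-the-envelope sketch using only the approximate numbers quoted above (nearly 14k ops/s at roughly 200% CPU for Multi-Paxos, a bit over 18k ops/s at 500% CPU for EPaxos). The rounded inputs and the helper name `ops_per_cpu_unit` are my assumptions for illustration; the exact measurements are in the figures.

```python
# Rounded figures taken from the text above; real measurements differ slightly.
runs = {
    "Multi-Paxos": {"ops_per_sec": 14_000, "aggregate_cpu_pct": 200},
    "EPaxos": {"ops_per_sec": 18_000, "aggregate_cpu_pct": 500},
}


def ops_per_cpu_unit(ops_per_sec, aggregate_cpu_pct):
    """Throughput per 100% of CPU consumed across the whole cluster."""
    return ops_per_sec / (aggregate_cpu_pct / 100)


efficiency = {name: ops_per_cpu_unit(**run) for name, run in runs.items()}

for name, value in efficiency.items():
    print(f"{name}: ~{value:.0f} ops/s per 100% CPU")
```

With these rounded inputs, Multi-Paxos gets roughly twice as much work done per unit of CPU as EPaxos, even though EPaxos wins on absolute throughput.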

Aggregate throughput of 5 Multi-Paxos and EPaxos instances in a task-packed 5 nodes cluster.

Moreover, most replicated systems are sharded, so we normally have multiple instances of the replication protocol supporting shards of one larger system. This makes task-packing simpler as we have many relatively uniform tasks to schedule between machines. Consider YugabyteDB for example. Karthik Ranganathan in his talk described how the system schedules Raft groups across a cluster such that each server has a few resource-heavy Raft leaders and plenty more resource-light Raft followers. This type of packing allows Yugabyte (and CockroachDB, Spanner, Cosmos DB, and so on) to achieve uniform resource usage on VMs or dedicated servers while having non-uniform, but efficient replication protocols. Here I show how 5 instances of Multi-Paxos compare with 5 instances of EPaxos when deployed on larger servers (32 GiB RAM and 8 vCPUs). This drastic difference compared to the dedicated environment is due to the fact that we were able to pack multiple instances of Multi-Paxos to consume all the resources of the cluster, and clock-for-clock, EPaxos has no chance of winning due to its lower efficiency.

Ok, so I think there are quite a few things to unpack here. 

  • Absolute performance as measured on dedicated VMs or bare-metal servers is not necessarily a good measure of performance in real life, especially in a resource-shared setting (aka the cloud).
  • We need to consider efficiency when evaluating protocols to have a better understanding of how well the protocol may behave in the resource-shared, task-packed settings.
  • The efficiency of protocols is largely under-studied, as most evaluations from academic literature simply focus on absolute performance.
  • The efficiency (or the lack of it) may be the reason why protocols popular in academia stay in academia and do not get wide adoption in the industry. 

In conclusion, I want to stress that I am not picking on EPaxos. It is a great protocol that has inspired a lot of innovation in the area of distributed consensus. Moreover, I am sure there are use cases for EPaxos (or more recent extensions, such as Atlas), especially in the WAN setting when trading off some efficiency for better latency may be acceptable. Many other protocols (mine included) may have similar efficiency problems, as they approach the performance issue similarly: find the bottlenecked node and move its work elsewhere. The fundamental issues at play here are that (1) moving the work elsewhere comes at an efficiency penalty, and (2) in many modern environments there may not be any resource available for such “moved” work due to resource budgets and tight task-packing.

I, Abutalib Aghayev, and Venkata Swaroop Matte discuss the efficiency issues in a bit more detail in our upcoming HotStorage’21 paper: “Scalable but Wasteful: Current State of Replication in the Cloud.”

Reading Group. Strong and Efficient Consistency with Consistency-Aware Durability

In the 62nd reading group session, we covered the “Strong and Efficient Consistency with Consistency-Aware Durability” paper from FAST’20. Jesse did an excellent presentation for the group that explains the core of the paper rather well:

This paper describes a problem with many leader-based replication protocols. It specifically focuses on ZooKeeper and Zab, but similar issues arise in Multi-Paxos and Raft systems as well. Linearizable reads are expensive, as they need to go through a lease-protected leader (or carry out a full consensus round in consensus-based replication if leases are to be avoided). This puts a single leader node in the middle of everything, which is bad for throughput.

But what if we allow reading from followers? Systems like ZooKeeper allow such reads, but these have a few problems. For one, followers may be stale, so such reads do not have the same recency guarantees as the linearizable solutions. Secondly, reading from followers violates monotonic reads if the client jumps between the followers it reads from without having a special session/recency token. The third problem, the one the paper is specifically solving, is that there is no monotonicity between clients: when one client reads a value of version n, nothing prevents another client from reading version n-1 from a different node at a later time.

So, the goal of the paper is to allow reading from followers while sacrificing as little consistency as possible. The system introduced in the paper, called ORCA, solves two consistency-related problems. First, it ensures that local followers return to clients only durable data that cannot be lost in the face of failures. Second, ORCA enforces some recency guarantees in the form of cross-client monotonic reads.

To achieve both goals, the system introduces a globally replicated durable-index. The purpose of this index is to let the nodes know up to which point in the log the data has been durably (here durability means persistence to some non-volatile storage) replicated to some quorum of nodes (traditionally, this would be a majority quorum). This durability marker allows the followers to avoid returning a value that can be lost due to a crash. For example, consider some object k with the initial value k1 and a corresponding durable-index=1. Another value k2 is being replicated in log position 2. If a local node returns k2 before it is durable, there is a chance that k2 was not replicated and persisted at the majority of nodes yet, and a minority partition or failure can erase k2. To prevent this, the leader maintains a durable-index that gets updated once a quorum of replicas has acknowledged the persistence of data to storage. In the example, once k2 is durable and followers have acked, the leader updates its durable-index:=2 and sends it to the followers. A follower can only return the value k2 if it has received a durable-index>=2 (i.e., the index has moved to or past the value k2). Naturally, there are plenty of situations when the follower may be aware of k2 but has not received the durable-index=2 update just yet. In this case, the follower cannot return the data just yet. It cannot return k2, since it is not durable yet, but it also cannot return k1, as some other follower may have already returned k2 to a client. One solution would be to wait it out and see whether the durable-index eventually catches up. A more proactive approach would forward the read to the leader, and let the leader synchronously replicate k2 (and tell followers to flush it to disk) before returning the value to the client. The paper favors the proactive approach, as it acts somewhat as “read-repair” applicable to a variety of leader-based replication schemes.
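The follower-side read rule from the k1/k2 example can be sketched as follows. This is a minimal single-object model under my own assumptions, not ORCA's actual code: the `Follower` class, its method names, and the "forward" return value are all hypothetical, and the sketch also forwards when the follower's log is behind the durable-index it has heard about.

```python
from dataclasses import dataclass, field


@dataclass
class Follower:
    # log[i] holds the value written at log position i + 1 (single object k).
    log: list = field(default_factory=list)
    # Highest log position the leader has announced as durable.
    durable_index: int = 0

    def append(self, value):
        """Receive a replicated (but possibly not yet durable) update."""
        self.log.append(value)

    def on_durable_index(self, index):
        """Apply a durable-index announcement; the index only moves forward."""
        self.durable_index = max(self.durable_index, index)

    def read(self):
        """Serve locally only when the latest known value is known durable."""
        if not self.log and self.durable_index == 0:
            return ("local", None)
        if len(self.log) != self.durable_index:
            # Either we know a newer, non-durable value (cannot return it or
            # the older one), or we are missing durable entries: ask the leader.
            return ("forward", None)
        return ("local", self.log[-1])


f = Follower()
f.append("k1")
f.on_durable_index(1)
first = f.read()      # k1 is durable, served locally
f.append("k2")
second = f.read()     # k2 known but not durable: forwarded to the leader
f.on_durable_index(2)
third = f.read()      # k2 is now durable, served locally
```

The middle read is the interesting case: the follower holds both k1 and k2 yet can safely return neither, which is exactly why the paper falls back to the leader.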

The paper presents this durable-index as a big contribution, although I am a bit skeptical about this. See, existing systems have been doing the whole durable-index business for a very long time. I will focus on just one example, however: Raft. In Raft, the leader must communicate the leader’s commitIndex with the followers after a quorum has been reached so that all nodes can mark all log items or updates up to commitIndex as committed. If we assume that Raft persists operations to disk at each node upon receiving them, then the leader’s commitIndex in the “commit” message (i.e., the next AppendEntries RPC) is the same as the durable-index from this paper. A similar global commit progress marker can be implemented in Paxos-style systems despite a few challenges with out-of-order commits in Multi-Paxos.

In the paper’s defense, it makes the durable-index a bit more general than Raft’s approach. For example, the paper allows for more relaxed primary-backup implementations, asynchronous replication, and implementations with delayed disk persistence. Consider adding durable-index to an asynchronous leader-driven system to allow durable reads. The system can check whether the value to be returned is durable, allowing it to do synchronous “read-repair” only when needed, and preserving the benefits of async replication as often as possible.

Ok, so we have a durable-index that marks the global progress of the system, and the leader distributes the durable-index to all nodes. This is enough to ensure the data read once won’t ever get lost due to a failure, but this is still not enough to ensure monotonicity. If one server in the cluster of 3 is slow, it may not have received some updates and the new durable-index, while the 2 other, faster nodes have successfully applied some more operations. This can lead to the slow node returning older data than the other two nodes may have already returned. Something similar can happen when one replica is network partitioned. For a single client, this monotonicity problem can be solved with some sort of session token that tracks the recency of data observed by the client, but there is no elegant solution for the cross-client case.

The naive solution to the monotonicity problem, which Jesse describes in great detail in the presentation, expands the durable-index requirement to all nodes. In this case, a follower can only return a value when its state matches the durable state agreed upon by all nodes, so there can be no slow or partitioned replicas. For instance, we may have 3 nodes, all having a persisted history of k1, k2, k3. Since all nodes have the value k3, it is clearly durable. A leader node can then send the durable-index=3 to signal the rest of the cluster that the 3rd item (i.e., k3) is durable. Let’s say one follower, f1, has received this durable-index. We can allow this replica to return k3 locally. At the same time, another follower, f2, may not have received the durable-index update just yet, so it is aware of k3 but does not think it is durable. The node f2 won’t return k2 or k3 and instead will forward the read to the leader, which will ensure that k3 is durable and no other operation is in progress before returning k3.

Requiring durability/replication to all nodes has been explored before. If I continue to draw parallels with Raft and take into account the flexible quorums result, then configuring Raft with a replication phase requiring a quorum of all nodes will achieve this durable-index of all nodes approach. Needless to say, Raft will provide linearizability and not just monotonic reads (as long as followers block in case of a dirty read until they receive an updated commitIndex from the leader). There is an obvious problem with the quorum of all nodes crippling the availability when one node goes down. However, there are also examples of production systems that make this work to some degree.
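The flexible quorums condition behind this "write all" configuration is simple arithmetic: any leader-election (phase-1) quorum must intersect any replication (phase-2) quorum, which for size-based quorums holds whenever the two sizes sum to more than the number of nodes. A small sketch, with function names that are mine and purely illustrative:

```python
def quorums_intersect(n, phase1_size, phase2_size):
    """Size-based flexible quorums intersect when the sizes sum to more than n."""
    return phase1_size + phase2_size > n


def tolerated_failures(n, quorum_size):
    """How many nodes may be down while a quorum of this size can still form."""
    return n - quorum_size


n = 3
# Classic majorities: both phases need 2 of 3 nodes.
majority_ok = quorums_intersect(n, 2, 2)
# "Write all": replication needs all 3 nodes, so election needs only 1.
write_all_ok = quorums_intersect(n, 1, 3)
# The price: replication stalls as soon as a single node is down.
write_all_failures = tolerated_failures(n, 3)
majority_failures = tolerated_failures(n, 2)
```

This shows the availability trade-off mentioned above: the all-node replication quorum is safe, but it tolerates zero failures on the write path, compared to one failure with majorities in a 3-node cluster.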

Anyway, ORCA does not require durability on all nodes due to availability issues. Instead, it introduces the “active-set,” a set of nodes that must achieve durability for the durable-index to progress. Reads are possible against the replicas in the active-set, while reads against nodes outside of the active-set are prohibited (forwarded to leader?). The active-set must be at least the majority of nodes for fault tolerance reasons. The lease protects the active-set to allow failed or partitioned nodes to fall out of the set safely. This idea is largely described in the Paxos Quorum Leases paper, which, again, provides linearizability.


1) Linearizable Alternatives

Flexible Quorums in Write All Read One configuration. I already touched on this point in the summary, but I also raised it in our group’s discussion. One thing that is different between just running Raft with a write quorum of all nodes and ORCA is that ORCA can do “fast writes” since it is not doing linearizability. These fast writes are just uncommitted writes where clients receive a write acknowledgment before an operation has been quorum-committed. Some systems, like MongoDB, also have them, and they seem to cause more confusion and problems than benefits. So, we do not think this is really a good feature to have in a system with stronger consistency guarantees. In fact, this consistency relaxation seems to be the only big difference between the Flexible-Paxos (Raft) approach and the naive ZooKeeper-based ORCA.

Paxos Quorum Leases. The Paxos Quorum Leases (PQL) paper was brought up during the discussion too. At a high level, PQL is similar to the lease-protected active-sets of ORCA. However, we were surprised to see a complete lack of comparison between ORCA’s active-set approach and PQL.

Paxos Quorum Read. I have explored the problem of scaling read throughput in strongly consistent systems before. The Paxos Quorum Read (PQR) approach allows client-driven reads from a quorum of followers. This is not as scalable as reading from just one node, but PQR avoids any leases and sticks to linearizability.

2) Safety of durable-index

Lease safety. As with all systems that rely on leases, proper timeouts are essential for safety. Lots of care must be put into establishing proper timeouts. For example, it is essential for partitioned nodes in the active-set to time out and “kill themselves” before the leader has a chance to time out and pick a new active-set. If a leader creates a new active-set before the bad node in the old one dies, we may have a situation with two nodes returning different data. Timeouts also need properly working clocks. If one clock ticks too quickly or too slowly, this may also be a problem.

Replication safety. An interesting point was brought up regarding the safety of durable-index. Since the durable-index update may get delivered faster to some nodes and slower (or be dropped altogether) to others, this creates a situation when a client can read from a more up-to-date node and receive some newer data, let’s say value k3. A client may then contact a different node that has not received the new durable-index and still thinks k2 is durable. This node cannot return k2 as it would be a violation of monotonic reads! The node must wait for k3 to become durable, which in ORCA means that the read gets forwarded to the leader. So in other words, if a node knows of some non-durable update to an object, it is pretty much blocked, and needs to forward to the leader to avoid safety problems. This leads to the next discussion point.

3) Performance cost. The safety of the protocol relies on fixing the durability while doing a read operation. So every time a read is done against the node that knows of some newer update that is not durable yet, the protocol forwards to the leader and potentially re-replicates. Even the forwarding part alone can be costly. It will cost not only extra latency but also the resources needed to support these multi-hop operations. 

ORCA is banking on the fact that we have a system with multiple keys/objects and that the access patterns are such that it is unlikely for us to both have an outstanding non-durable update and a read operation for the same key at the same time. This is generally an ok assumption, and many systems do that. The problem is that the opposite case is a complete disaster for the system. Having just a handful of “hot keys” will cause frequent forwards to the leader and all the extra work and extra latency. At the very best this will create bad latency, but because this creates a lot more work for the system, it actually lowers the maximum throughput achievable in the system. Can this translate to a metastable failure?

4) Practicality of cross-client monotonic reads. The final discussion topic I want to mention is the practicality of such cross-client monotonic read consistency. The paper goes into a few examples. However, monotonic reads are a far cry from more stringent consistency models. ORCA does not explicitly enforce the read-your-write property, which, intuitively, is desirable in a system claiming strong consistency. For instance, Facebook goes out of its way to provide read-your-write consistency across its stack of weakly consistent systems to make development easier and prevent user confusion.

As a last remark, I want to add that consistency properties with the paper’s approach will depend on the underlying leader-based protocol and implementation. As I mentioned a few times, Raft already has durable-index baked in, and solutions like PQL can provide something similar to the active-set described here for a completely linearizable system. However, the durable-index and active-set approach can be applied to weaker-consistency leader-based replication, and in such cases, this can improve consistency, hopefully at a low-enough cost.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions, and most importantly manage Zoom invites to paper discussions. Please join the slack group to get involved!

Reading Group. Near-Optimal Latency Versus Cost Tradeoffs in Geo-Distributed Storage

Short Summary

Yesterday we discussed Pando, a geo-replication system achieving near-optimal latency-cost tradeoff in storage systems. Pando uses large Flexible Paxos deployments and erasure coding to do its magic. Pando relies on having many storage sites to locate sites closer to users. It then uses Flexible Paxos to optimize read and write quorums to have the best latency for a particular workload. Finally, erasure coding reduces the cost associated with having many storage sites. There are a few caveats to the paper. First is the slowness of the (Flexible) Paxos protocol, as it requires two phases to complete an operation. Pando addresses this issue with a cool use of delegate nodes: one of the nodes from phase-1 quorums acts as a delegate on behalf of the client-proposer to collect phase-1 acks and start phase-2. This can save significant latency, getting the full two rounds in geo deployments closer to the one-round RTT of a regular proposer-driven Paxos. The second problem is the quorum overlaps when using erasure coding. If the data is split into parts, we need to ensure a k-node overlap between the read and write quorums, making erasure coding detrimental to achieving low latency. This k-node overlap is needed to restore the latest version of the data, as having fewer than k nodes with the latest version is not enough to reconstruct the data. Pando, however, first tries to read from a smaller quorum that guarantees just a one-node overlap and hopes that there will be enough nodes that have received the update by the time a read is issued. If there are no k nodes to reconstruct the data, Pando continues waiting for more nodes to respond to get to that large quorum with a k-node overlap.
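The quorum-overlap requirement can be checked with a bit of arithmetic: two quorums of sizes r and w drawn from n nodes are guaranteed to share at least r + w - n nodes. The sketch below uses made-up numbers (7 sites, k = 3, a write quorum of 5) purely for illustration; they are not taken from the paper, and the function names are mine.

```python
def guaranteed_overlap(n, read_size, write_size):
    """Minimum number of nodes any read quorum shares with any write quorum."""
    return max(0, read_size + write_size - n)


def can_always_reconstruct(n, read_size, write_size, k):
    """True if every read quorum is guaranteed to see k coded splits of the latest write."""
    return guaranteed_overlap(n, read_size, write_size) >= k


n, k, write_size = 7, 3, 5          # hypothetical deployment
small_read, large_read = 3, 5

small_overlap = guaranteed_overlap(n, small_read, write_size)
large_overlap = guaranteed_overlap(n, large_read, write_size)
small_is_enough = can_always_reconstruct(n, small_read, write_size, k)
large_is_enough = can_always_reconstruct(n, large_read, write_size, k)
```

In this hypothetical setup, the small read quorum guarantees only a one-node overlap, so it succeeds only if the update has already propagated to more nodes than strictly required, while the larger quorum always has the k splits it needs.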


The presentation video:


1) ConfigManager. ConfigManager is a component of Pando that computes desirable quorums and delegates for each key and key’s access pattern. We felt like this ConfigManager is very important for the performance of Pando, and it was rather under-specified in the paper. For example, we were wondering if it is possible to make the ConfigManager dynamic and make it learn the workload patterns and adjust as needed.

2) Read consistency. Pando uses quorum reads while using Paxos for writes. Naively reading from a quorum is dangerous and can lead to linearizability problems, yet the reads are not discussed in detail in the paper. The paper briefly, in one sentence, mentions that to enable quorum reads a global acknowledgment is needed to make sure all nodes know a value has been committed. But the paper does not explain how such commitment works and does not study the performance consequences of this. For example, if a value has been accepted by all nodes, but the commit message is still in flight, does the client read the previous value, or does it retry? If the read quorum has some nodes that have received the commit messages, and some that have not, is it safe to read a new value, or is a retry needed? We do not think it is safe to read, and the paper seems to mention that in passing as well. There are quite a few more questions around the reads and their safety here, so we wished the paper addressed the reads in greater detail.

3) Rare conflicts. One of the motifs in the paper is that conflicts are rare in scenarios Pando is designed for. This influences a lot of design, as most of the systems are written to work well in the conflict-free case, and somehow address the conflict case. For example, when two clients try to write on the same key/document, that will cause a conflict, and Paxos is not very good at handling it due to the dueling-leader problem. The same goes for the read problem above in (2), as we feel like conflicts will cause the retries, significantly impacting the performance. What is interesting is that despite largely conflict-free evaluation, Pando did better in latency than Fast Paxos, a protocol designed for 1 RTT consensus in conflict-free cases. We feel like the reason is that Pando can get its writes close to 1 RTT with the clever delegates trick, while it can keep the read quorums small to optimize for latency, whereas the Fast Paxos approach is stuck with larger quorums for reads.

4) Quorum Size. We have spent quite a bit of time discussing quorum sizes. We were confused a bit about the quorum descriptions from the paper, but in the end, we were able to figure out our misunderstanding.

5) Comparison with Atlas. Pando extensively compares with EPaxos. Atlas is a newer, EPaxos-influenced protocol, so we were curious about how the two would compare. 

6) Scalability. Reducing latency is very important for WAN setups; however, we felt that the paper is not addressing the throughput question. Having too many nodes may have an impact on scalability/throughput. Since Pando uses the clients as proposers, the brunt of dealing with too many storage replicas goes to the clients, and scalability should not be impacted that much. However, the delegates may become the bottlenecks for some workloads.

7) Delegates for throughput. Pando uses delegates to improve latency. We noticed that a somewhat similar idea is used in Compartmentalized Paxos and PigPaxos to improve the throughput instead.

Next Meeting

As always, next week we will continue looking at more great distributed systems papers. We are starting with the OSDI 2020 bunch of papers, and in our next meeting, we will discuss “Fault-tolerant and transactional stateful serverless workflows.” Please join our slack to participate in the discussions and Zoom meetings.

Reading Group. Autoscaling Tiered Cloud Storage in Anna.

This week we looked at “Autoscaling Tiered Cloud Storage in Anna.” This is the second Anna paper. The first one introduces Anna Key-Value store, and the second paper talks about various “cloud-native” improvements. The presentation by Michael Whittaker is available here:

Short Summary

The Anna Architecture

Anna is an eventually consistent key-value data store, where each value is a lattice. Anna can tweak some consistency guarantees by changing the type of lattice stored in the value. For example, it can be a Last Write Wins (LWW) system that merges operations based on the definition of the last writer (i.e., a timestamp), or it can use some CRDT lattice to preserve all updates/operations. The weak consistency of Anna means that it is relatively easy to add and remove nodes to scale the systems. The Autoscaling part takes advantage of that and dynamically adjusts the number of replicas responsible for each key to match the workload. Moreover, Autoscaled Anna has multiple usage tiers for keys based on the usage frequency. The paper discusses “hot” and “cold” keys, but more tiers are possible. Such tiering allows using heterogeneous hardware to better balance the cost and performance of Anna, as “cold” keys may run on cheaper hardware backed by SSDs, while “hot” keys may be stored in RAM on more expensive VMs. Anna has rules for data transitioning between tiers and for controlling the degree of replication. For example, there is a minimum replication factor for durability, and there are usage thresholds for promoting and demoting keys to/from hot/cold tiers.
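To illustrate what "each value is a lattice" means, here is a minimal sketch of a Last Write Wins value. It is not Anna's implementation: the `LWW` class and the tie-breaking on the value itself are my assumptions. The key property is that the merge is commutative, associative, and idempotent, so replicas can exchange and merge updates in any order and still converge.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LWW:
    """A Last Write Wins register: the update with the highest timestamp survives."""
    timestamp: int
    value: str

    def merge(self, other):
        # Ties on timestamp are broken by the value so that merge stays
        # commutative (an assumption made for this sketch).
        return max(self, other, key=lambda x: (x.timestamp, x.value))


a = LWW(timestamp=1, value="x")
b = LWW(timestamp=2, value="y")
c = LWW(timestamp=2, value="z")

# Replicas may receive the same updates in different orders.
replica_1 = a.merge(b).merge(c)
replica_2 = c.merge(a).merge(b)
```

Both replicas end up with the same value regardless of delivery order, which is what makes adding and removing replicas comparatively cheap in a system like this.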


We have spent quite some time talking about this paper, as there is a lot of information to digest. Below are a few of the bigger discussion points.

1) Apache Cassandra comparison. This came up a few times as lacking, as the system is somewhat similar to Cassandra, at least when you consider the LWW lattice. There are many differences too, especially in the context of autoscaling. Cassandra uses a statically configured replication factor and lacks any autoscaling. Cassandra’s DHT model is pretty rigid and resists scaling: adding nodes requires considerable ring rebalancing. Also, only one node can be added/removed from the ring at once to avoid membership and ring partitioning issues. Anna has no such problem and can adjust more quickly and dynamically. Additionally, in all fairness, the original Anna paper compares against Cassandra.

2) Breaking Anna. Part of the discussion was around breaking the autoscaling and putting Anna in a bad/less-performant state. One example would be using a workload that changes at about the same rate as Anna can adjust. For example, if we hit some keys above the threshold to promote from the “cold” tier to “hot”, we can cause a lot of data movement as these keys potentially migrate or are added to the memory-tier hardware. At the same time, we can reduce the usage, causing the system to demote these keys and experience the associated overheads. This is like putting a workload in resonance with Anna’s elastic abilities, and it reminds me of the famous Tacoma Narrows bridge collapse.
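The "resonance" scenario can be illustrated with a toy model. Everything here is hypothetical: the thresholds, the per-interval access counts, and the `count_tier_moves` function are assumptions for illustration, and Anna's real policy engine is more involved than a pair of thresholds checked once per interval.

```python
PROMOTE_AT = 100  # hypothetical accesses per interval to promote cold -> hot
DEMOTE_AT = 10    # hypothetical accesses per interval to demote hot -> cold


def count_tier_moves(access_counts, promote_at=PROMOTE_AT, demote_at=DEMOTE_AT):
    """Count how many times one key changes tier over a series of intervals."""
    tier = "cold"
    moves = 0
    for count in access_counts:
        if tier == "cold" and count >= promote_at:
            tier = "hot"
            moves += 1
        elif tier == "hot" and count <= demote_at:
            tier = "cold"
            moves += 1
    return moves


steady = [150] * 8        # consistently hot key
resonant = [150, 5] * 4   # load flips at the same rate the policy reacts

steady_moves = count_tier_moves(steady)
resonant_moves = count_tier_moves(resonant)
```

In this toy model, the steady workload triggers a single promotion, while the oscillating workload forces a tier move in every interval, each one carrying data-movement overhead.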

3) Possible consistency guarantees. One of the benefits of Anna is that some guarantees can be tweaked based on the lattice used for the value. Part of the discussion focused on how much consistency we can get purely out of these lattices, and how much may necessitate more drastic architectural changes. It is pretty straightforward to understand that Anna won’t ever provide strong consistency, but what about some other guarantees? For example, the original Anna paper claims the ability to have causal consistency, although it uses more complicated lattices, and keeps vector clocks for maintaining causality, not only making the system more complicated but also potentially harming the performance (something noted by the authors themselves).

4) Comparison fairness. Some points were mentioned about the evaluation fairness. It appears that some systems compared against are significantly more capable and/or may provide better guarantees (DynamoDB, Redis). It is worth noting, however, that there are not that many, if any, direct competitors, so these comparisons are still very valuable.

5) Cloud-Native. This appears to be one of the “most” cloud-native database papers that talks about some practical aspects of designing and running a cloud-native database. One aspect of this “cloud-nativeness” is the focus on elasticity and scalability happening automatically. Another one is the cost-performance consideration in how the system scales with regard to workload changes. We noted some other much older papers that fit our definition of “cloud-native” papers. One example was Yahoo’s PNUTS paper that describes a global system with some capability to adjust to workloads and selective replication.

6) Scheduling/resource allocation. One of the challenging aspects of Anna with autoscaling is figuring out the best key “packing” or “binning” given multiple tiers and pricing constraints. This is similar to the Knapsack problem and the bin packing problem, and both are NP-complete. So this resource management/scheduling will become a big problem at the production scale when we may have thousands of machines. At the same time, task packing problems at the datacenter level are well studied, for example in Google’s Borg cluster management system or its derivative Kubernetes.


Our reading group takes place over Zoom every Wednesday at 3:30pm EST. We have a slack group where we post papers, hold discussions, and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!