New Reading List: Papers #101-110

Summer term papers are here! The list is below. Also, here is a Google Calendar.

Reading Group. ByShard: Sharding in a Byzantine Environment

Our 93rd paper in the reading group was “ByShard: Sharding in a Byzantine Environment” by Jelle Hellings, Mohammad Sadoghi. This VLDB’21 paper talks about sharded byzantine systems and proposes an approach that can implement 18 different multi-shard transaction algorithms. More specifically, the paper discusses two-phase commit (2PC) and two-phase locking (2PL) in a byzantine environment.

As usual, we had a presentation of the paper. Karolis Petrauskas did an excellent job explaining this work: 

The paper states that modern blockchain technology relies on full replication, making the systems slower and harder to scale. 

Sharding is a natural way to solve the problem and has been done countless times in the crash fault tolerance setting. Of course, a sharded system often needs to perform transactions that touch data in more than one shard. The usual way to solve this in CFT is to use some version of 2PC coupled with some concurrency control mechanism, like 2PL. ByShard follows this general recipe, but in the BFT setting, which complicates things a bit. The problem is making 2PC and 2PL work in a byzantine, adversarial environment without tightly coupling all shards back together into one “megashard.” So, we need a way to communicate between shards in a reliable way. 

Let’s look at a transaction lifecycle. When we have a transaction that spans multiple shards, the first step is to deliver this transaction to each shard and check whether a shard can run it. Then, if everything is ok, we need to run the transaction and provide some isolation from other ongoing transactions. ByShard implements all these actions with shard-steps. Each shard-step is a building block of all ByShard protocols and allows the shard to inspect a transaction, make changes to the local state, and send the message to start another shard-step on another shard. Overall, ByShard uses three distinct types of shard-steps: vote-step, commit-step, and abort-step. 

The message sending part is kind of important, as we need this communication to be reliable in the BFT setting. The paper gracefully ignores this problem and points to a few solutions in the literature. In short, ByShard requires a cluster-sending protocol that ensures reliable communication between shards, such that, all correct nodes of the receiver shard get the message, all the correct nodes of the sender shard get an acknowledgment, and that sending requires the sender shard to reach an agreement on what to send. The last point ensures that bad actors do not send malicious stuff, and I suspect on the receiver there needs to be a way to check that the received messages were indeed certified by the sender’s consensus. 

Vote-step is used to replicate the transaction between shards. When a shard receives the transaction, it starts the vote-step and checks whether it can proceed with the transaction. The shard may also perform local state changes if needed. At the end of the vote-step, a shard forwards some information to another shard to start a new shard-step. Since we only have three building blocks, the message a vote-step sends can start another vote-step, a commit-step, or an abort-step at the receiving end. The purpose of commit-step and abort-step is self-evident from their names. One important thing to note about abort-step is that it needs to undo any local changes that a prior vote-step might have made to ensure that the aborted transaction leaves no side effects.

Now we can look at how ByShard composes these three basic steps. The figure above provides a visual illustration of three different ways ByShard runs 2PC. One aspect of the figure that I fail to understand is why not all shards run vote-step and commit-step, and the text does not really provide an explanation.

In the linear orchestration, the transaction progresses from the coordinator one shard at a time. If any shard decides to abort, it needs to start the abort-step and notify all other shards involved in the transaction (or at least all other shards that voted earlier). If a shard decides to commit, it actually starts a vote-step in the next shard. If the vote-step successfully reaches and passes the last shard involved in the transaction, then that last shard can broadcast the commit-step to everyone. Centralized orchestration looks more like the traditional 2PC, and distributed orchestration cuts down on the number of sequential steps even further. The three strategies represent tradeoffs between the latency and number of communication exchanges and shard-steps. 
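To make the linear flow concrete, here is a minimal sketch of it in Python. It is a toy under loud assumptions: each `Shard` object stands in for a whole BFT cluster, consensus and cluster-sending are replaced by plain method calls, and the names (`Shard`, `run_linear`, the balance constraint) are hypothetical and not taken from the paper.

```python
from dataclasses import dataclass, field


@dataclass
class Shard:
    """A toy shard holding account balances (stands in for a whole cluster)."""
    name: str
    balances: dict
    log: list = field(default_factory=list)

    def vote_step(self, txn_id, account, delta):
        """Check the local constraint and apply the local change if it holds."""
        if self.balances[account] + delta < 0:
            self.log.append(("vote-abort", txn_id))
            return False
        self.balances[account] += delta
        self.log.append(("vote-commit", txn_id))
        return True

    def commit_step(self, txn_id):
        self.log.append(("commit", txn_id))

    def abort_step(self, txn_id, account, delta):
        """Undo the change made by an earlier vote-step."""
        self.balances[account] -= delta
        self.log.append(("abort", txn_id))


def run_linear(txn_id, parts):
    """parts: list of (shard, account, delta), visited one shard at a time."""
    voted = []
    for shard, account, delta in parts:
        if not shard.vote_step(txn_id, account, delta):
            # Only shards that already voted have local changes to undo.
            for prev_shard, prev_account, prev_delta in voted:
                prev_shard.abort_step(txn_id, prev_account, prev_delta)
            return "abort"
        voted.append((shard, account, delta))
    # The vote passed the last shard, so commit-steps go to everyone.
    for shard, _, _ in parts:
        shard.commit_step(txn_id)
    return "commit"
```

Running a transfer that overdraws an account on the second shard returns "abort" and restores the first shard's balance, which is the "no side effects" property the abort-step is responsible for.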

So with 2PC taken care of, we can briefly discuss the concurrency control. ByShard proposes a few different ways to implement it, starting with no concurrency control, thus allowing observation of partial results. Because of the side effect cleaning ability of abort-step, if some transaction partly executes and then reaches the abort-step, then its execution will be undone or rolled back. This reminds me of the sagas pattern. The other solution is to use locks to control isolation. The paper (or the presentation above) has more details on the nuances of locking and requiring different locks with a different type of orchestration. By combining different ways to orchestrate the transactions with different ways to execute them, ByShard presents 18 BFT transactional protocols with different isolation and consistency properties. 


1) Comparison with Basil. An obvious discussion is a comparison with Basil, another transactional sharded BFT system. Basil is a lot closer to the Meerkat solution from the CFT world, while ByShard is a more classical 2PC+2PL approach. In Basil, the degree of fault tolerance is smaller (i.e., it needs clusters of 5f+1 nodes). At the same time, ByShard is much less specified than Basil. ByShard relies on existing BFT consensus and BFT cluster-broadcast mechanisms to work, while Basil provides a complete and contained solution. On the performance side of things, ByShard requires a lot of steps and a lot of consensus operations across all the involved shards to do all of the shard-steps. This must take a significant performance toll. While it is not correct to compare numbers directly between papers, Basil can run thousands of transactions per second, while ByShard’s throughput is in single digits. However, it is worth mentioning that ByShard’s experiments included more shards, and ByShard’s transactions also involved a large number of shards.

2) (Distributed) Sagas Pattern. As I briefly mentioned in the summary above, ByShard, especially with linear orchestration and no isolation, reminds me of the sagas pattern. Distributed sagas are used to orchestrate long-running requests in microservice-type applications. If we squint, we can see each shard as a separate microservice. As vote-steps propagate across shards, they perform local changes. And if an abort is needed, the abort causes these changes to be rolled back. However, when we add isolation to ByShard, the similarities with sagas start to disappear.

3) Performance. An important motivation for sharding is performance. However, it does not seem like ByShard achieves stellar performance here. Of course, sharding is still useful for systems that operate with large amounts of data that otherwise would not fit into a single machine. Nevertheless, without strong performance, a solution like this has very few advantages over not using sharded/partitioned systems at all.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the paper discussions. Please join the Slack group to get involved!

Reading Group. CompuCache: Remote Computable Caching using Spot VMs

In the 92nd reading group meeting, we covered the CIDR’22 paper “CompuCache: Remote Computable Caching using Spot VMs” by Qizhen Zhang, Philip A. Bernstein, Daniel S. Berger, Badrish Chandramouli, Vincent Liu, and Boon Thau Loo.

Cloud efficiency seems to be a popular topic recently. A handful of solutions try to improve the efficiency of the cloud by ratcheting up the utilization of already provisioned hardware and infrastructure. These approaches tend to provide some service using otherwise wasted resources. For instance, we have covered a paper that suggests running serverless computing on harvested VMs that use resources normally wasted/idle in the data center. CompuCache provides a similar utility, but instead of running a pure serverless compute infrastructure, it suggests using Spot VMs to run a compute-oriented caching service. See, running a purely caching service is more memory intensive, leaving CPUs still relatively underutilized, defeating our goal of using otherwise wasted resources well. So CompuCache is a caching service that can execute stored procedures against the cached data. This is the “Compu” part, and I suppose it enables a better CPU utilization of the spot VMs the service is running. 

So, now let’s talk about using spot VMs for storage systems. Spot instances have unpredictable lifespans controlled by the cloud vendor and not the users/engineers. When the resources used by a spot instance are needed to support other services, the cloud vendor evicts the VM with little notice. This fickle nature of spot VMs precludes them from reliably supporting a storage architecture. However, caching may be a somewhat viable option since the master copy of the data is stored on some durable service. Of course, we still want to avoid cache misses as much as possible for performance reasons, so CompuCache needs to have mechanisms to deal with spot VM evictions. 

CompuCache architecture relies on the Cloud VM Allocator component to get the resources to run multiple caches for different applications. The allocator allows client applications to specify the size of cache they need and allocate the resources. The system assigns memory in fixed chunks (1 GB), and these chunks can go to different spot VMs that have the resources. Naturally, if no existing VMs have the resources, the allocator will try to get more VMs running. The clients “see” the allocated memory as an address space, and can write and/or read from memory knowing some address and length of data. 

The sharded nature of the cache means that the clients somehow need to know where their data may be in the system. CompuCache uses a scheme that maps virtual global cache address space to the specific nodes and their local address spaces. The client application constructs and maintains this map based on the results of cache allocation through the Cloud VM Allocator. Moreover, the client application is responsible for distributing this map to the CompuCache instances. When the node composition of the cache changes due to VM evictions or new VM additions, so does the mapping of virtual memory space to instances. As the system evolves, each client application must migrate parts of its cache between VMs. The paper has more details on the procedures involved in data migration due to spot VM deallocation/destruction.
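A small sketch may help picture the client-side map. It assumes the map is a list indexed by virtual chunk number, where each entry is a hypothetical `(vm_name, local_base)` pair. The paper does not spell out the data structure, so treat the layout and the function names as illustrative only.

```python
CHUNK_SIZE = 1 << 30  # 1 GB chunks, matching the allocation unit described above


def locate(chunk_map, address, length):
    """Split a virtual byte range into (vm, local_offset, length) pieces.

    chunk_map[i] is a hypothetical (vm_name, local_base) pair for virtual chunk i.
    """
    pieces = []
    end = address + length
    while address < end:
        index, offset = divmod(address, CHUNK_SIZE)
        vm, local_base = chunk_map[index]
        take = min(end - address, CHUNK_SIZE - offset)
        pieces.append((vm, local_base + offset, take))
        address += take
    return pieces


def remap(chunk_map, evicted_vm, replacement_vm, replacement_bases):
    """Return a new map with every chunk of evicted_vm moved to replacement_vm."""
    bases = iter(replacement_bases)
    return [
        (replacement_vm, next(bases)) if vm == evicted_vm else (vm, base)
        for vm, base in chunk_map
    ]
```

A read that straddles a chunk boundary comes back as two pieces on possibly different VMs, and an eviction only changes the map entries that pointed at the evicted VM. The actual data movement is not modeled here.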

An interesting part of CompuCache is its compute-oriented API. The clients can read and write data directly from the cache, but they can also run stored procedures (sprocs). I suspect the stored procedures allow CompuCache to better utilize unused cores that go into the spot instances. I was under the impression that stored procedures are not a very popular tool, but I may be mistaken here. Anyway, the authors provide another explanation for using stored procedures to interact with CompuCache: latency. See, if we need to read something and then update something else based on the retrieved value using just puts and gets, we will pay the price of 2 RTTs (not to mention screwed-up consistency). Using stored procedures, we can achieve this read-then-update pattern in just a single sproc call and 1 RTT between the client and cache. This is not a new idea, and some systems have been suggesting sprocs for atomic access to multiple keys and latency benefits. But here is a caveat: CompuCache is sharded and shards are relatively fine-grained, so the data you access in stored procedures may be on multiple VMs. This not only negates the latency benefit but also complicates CompuCache. To deal with data on multiple VMs, the system can either retrieve the data to a machine running the sproc or move the computation itself to the next VM.
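The round-trip argument can be illustrated with a toy that simply counts client-to-cache calls. This is a sketch, not CompuCache's API: `ToyCache`, `call_sproc`, and `copy_if_positive` are hypothetical names, everything lives on one node, and keys are used for brevity even though CompuCache itself exposes byte ranges.

```python
class ToyCache:
    """In-memory stand-in for a remote cache that counts client round trips."""

    def __init__(self):
        self.data = {}
        self.round_trips = 0

    def get(self, key):
        self.round_trips += 1
        return self.data.get(key)

    def put(self, key, value):
        self.round_trips += 1
        self.data[key] = value

    def call_sproc(self, sproc, *args):
        """One round trip: the procedure runs next to the data."""
        self.round_trips += 1
        return sproc(self.data, *args)


def copy_if_positive(data, src, dst):
    """Hypothetical sproc: read src, then update dst based on what was read."""
    value = data.get(src, 0)
    if value > 0:
        data[dst] = value
    return value
```

With plain gets and puts, the client reads `src`, inspects it, and writes `dst`, paying two round trips. Wrapping the same logic in `copy_if_positive` and invoking it through `call_sproc` costs one.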

For the evaluation, the authors compared CompuCache with Redis, and it seems to outperform Redis by a large margin. This is interesting and unexpected, as there is nothing about using spot instances that should be beneficial for performance. Quite the opposite, the VM churn and reclamation should make CompuCache more likely to experience cache misses and spend more time migrating data and rebuilding caches. One explanation here is the poor performance of Redis sprocs implementation used in the evaluation, which seems to be at least two orders of magnitude slower than CompuCache. There is no word on what CompuCache uses for its sprocs to make it so much faster.


1) Application-driven design. CompuCache places a lot of burden on the application that needs the cache. It seems like the “memory management” done by the application can be rather heavy. Any data migration requires remapping of the virtual address space to nodes. Consequently, clients then need to redistribute this map so that nodes themselves can find the data they may need to run sprocs. This can make the system brittle due to the application’s mismanagement of the cache. 

2) API. An interesting thing to note is the API. CompuCache allocates memory to applications as some chunk of bytes. This chunk of bytes represents the virtual memory space that is mapped to multiple nodes/VMs. The API allows reading and writing at some byte offset and length. So, there is no KV abstraction common in other caches, and in fact, the application needs to implement a KV API if needed.

3) Sprocs vs. Serverless functions? One major point of discussion was comparison with prior work on serverless functions using harvested resources. The compute part of CompuCache looks a lot like a serverless function meant to operate on state directly from the cache. I’d suspect the sproc should also have the ability to find data in storage in case of a cache miss, but the paper leaves out any details on cache misses. Anyway, it seems that a sproc is a bit more restrictive than a general serverless function. So the question is whether you can run harvest VM serverless infrastructure capable of a full-fledged function runtime and use it instead of CompuCache sprocs. Of course, you lose that nice benefit of collocating data and compute, but this benefit is elusive at best when a cache is sharded into many VMs anyway. The problem here is the map of virtual space to nodes: the application knows it and maintains it, but a separate serverless compute does not know it, so it won’t have a clue what nodes to contact for data. So we are back to the point above: relying on the client application to manage the cache can be brittle and restrictive.

4) Sproc performance. The performance evaluation is on the weaker side of the paper, mainly because it leaves many things underspecified. Why is the Redis sproc so much slower? How was the comparison done given the difference in cache APIs? What was the implementation of CompuCache?

5) Failures. Finally, it is hard to discuss any distributed systems without talking about failures. CompuCache leaves a lot of details on handling failures out of the paper. It is not clear what sprocs will do in case of a cache miss. In fact, it is even unclear what a cache miss is in CompuCache, as it does not store objects and provides some memory space for the application instead. It seems like this is all up to the application — if some memory is unreachable, the application should know how to find that data in a reliable store. This, of course, leaves sprocs vulnerable to failures. A big part of the paper’s fault tolerance discussion is data migration due to Spot VM eviction. In the real world, however, servers may fail without notice. 

Reading Group. Using Lightweight Formal Methods to Validate a Key-Value Storage Node in Amazon S3

For the 90th reading group paper, we did “Using Lightweight Formal Methods to Validate a Key-Value Storage Node in Amazon S3” by James Bornholt, Rajeev Joshi, Vytautas Astrauskas, Brendan Cully, Bernhard Kragl, Seth Markle, Kyle Sauri, Drew Schleit, Grant Slatton, Serdar Tasiran, Jacob Van Geffen, Andrew Warfield. As usual, we have a video:

Andrey Satarin did an excellent write-up on this paper, so I will be short here. This paper discusses the use of formal methods in designing, developing, and maintaining a new key-value storage subsystem in Amazon S3. In my opinion, the core idea and contribution of the paper is establishing engineering processes that ensure that implementation adheres to the specification. 

See, using traditional formal methods is hard in practice. For example, using TLA+ to model the system creates a model that is detached from implementation. If something in the system changes, both need to be updated. Moreover, updating TLA+ would require experts in TLA+, creating a possibility of model and code diverging over time. Another problem is using the model to check the implementation. Since the two are detached from each other, this becomes rather difficult. 

The paper proposes lightweight formal methods that can be integrated directly into the testing/build framework. The approach requires implementing a reference model for each component that captures the semantics/behavior of the component without any real-world concerns. For example, the model for LSM-tree is a hashmap — both have the same behavior and allow reads and writes of some key-value pairs. 

Engineers use these reference models in a couple of ways. First of all, the models act as mocks for unit tests. This increases the utility of the models and forces the engineers to keep them up to date. But of course, the main purpose of the models is to confirm the behavior of the real implementations. The gist of the conformance checking is verifying that the implementation’s behavior is allowed by the model. To do so, the same test, consisting of some sequence of operations, runs against the implementation and the model. The expectation is that both go through the same states and deliver the same outputs. Below I have an image borrowed from the authors’ slides that illustrates the process.

Conformance checking. The same operations run against both the model and the implementation, expecting the same result in both. This can include running some background operations as well, such as garbage collection (GC), which has no impact on the model but may affect the implementation.
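Conformance checking of this kind can be sketched in a few lines. The example below is a toy under stated assumptions: `ToyLsm` is a made-up memtable-plus-runs store and not the S3 storage node, `ReferenceModel` is the hashmap-style model, and `check_conformance` is a hypothetical driver that replays one seeded random sequence of operations against both. The paper's real tooling is not reproduced here.

```python
import random


class ReferenceModel:
    """The model: a plain dict with the same observable behavior."""

    def __init__(self):
        self.data = {}

    def put(self, key, value):
        self.data[key] = value

    def get(self, key):
        return self.data.get(key)

    def compact(self):
        pass  # background work has no effect on the model


class ToyLsm:
    """Toy implementation: a memtable plus a list of flushed runs."""

    def __init__(self, memtable_limit=2):
        self.memtable = {}
        self.runs = []  # newest run last
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.runs.append(self.memtable)
            self.memtable = {}

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for run in reversed(self.runs):  # newest data wins
            if key in run:
                return run[key]
        return None

    def compact(self):
        merged = {}
        for run in self.runs:  # oldest first, so newer values overwrite
            merged.update(run)
        self.runs = [merged] if merged else []


def check_conformance(seed, steps=200):
    """Run one random operation sequence against both and compare outputs."""
    rng = random.Random(seed)
    model, impl = ReferenceModel(), ToyLsm()
    for _ in range(steps):
        op = rng.choice(["put", "get", "compact"])
        key = rng.choice("abcd")
        if op == "put":
            value = rng.randrange(100)
            model.put(key, value)
            impl.put(key, value)
        elif op == "get":
            assert impl.get(key) == model.get(key), (seed, key)
        else:
            model.compact()
            impl.compact()
    return True
```

Seeding the generator makes any failing sequence reproducible, and the `compact` operation plays the role of background work that must not change what reads return.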

Of course, there are some challenges with running these tests. For instance, it is important to generate interesting scenarios/sequences of operations to test various behaviors. Another challenge is introducing failures into testing. And finally, sequential execution of some operations does not provide comprehensive coverage in modern, multi-threaded, or distributed code, requiring some concurrent testing as well. The paper talks about these different scenarios in greater detail.


1) Reliance on experts in formal methods. A big point made by the paper is about not having to rely on formal methods experts to maintain the models and verification. The paper says that while initially all models were written by experts, at the time of writing the paper, about 18% of the models were written by non-experts. To us, this sounded like both a big and a small number. It is important to allow engineers to maintain the models and conformance checking framework, and the number clearly shows that the core engineers are getting onboard with the processes involved. At the same time, it is not clear whether a team can completely get rid of expert support.

2) Importance of processes. As we discussed the reliance on experts and reducing this reliance, it became clear that a big contribution of this paper is about the importance of engineering processes. And it is not just about having some processes/workflows to facilitate formal methods adoption. What is crucial is making these processes scale and thus not require significant additional effort from the engineers. For example, developing models to support formal methods is an extra effort. Using these models as mock components for unit tests amortizes such extra effort into almost no additional work. After all, we need to do unit testing anyway, and using mocks is a common practice.

3) TLA+? It is hard to discuss any formal methods paper in our reading group without having some discussion on TLA+. We have talked about the difficulty of keeping the models up to date with the implementation. Using TLA+ does not seem to allow for a low-effort mechanism: there is a big overhead to having engineering processes/practices that keep the TLA+ model and implementation coherent.

4) Testing Reading List. Our presenter, Andrey, has compiled an extensive reading list on testing distributed systems. It is most definitely worth checking out. 

5) Codebase size. The authors talk about using these lightweight formal methods on the codebase of about 40,000 lines of code. This is not that much code, to be honest. In fact, this is less code than a handful of my academic projects, not to mention the real software that I have worked on. So it would be interesting to see how these approaches can scale to bigger codebases and bigger teams with more people.

Reading Group. Basil: Breaking up BFT with ACID (transactions)

Our 89th paper in the reading group was “Basil: Breaking up BFT with ACID (transactions)” from SOSP’21 by Florian Suri-Payer, Matthew Burke, Zheng Wang, Yunhao Zhang, Lorenzo Alvisi, and Natacha Crooks. I will make this summary short. We had a quick and improvised presentation as well. Unfortunately, this time around, it was not recorded. 

The system presented in the paper, called Basil, is a sharded BFT transactional system. Basil is leaderless too, and overall reminds me of the Tapir/Meerkat line of work turned into the BFT setting. In these systems, clients do the bulk of the work, which definitely complicates things for BFT protocols: we no longer have “dumb” clients who can be byzantine and instead rely on smart clients who can screw stuff up quite substantially.

The paper lists a few common assumptions for their system, such as the inability of Byzantine clients to cause a denial of service attack and having some degree of fault tolerance f in each shard with a shard of 5f+1 nodes. Also important are the definitions of byzantine isolation levels. In short, the paper aims to provide serializability, but only for correct clients. So byzantine participants cannot cause a reordering of operations observed by the well-behaved clients.

As I mentioned above, the actual protocol is very similar to Tapir/Meerkat. The clients run interactive transactions by reading from the involved partitions and buffering the writes locally. Once ready to commit, the clients run a two-phase commit protocol. Just like the CFT counterparts, the transactions are timestamp-ordered by clients, so clients must have synchronized time. Since the clients pick the timestamps, a faulty client can choose one far in the future, causing other dependent transactions to abort/hang. To avoid this, Basil replicas have a window of time in which they accept transactions. Replicas reject any transaction started by the client with a timestamp that is too much off from the replica’s time. 
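The timestamp check can be pictured as a one-line predicate. This is only a sketch: the function name, the `window` parameter, and the symmetric treatment of past and future timestamps are assumptions made for illustration, and the paper should be consulted for the exact acceptance rule.

```python
def accepts_timestamp(txn_timestamp, replica_clock, window):
    """Return True if the client-chosen timestamp is close enough to local time.

    `window` is a hypothetical tolerance in the same unit as the clocks, and the
    check is assumed to be symmetric around the replica's clock.
    """
    return abs(txn_timestamp - replica_clock) <= window


def filter_transactions(transactions, replica_clock, window):
    """Keep only (txn_id, timestamp) pairs the replica would accept."""
    return [
        (txn_id, ts)
        for txn_id, ts in transactions
        if accepts_timestamp(ts, replica_clock, window)
    ]
```

A client that picks a timestamp far in the future is simply rejected by correct replicas, so it cannot make other transactions depend on a transaction that will not complete for a long time.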

Another BFT issue that arises with running a two-phase commit is preparing a transaction with too many keys and not committing it. Similar to the timestamp problem, this can block any other dependent transactions. To mitigate the issue, the Basil protocol allows other clients to take over the stalled transactions. Such a takeover also solves client crash failure.

The version of the two-phase commit is where most of the BFT magic is hiding, as this is not your vanilla 2PC. The prepare-phase consists of 2 stages: ST1 and ST2. In ST1, the protocol collects the votes to commit or abort from each shard, and in ST2, it makes that vote durable. The latter is needed if a new client coordinator needs to take over and reaches a different vote conclusion due to byzantine actors.

Again in the spirit of Tapir/Meerkat, the ST2 is optional if ST1 has completed with a fast unanimous quorum of 5f+1 nodes. The paper contains many interesting details about the stages of the prepare phase. One curious part is that ST2 logs the voting results from all shards in just one shard. The aborts also have a fast and slow path, and the quorums are smaller for abort decisions than for commit.
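The commit side of this decision can be sketched as follows. The sketch uses only the two commit thresholds that appear in this summary (a unanimous 5f+1 fast quorum and a 3f+1 commit quorum). Treating every other outcome as an abort is a deliberate simplification, the abort quorums are not modeled, and the function names are hypothetical.

```python
def classify_shard(commit_votes, f):
    """Classify one shard's ST1 commit votes out of n = 5f + 1 replicas."""
    n = 5 * f + 1
    if commit_votes == n:
        return "commit-fast"  # unanimous: no ST2 needed for this shard
    if commit_votes >= 3 * f + 1:
        return "commit-slow"  # commit quorum reached, but not unanimous
    return "abort"  # simplification: anything else counts as abort


def prepare_outcome(shard_votes, f):
    """shard_votes maps shard name -> number of commit votes seen in ST1.

    Returns (decision, needs_st2). needs_st2 is None for aborts because the
    abort fast and slow paths are not modeled in this sketch.
    """
    outcomes = [classify_shard(votes, f) for votes in shard_votes.values()]
    if "abort" in outcomes:
        return "abort", None
    return "commit", "commit-slow" in outcomes
```

With f = 1, two shards that each return six commit votes commit on the fast path, while a single shard returning only four or five votes forces the extra ST2 stage.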

The recovery protocol that allows other clients to take over can succeed by simply learning the decision from ST2 through the quorum of replicas in a shard that stored the vote. It is unclear what happens in the fast-path prepare that does not run ST2. However, if the votes for ST2 are divergent (which may happen due to a faulty behavior or multiple concurrent repairs), Basil falls back to a leader-based view-change protocol. And, of course, it is a bit more complicated to make it BFT. 

On the evaluation side, Basil outperforms transactional BFT competitors but remains slower than the CFT counterparts. I also want to point out a low level of fault tolerance — out of six nodes in the cluster, only one node can be byzantine. 


1) Low fault-tolerance. Requiring six nodes to tolerate one failure (5f+1 configuration) is a rather low fault-tolerance threshold. To double the fault tolerance, we need 11 replicas! For comparison, the HotStuff protocol, which the authors use as a baseline, needs a cluster of size 3f+1. Requiring more nodes also raises some questions about efficiency: while the performance is good, the protocol also needs more resources to achieve it.

2) More fault-tolerance? In a few places in the paper, it is mentioned that up to f votes can be missed due to asynchrony, and another f due to byzantine behavior: “A unanimous vote ensures that, since correct replicas never change their vote, any client C′ that were to step in for C would be guaranteed to observe at least a Commit Quorum of 3f + 1 Commit votes; C′ may miss at most f votes because of asynchrony, and at most f more may come from equivocating Byzantine replicas.” This suggests that the practical fault tolerance may be better than just f.

3) Unanimous fast quorum. The unanimous fast quorum is another potential problem for performance when things are not going well. A single faulty replica will throw the protocol off the fast-path prepares, requiring more resources to prepare each transaction. Not to mention, waiting for a timeout on a faulty replica does not improve the latency.

4) Questions about recovery. We had some questions about the recovery procedure. It seems like the first step is to try to recover by reading the recorded prepare vote, and if everything is good, simply finish the commit for the transaction. However, it appears that durably recording votes in one place is an optional stage: “If some shards are in the slow set, however, C needs to take an additional step to make its tentative 2PC decision durable in a second phase (ST2).” As a result, under normal conditions, there may not be votes from ST2 to recover from one shard/partition. Does the recovering client then need to contact all partitions of a transaction? 
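The arithmetic behind points 1 and 2 is easy to check with a short script. The helper names are made up for this illustration. The only inputs are the 5f+1 and 3f+1 cluster sizes and the "miss f to asynchrony, f more to equivocation" argument from the quote above.

```python
def cluster_size(f, multiplier):
    """Replicas needed to tolerate f Byzantine faults with multiplier * f + 1 nodes."""
    return multiplier * f + 1


def votes_visible_to_recovery(f):
    """Commit votes a recovering client still sees after a unanimous 5f + 1 vote.

    It may miss f votes to asynchrony and f more to equivocating replicas.
    """
    return cluster_size(f, 5) - f - f


for f in (1, 2):
    print(f, cluster_size(f, 5), cluster_size(f, 3), votes_visible_to_recovery(f))
# prints:
# 1 6 4 4
# 2 11 7 7
```

The last column always equals 3f+1, which is exactly the commit quorum the quote says a recovering client is guaranteed to observe.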

Reading Group. EPaxos Revisited.

In the 88th DistSys meeting, we discussed the “EPaxos Revisited” NSDI’21 paper by Sarah Tollman, Seo Jin Park, and John Ousterhout. We had an improvised presentation, as I had to step in and talk about the paper on short notice without much preparation. 

EPaxos Background

Today’s paper re-evaluates EPaxos, a state machine replication (SMR) protocol that is very popular in academia. EPaxos solves a generalized consensus problem and avoids ordering all state machine operations. Instead of a single total order of everything, EPaxos relies on a partial order of dependent operations. For instance, if two commands are updating the same object, then these commands may depend on each other and must be ordered with respect to each other. These dependent operations can happen concurrently, in which case they are conflicting. Not all operations depend on each other or have conflicts. For instance, if operations are commutative, and let’s say, work on different objects, requiring an order between them is overkill. Avoiding ordering operations that do not care about the order can enable some parallelism.
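A dependency check of this sort can be written as a tiny predicate. The sketch assumes a key-value state machine with only reads and writes, and it uses the common rule that two commands need a relative order when they touch the same key and at least one of them writes. The `Command` type and the function name are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Command:
    op: str  # "read" or "write"
    key: str


def interferes(a, b):
    """True if a and b touch the same key and at least one of them writes."""
    return a.key == b.key and "write" in (a.op, b.op)


def dependencies(new_command, seen_commands):
    """Commands already seen by a replica that the new command must be ordered against."""
    return [c for c in seen_commands if interferes(new_command, c)]
```

Two writes to different keys, or two reads of the same key, produce no dependency, so replicas are free to apply them in any order.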

In practice, things are a bit more complicated. The first problem is figuring out if operations depend on each other and conflict. Of course, one can funnel all commands/operations through a single node, like Raft or Multi-Paxos, and have that node figure out which operations conflict and which do not. However, our goal was to improve parallelism, and with a single node to handle all commands, we are not achieving it, at least not on the scale possible in distributed systems. So, we need different nodes to start operations independently and learn if these operations may conflict with each other. EPaxos does this with its leaderless approach.

Every node in EPaxos can receive a request from a client and replicate it in the cluster, learning of dependencies and possible conflicts in the process. We can call the node driving replication of a command a coordinator. If a quorum of nodes agrees on dependencies for the command, the protocol commits in one round trip. Such single RTT replication is called a fast path, and in EPaxos it requires a fast quorum that can be bigger than the traditional majority quorum in some deployments. However, if nodes disagree about dependencies, there is a conflict, and the protocol may take a slow path. The slow path essentially uses Paxos to replicate and commit the operation with a “merged” list of conflicting dependencies to other nodes. It ensures that conflicts that may have been observed by only a minority of nodes are recorded by a quorum for fault tolerance.
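To make the fast/slow split concrete, here is a minimal sketch of the simplest version of the rule above: everyone in the quorum (coordinator included) must report the same dependencies, otherwise the coordinator falls back to the slow path with the merged list. The function name and the data shapes are my own, not from the paper, and the sketch ignores the recovery-related subtleties of the optimized protocol.

```python
def decide_path(replies):
    """replies: dependency sets reported by the quorum members
    (coordinator included). Returns (path, dependencies_to_commit)."""
    # Merge everything any quorum member has observed.
    merged = frozenset().union(*replies)
    # Fast path only if every member already reported the full merged set.
    if all(frozenset(r) == merged for r in replies):
        return "fast", merged
    return "slow", merged

# Nobody knows of a dependency: fast path.
print(decide_path([set(), set(), set()]))
# One replica saw an earlier write to x: mismatch, slow path with merged deps.
print(decide_path([set(), {"x=5"}, set()]))
```

Either way, the command commits with the merged dependency set; the only difference is how many round trips it takes to get there.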

Let’s look at this a bit deeper. The two commands ‘C1’ and ‘C2’ conflict when some nodes have received them in a different order. One node may say ‘C1’ comes before ‘C2,’ and the other node orders ‘C2’ before ‘C1.’ As such, we express conflicts by a mismatch of dependencies at different nodes. In this example, the mismatch is ‘C1’ depends on ‘C2’ vs. ‘C2’ depends on ‘C1.’ EPaxos needs to ensure that enough nodes know about all the dependencies, so a slow path is taken to commit the command with all the learned dependencies. It is important to understand that not all dependencies are conflicts. For example, if operation ‘C4’ happened so long after operation ‘C3’ that all nodes (or really a quorum of nodes) see that ‘C3’ happened before ‘C4’ in their dependencies, then this is not a conflict, and we can proceed with the fast path.

Now let’s take a look at a few examples. The figure above, which I borrowed from the revisited paper, has three distinct operation examples. In example-1, node ‘A’ is the coordinator for the ‘x=5’ operation, and it knows of no dependencies for the object. It reaches out to a quorum, and the followers also know of no dependencies. Coordinator ‘A’ concludes the operation in the fast path. The same would have happened if the dependencies known to ‘A’ matched the followers: since this is a no-conflict situation, ‘A’ can commit in the fast path. In example-2, node ‘E’ knows of no dependencies for the ‘x=9’ command, but one node in the quorum has recorded a dependency in the form of another operation on object ‘x’. This dependency mismatch between replicas is a conflict, and ‘E’ takes the slow path to record the combined observation. Note that initially, only replicas ‘A-C’ had the dependency, but after the Accept message by ‘E’, both ‘D’ and ‘E’ also have it. Performing the slow path ensures that at least a quorum of nodes knows the dependency. Finally, example-3 has node ‘A’ as the coordinator again. It knows a previous command as a dependency, but the followers know additional dependencies. Even though there seems to be a conflict, all the followers agree on dependencies. Here, the protocol can sometimes proceed in the fast path. The exact conditions that determine whether node ‘A’ can take that fast path depend on how ‘A’ communicates with the rest of the replicas. I will discuss this later.

There are a few caveats to the process. First of all, when the coordinator learns of dependencies, it must learn the whole chain of dependencies to make sure the new command is properly ordered and executed at the proper time. So, over time the dependency lists get larger and larger. At some point, it will make no sense to remember old commands that were executed a long time ago on all nodes as conflicts, so the conflict and dependency tracking system must have a way to prune the no longer needed stuff. Another issue is the need to wait for all dependencies to execute before executing the new operation. As a result, the acts of committing an operation and applying it to the state machine can be quite far apart in time. 

One detail missing in my simplified explanation so far is the need to construct dependency graphs for the execution of operations. See, the fast path and slow path modes are all about committing the operations and their dependencies. We have not really ordered the operations yet, and the dependency graph will help us establish the actual execution order. Each operation is a vertex in a graph, and dependencies between operations are the edges. So to execute an operation, we first add it to the graph. Then we add the dependent operations and connect the operations and their dependents. Then for each new vertex, we also look at its dependents and add them. The process continues until there is nothing more to add to the graph. In the resultant dependency graph, concurrent operations may appear to depend on each other, creating loops or cycles. The execution order is then determined by finding all strongly connected components (SCCs), which represent individual operations, conflict loops, or even groups of loops, sorting these SCCs in inverse topological order, and executing SCCs in that order. When executing each strongly connected component, we deterministically establish an operation order for commands in the SCC based on a sequence number and execute all non-executed operations in the increasing sequence number. A sequence number is maintained to break ties and order operations in the same SCC. The paper has more details about the sequence number. As you can imagine, dealing with the graph is quite expensive, so it is even more important to prune all old dependencies out. 
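The execution procedure above can be sketched in a few lines. This is my own illustration, not code from either paper: it uses Tarjan's algorithm, which happens to emit each strongly connected component only after the components it depends on, and then orders the operations inside a component by sequence number. The tie-break on the operation name is an assumption for the sake of determinism.

```python
def execution_order(deps, seq):
    """deps: op -> set of ops it depends on; seq: op -> sequence number.
    Returns the ops in an order that executes dependencies first."""
    index, low, on_stack, stack, order = {}, {}, set(), [], []

    def visit(v):
        index[v] = low[v] = len(index)
        stack.append(v)
        on_stack.add(v)
        for w in deps.get(v, ()):
            if w not in index:
                visit(w)
                low[v] = min(low[v], low[w])
            elif w in on_stack:
                low[v] = min(low[v], index[w])
        if low[v] == index[v]:  # v is the root of an SCC
            scc = []
            while True:
                w = stack.pop()
                on_stack.discard(w)
                scc.append(w)
                if w == v:
                    break
            # Inside an SCC, order by sequence number (name breaks ties).
            order.extend(sorted(scc, key=lambda op: (seq[op], op)))

    for v in sorted(deps):
        if v not in index:
            visit(v)
    return order

# C1 and C2 are concurrent and depend on each other; C3 depends on C1.
deps = {"C0": set(), "C1": {"C2"}, "C2": {"C1"}, "C3": {"C1"}}
seq = {"C0": 0, "C1": 2, "C2": 1, "C3": 3}
print(execution_order(deps, seq))
```

The recursion is fine for a toy graph, but long dependency chains would need an iterative version, which is one more reason to prune old dependencies.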

Finally, one of the biggest caveats to this fast path leaderless process is typical in distributed systems. Of course, I am talking about handling failures. In particular, the protocol needs to ensure that failures do not erase the fast-path decision. It is tricky due to concurrency, as two or more coordinators may run conflicting operations. Having such concurrency is like attempting to place two different commands in the same execution slot or position. If we use a majority decision to see which command was successful in that particular position, then we risk creating an ambiguous situation even when just one node fails. For instance, if a command ‘C1’ has three votes, and ‘C2’ has two in a cluster of five nodes, then a failure of one ‘C1’ vote creates ambiguity, as both ‘C1’ and ‘C2’ have the same number of remaining votes. The Fast Paxos paper describes the problem well, and the solution is to use a bigger fast quorum to avoid such ambiguity.

EPaxos uses such a larger fast quorum. However, the original paper proposes some optimizations to allow a majority fast quorum in configurations of three and five nodes (technically, it is an \(f+\lfloor\frac{f+1}{2}\rfloor\) fast quorum). Of course, these optimizations do not come entirely for free, as the need to solve the ambiguity problem remains. The optimized recovery needs only \(\lfloor\frac{f+1}{2}\rfloor\) responses and some additional information about the dependencies available at the nodes. The whole set of optimizations is hard to summarize, so I refer interested readers to the paper, Section 4.4.
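A quick way to see why the optimized fast quorum is "just a majority" only for three and five nodes is to plug numbers into the formula. The helper below is only arithmetic on the expression above, assuming the usual cluster size of 2f+1.

```python
def quorum_sizes(f):
    """Return (cluster size, majority quorum, optimized fast quorum)."""
    n = 2 * f + 1
    majority = f + 1
    optimized_fast = f + (f + 1) // 2
    return n, majority, optimized_fast

for f in (1, 2, 3):
    print(quorum_sizes(f))
# (3, 2, 2)
# (5, 3, 3)
# (7, 4, 5)
```

At seven nodes the fast quorum is already one node larger than a majority, and the gap keeps growing with f.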

Now let me return to the example-3 fast path again. Previously I said that the quorum needs to agree on dependencies, but also showed example-3. In that example, the coordinator disagrees with followers. However, the coordinator still takes a fast path as long as all the followers in the quorum agree. Unfortunately, this is not the case in optimized EPaxos, at least not always. In fact, the paper says that the followers must have the same dependencies as the coordinator for the fast path if the coordinator communicates with all nodes and not just an exact quorum of nodes. Moreover, the followers must record having matching dependencies with the coordinator. So, example-3 works when the coordinator only contacts enough nodes to form a quorum. Intuitively, succeeding in this configuration means that not only did enough nodes match the dependencies, but all other nodes did not even learn about the operation from the coordinator. Upon recovery, this restricted communication gives additional information, as the recovery protocol can know what nodes were part of the initial fast quorum. In practice, however, this creates problems: it is harder to get a fast quorum to agree, as the coordinator needs to be lucky to pick exactly the nodes that agree on the dependencies. Moreover, just one node failure results in a failed fast quorum and forces a retry, further contributing to tail latency.

As such, practical implementations broadcast messages to all nodes and expect the fastest to reply to form the quorum. In the broadcast-to-all communication, example-3 in the figure will not work. 

EPaxos Revisited

The revisited paper brings a few important contributions to EPaxos. First, it conducts a re-evaluation of the protocol and makes an excellent comparison to the Multi-Paxos baseline. I really like the re-evaluation part of this paper! 

Unlike the original EPaxos paper, the revisited paper reports performance for a much bigger range of possible conflicts. The paper uses a more realistic read/write workload with different read-to-write ratios. The authors also change the conflict workload to a more realistic scenario. In the original paper, conflicts were simulated by having concurrent commands access the same predefined key (a “hot spot” method). The revisited paper no longer focuses on just one key and, instead, simulates conflicts by using a Zipfian distribution to select keys. Finally, the reporting of results is way more visual. The figures capture the performance differences between EPaxos and MultiPaxos in every region and tested scenario. I was surprised to see very bad EPaxos tail latency, even given the expectations of it having more performance variability. Below is the evaluation figure for latency in a five-region deployment. It may appear confusing at first, but just reading the caption and staring at it for a minute was all I needed to understand it.
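For readers who have not generated such a workload before, here is a rough sketch of Zipfian key selection. The key count, the skew parameter `s`, and the seed are placeholders I picked for illustration; they are not the values used in the paper.

```python
import random
from collections import Counter

def zipf_weights(num_keys, s=1.0):
    """Weight of the key with rank r (1-based) is proportional to 1 / r**s."""
    return [1.0 / (rank ** s) for rank in range(1, num_keys + 1)]

def pick_keys(num_keys, count, s=1.0, seed=0):
    """Draw `count` keys (0-based ids) from a Zipfian distribution."""
    rng = random.Random(seed)
    return rng.choices(range(num_keys), weights=zipf_weights(num_keys, s), k=count)

keys = pick_keys(num_keys=1000, count=10_000, s=1.0, seed=42)
# A handful of keys absorb a large share of the accesses, but no single
# key is the only source of conflicts, unlike the hot-spot method.
print(Counter(keys).most_common(3))
```

Compared to a single hot key, conflicts here are spread over many popular keys, which is closer to what a real skewed workload looks like.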

Another important part of the re-evaluation is addressing the commit vs. execute latency problem. As mentioned in the summary above, execution can be delayed significantly after the commitment. Yet, the original EPaxos paper measured only commit latency. The figure below shows the difference.

The second contribution is a bit shaky. EPaxos performance degrades as the conflict rate rises. Naturally, finding a way to reduce conflicts can help avoid the expensive slow path, which, in turn, will improve both latency and throughput. The paper proposes a technique they call Timestamp-Ordered Queuing (TOQ) to reduce conflicts. Recall that the EPaxos coordinator sends its dependencies to the followers, and the followers reply with their updated dependencies. Whether the algorithm takes a fast path depends on a few conditions. The basic rule is that all followers in the quorum must have the same dependencies. TOQ exploits this and adds some delays at each follower to increase the chance of followers having the same conflicts/dependencies. For concurrent conflicting operations, quorum replicas may receive these operations in a different order. These out-of-order messages require a slow path to resolve. But if followers wait a bit to receive these concurrent messages and order them deterministically, then we can have the same dependencies/conflicts at follower replicas in the quorum.
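The intuition can be shown with a toy example. The sketch below assumes each command already carries a timestamp and that a follower has waited long enough to have both concurrent commands in its buffer; how long to wait is exactly the part the paper works out, and it is not modeled here. All names are mine.

```python
def deps_in_processing_order(commands):
    """Each command depends on every conflicting command processed before it."""
    seen, deps = [], {}
    for cmd, _timestamp in commands:
        deps[cmd] = set(seen)
        seen.append(cmd)
    return deps

def deps_with_queuing(arrivals):
    """Buffer the arrivals, then process them in timestamp order."""
    ordered = sorted(arrivals, key=lambda a: (a[1], a[0]))
    return deps_in_processing_order(ordered)

# Two followers receive the same concurrent commands in opposite orders.
follower_1 = [("C1", 10), ("C2", 11)]
follower_2 = [("C2", 11), ("C1", 10)]

# Processing in arrival order produces mismatched dependencies...
print(deps_in_processing_order(follower_1), deps_in_processing_order(follower_2))
# ...while timestamp ordering makes both followers report the same thing.
print(deps_with_queuing(follower_1), deps_with_queuing(follower_2))
```

Note that this only aligns the followers with each other; it says nothing about what the coordinator itself recorded.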

This waiting does not fix the conflicts the coordinator communicated to the followers. And this is where my problem with the revisited paper lies. The TOQ optimization relies on the coordinator’s ability to communicate a different list of dependencies than the followers and proceed with the fast path, as in example-3. However, this only happens when we use unoptimized EPaxos with a larger fast quorum or make the coordinator use “thrifty” mode by contacting only a quorum of nodes instead of all nodes. In both cases, we lose something: larger quorums increase latency and decrease the number of failures a system can sustain before it is required to take a slow path all the time. And the thrifty optimization can increase tail latency substantially, as a single failed node may prevent the coordinator from having the quorum of votes, requiring a timeout and retry. Considering that delaying the followers’ reply already adds extra latency, the TOQ optimization becomes a bit questionable in practice.

The paper studies the added effect of delaying the followers on latency and proposes some compromise solutions to reduce the delay at the expense of having some conflicts. On the evaluation side, the paper says it has not enabled thrifty optimization, so for TOQ to be correct, it must have used a larger fast quorum. However, the paper does not mention this, so it is impossible to know whether the authors performed the TOQ evaluation on the correct system configuration.


1) Correctness. In my presentation, I focused a lot on the correctness of example-3, where the coordinator communicates a different dependency list than the followers have in an optimized EPaxos fast quorum. I said that this is incorrect, according to the original paper. The lack of preparation for the meeting showed: I was wrong.

The correctness, in this case, is not necessarily violated, but this largely depends on the “version” and configuration of EPaxos. According to the paper, a thrifty mode, where the coordinator only contacts the required number of nodes to reach quorum, will work as described in the figure, so there are no safety issues. However, without the thrifty mode, the safety breaks. It breaks because we are running a smaller fast quorum (optimized EPaxos) but do not record enough information to resolve the ambiguity over whether the fast path was taken, which may arise with failures. The non-thrifty, optimized EPaxos is the configuration in which we need followers to agree with the dependencies of the coordinator and record this. In the discussion, we looked at some examples of safety violations due to this problem.

Interestingly enough, I am not sure that the authors of the revisited paper fully understood the implications of thrifty/non-thrifty and other optimizations on safety. In the EPaxos overview figure, the paper explicitly states: “for simplicity, we only show messages to the necessary quorum,” suggesting that the authors do not assume thrifty mode here, making example-3 incorrect.

So, where did this example come from? The authors borrow a figure from the original EPaxos presentation. In that presentation, Iulian Moraru explicitly stated that the coordinator talks to the majority of replicas only (3 out of 5).

2) TOQ Benefits. In the light of the above, TOQ benefits become a bit more questionable because we are not sure whether the experiments ran on the correct and safe configuration of EPaxos. At the same time, it is worth mentioning that the Tempo paper, which we will discuss later this year, uses a similar approach to TOQ. We also discussed the Accord protocol with a similar technique, so the TOQ is not entirely without benefits and seems appreciated in other literature. 

3) Bounding Dependency Chains. This is an important improvement to EPaxos introduced in this paper. It prevents dependency chains from growing indefinitely under certain conditions. Such uncontrolled growth can prevent execution, stalling the protocol. We spent quite a bit of time trying to figure out exactly how the proposed solution works, and we think the paper could have been clearer here.

4) Appendix. The appendix has lots of additional performance data that is worth checking. 

New Reading List: Papers #91-100

DistSys Reading Group is getting its 100th paper! Here is the list for the Spring semester:

Reading Group. Log-structured Protocols in Delos

For the 87th DistSys paper, we looked at “Log-structured Protocols in Delos” by Mahesh Balakrishnan, Chen Shen, Ahmed Jafri, Suyog Mapara, David Geraghty, Jason Flinn, Vidhya Venkat, Ivailo Nedelchev, Santosh Ghosh, Mihir Dharamshi, Jingming Liu, Filip Gruszczynski, Jun Li, Rounak Tibrewal, Ali Zaveri, Rajeev Nagar, Ahmed Yossef, Francois Richard, Yee Jiun Song. The paper appeared at SOSP’21. This is the second Delos paper; the first one was “Virtual Consensus in Delos,” and we covered it in the 40th DistSys meeting.

The first Delos paper looked at building the replication/consensus layer to support strongly consistent use-cases on top. The system created a virtual log made from many log pieces or Loglets. Each Loglet is independent of other Loglets and may have a different configuration or even a different implementation. Needless to say, all these differences between Loglets are transparent to the applications relying on the virtual log. 

With the replication/consensus covered by virtual log and Loglets, this new paper focuses on creating a modular architecture on top of the virtual log to support different replicated applications with different user requirements. In my mind, the idea of the Log-structured protocol expressed in the paper is all about using this universal log in the most modular and clean way possible. One can build a large application interacting with the log (reading and writing) without putting much thought into the design and code reusability. After all, we have been building ad-hoc log-based systems all the time! At Facebook scale, things are different: it is not ideal for every team/product to reinvent the wheel. Instead, having a smarter reusable architecture can go a long way in saving time and money to build better systems.

Anyway, imagine a system that largely communicates through the shared log. Such a system can create new log items or read the existing ones. In a sense, each log item is like a message delivered from one place to one or several other locations. With message transmission handled by the virtual log, the Delos application simply needs to handle encoding and decoding these “log-item-messages.”

Fortunately, we already have a good idea about encoding and decoding messages while providing services along the encoding/decoding path. Of course, I am thinking of common network stacks, such as TCP, or even more broadly, the OSI model. Delos operates in a very similar way, but also with a great focus on the reusability and composability of layers. When a client needs to write some data to the log, it can form its client-specific message and pass it down the stack. Lower layers can provide additional services, such as ensuring session guarantees or batching the messages. Each layer of the stack wraps the message it received with its own headers and information needed for decoding on the consumer side. Delos calls each layer in the stack an engine. The propagation down through layers continues until the message hits the lowest layer in the stack, the base engine. The job of the base engine is to interact with the log system.

Similarly, when a system reads the message from the log, the log item travels up the stack through all of the same layers/engines, with each engine decoding it and ensuring the properties specific to that engine before passing it up. An imperfect real-world analogy for this process is sending paper mail. First, you write the letter; this is the highest layer/engine close to the application. Then you put it in the envelope, and now the letter is protected from others seeing it. Then goes the stamp, and it is ready to be sent. Then goes the mailbox (client batching), then the post office (server-side batching), then transmission, and then the process starts to get undone from the bottom up.
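The wrap-on-the-way-down, unwrap-on-the-way-up idea is easy to sketch. To be clear, this is not the Delos API: the class names, the `propose`/`read` methods, and the in-memory list standing in for the shared log are all made up for illustration.

```python
class BaseEngine:
    """Lowest layer: talks to the log (here, just an in-memory list)."""
    def __init__(self):
        self.log = []

    def propose(self, entry):
        self.log.append(entry)
        return len(self.log) - 1  # position of the entry in the log

    def read(self, pos):
        return self.log[pos]


class HeaderEngine:
    """A hypothetical middle layer that wraps entries with its own header."""
    def __init__(self, name, below):
        self.name, self.below = name, below

    def propose(self, entry):
        # Going down: add this layer's header and hand off to the layer below.
        return self.below.propose({"layer": self.name, "payload": entry})

    def read(self, pos):
        # Going up: check and strip this layer's header before passing it on.
        wrapped = self.below.read(pos)
        if wrapped["layer"] != self.name:
            raise ValueError("entry was not written through this layer")
        return wrapped["payload"]


base = BaseEngine()
stack = HeaderEngine("session", HeaderEngine("batching", base))
pos = stack.propose({"op": "put", "key": "k", "value": 1})
print(base.log[pos])    # the entry as stored, wrapped by both layers
print(stack.read(pos))  # the original application message
```

Since every layer exposes the same two calls, reordering or dropping a layer is just a matter of how the stack is constructed, which is the composability the paper is after.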

Of course, I oversimplified things a bit here, but such message encapsulation is a pretty straightforward abstraction to use. Delos uses it to implement custom Replicated State Machines (RSMs) with different functionality. Building these RSMs requires a bit more functionality than just pushing messages up and down the engines. Luckily, Delos provides a more extensive API with required features. For example, all engines have access to the shared local storage. Also, moving items up or down the stack is not done in a fire-and-forget manner, as responses can flow between engines to know when the call/request gets completed. Furthermore, it is possible to have more than one engine sitting at the same layer. For instance, one engine can be responsible for serializing the data and pushing it down, and another engine can receive the item from an engine below, deserialize and apply it to RSM. The figure illustrates these capabilities.  

This tiered modular approach allows Delos to reuse layers across applications or even compose some layers in different order. So when one application or use-case needs batching, that engineering team does not need to reinvent the batching from scratch. Instead, they can take an existing batching layer and add it to their stack in the appropriate location. The flexibility and reusability allowed Facebook engineers to implement two different control-plane databases with Delos. One datastore, called DelosTable, uses a table API, while another system, called Zelos, implements a ZooKeeper compatible service. 

I think I will stop my summary here. The paper goes into more detail about the history of the project and the rationale for making certain decisions. It also describes many practical aspects of using Delos. The main lesson I learned from this paper is about the overall modular layered design of large RSM-based systems. I think we all know the benefits but may often step aside from following a clean, modular design as the projects get bigger and pressure builds up to deliver faster. But then, what do I know about production-grade systems in academia? Nevertheless, I’d love to see a follow-up paper when more systems are built using Delos. 

As usual, we had our presentation. This time Micah Lerner delivered a concise but very nice explanation of the paper:


1) Architecture. This paper presents a clean and modular architecture. I do not think there is anything majorly new & novel here, so I view this paper more like an experience report on the benefits of good design at a large company. I think there is quite a bit of educational value in this paper. 

In the group, we also discussed the possibility of applying similar modular approaches to more traditional systems. For instance, we looked at MongoDB Raft in the group before. Nothing should preclude a similar design based on a Raft-provided log in a distributed database. In fact, similar benefits can be achieved: multiple client APIs, optional and configurable batching functionality, etc. That being said, for a system designed with one purpose, it is easy to start designing layers/modules that are more coupled/interconnected and dependent on each other.

Another observation in the group mentioned a somewhat similarly designed internal application from a while back, but again with a less clear separation between modules/layers.

2) Performance. A performance impact is a natural question to wonder about in such a layered/modular system. The paper spends a bit of time in the evaluation explaining and showing how layers and propagation between them add very little overhead. What is not clear is whether a less generic, more purpose-built solution could have been faster. This is a tough question to answer, as comparing different architectures is not trivial: sometimes it can be hard to tell whether the difference comes from design choices or implementation differences and nuances.

3) Read cost & Virtual Log. This part of the discussion goes back quite a bit to the first Delos paper. With a native Loglet, Delos assumes a quorum-based operation for Loglets, which may have less than ideal read performance. This is because NativeLoglet uses a sequencer to write, but requires quorum reads and waiting for reads that go through the checkTail operation. So a client will read from the quorum, and assuming the Loglet is not sealed (i.e., closed for writes), the client must wait for its knowledge of the globalTail (i.e., globally committed slot) to catch up with the highest locally committed slot it observed earlier. This process is similar to a PQR read! Naturally, it may have higher read latency, which will largely depend on how fast the client’s knowledge of the globally committed slot catches up. In the PQR paper, we also describe a few optimizations to cut down on latency, and I wonder if they can apply here.

Moreover, a client does not need to perform an expensive operation for all reads. If a client is reading something in the past known to exist, it can use a cheaper readNext API, practically allowing a local read from its collocated LogServer.

4) Engineering costs. This discussion stemmed from the performance discussion. While large companies care a lot about performance and efficiency (even a fraction of % of CPU usage means a lot of money!), the engineering costs matter a lot too. Not having to redo the same things for different products in different teams can translate into a lot of engineering savings! Not to mention this can allow engineers to focus on new features instead of re-writing the same things all over again. Another point is maintenance — cleaner and better-designed systems will likely be cheaper to maintain as well. 

5) Non-control plane applications? The paper talks about using Delos in two control-plane applications. These often have some more specific and unique requirements, such as zero external dependencies, and stronger consistency. The paper also mentions other possible control plane use cases, so it does not appear like Delos is done here. 

At the same time, we were wondering if/how/when Delos can be used outside of the control plane. For Facebook, there may not be too much need for strongly consistent replication for many of their user-facing apps. In fact, it seems like read-your-write consistency is enough for Facebook, so deploying Delos may not be needed. At the same time, user-facing apps can take on more external dependencies, achieving some code reuse this way.

Another point made during the discussion is about making a more general and flexible replication framework that can support strongly-consistent cases and higher-throughput weaker consistency applications. We would not be surprised if Delos or its successors one day support at least some stronger-consistency user-facing applications.

Reading Group. Rabia: Simplifying State-Machine Replication Through Randomization

We covered yet another state machine replication (SMR) paper in our reading group: “Rabia: Simplifying State-Machine Replication Through Randomization” by Haochen Pan, Jesse Tuglu, Neo Zhou, Tianshu Wang, Yicheng Shen, Xiong Zheng, Joseph Tassarotti, Lewis Tseng, Roberto Palmieri. This paper appeared at SOSP’21.

A traditional SMR approach, based on Raft or Multi-Paxos protocols, involves a stable leader to order operations and drive the replication to the remaining nodes in the cluster. Rabia is very different, as it uses a clever combination of determinism to independently order requests at all nodes and a binary consensus protocol to check whether replicas agree on the next request to commit in the system.

Rabia assumes a standard crash fault tolerance (CFT) model, with up to f node failures in a 2f+1 cluster. Each node maintains a log of requests, and the requests execute in the log’s order. The log may contain a NO-OP instead of a request.

When a client sends a request to some node, the node will first retransmit this request to other nodes in the cluster. Upon receiving the request, a node puts it in a min priority queue of pending requests. Rabia uses this priority queue (PQ) to independently and deterministically order pending requests at each node, such that the oldest request is at the head of the queue. The idea is that if all nodes have received the same set of requests, they will have identical PQs. 
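Here is a small sketch of why the priority queue gives every node the same head, no matter in which order the requests arrive. I am assuming requests are ordered by a timestamp with the client id as a tie-breaker; the exact ordering key is a detail I made up for the example.

```python
import heapq

class PendingQueue:
    """Min priority queue of pending requests, oldest request first."""
    def __init__(self):
        self._heap = []

    def add(self, timestamp, client_id, request):
        heapq.heappush(self._heap, (timestamp, client_id, request))

    def head(self):
        return self._heap[0] if self._heap else None

requests = [(5, "c2", "put y"), (3, "c1", "put x"), (9, "c3", "get x")]

node_a, node_b = PendingQueue(), PendingQueue()
for r in requests:            # node A receives the requests in one order
    node_a.add(*r)
for r in reversed(requests):  # node B receives them in the opposite order
    node_b.add(*r)

print(node_a.head(), node_b.head())  # both heads are the oldest request
```

The catch, of course, is the "same set of requests" assumption: if a request has not reached a node yet, that node's head can differ.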

At some later point in time, the second phase of Rabia begins — the authors call it Weak-MVC (Weak Multi-Valued Consensus). Weak-MVC itself is broken down into two stages: Propose Stage and Randomized Consensus Stage. In the propose stage, nodes exchange the request at the head of PQ along with the log’s next sequence number seq. This stage allows the nodes to see the state of the cluster and prep for the binary consensus. If a node sees the majority of a cluster proposing the same request in the same sequence number, then the node sets its state to 1. Otherwise, the node assumes a state of 0. 
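The propose stage boils down to a counting rule. Below is a rough sketch of it, assuming a node has already collected the proposals it is going to look at; the function name and the tuple representation of a proposal are mine.

```python
from collections import Counter

def initial_state(proposals, n):
    """proposals: (request, seq) pairs received in the propose stage.
    Returns (state, proposal): state 1 if a majority of the n nodes
    proposed the same request for the same sequence number, else 0."""
    majority = n // 2 + 1
    if proposals:
        candidate, count = Counter(proposals).most_common(1)[0]
        if count >= majority:
            return 1, candidate
    return 0, None

# Three of five nodes propose r1 for slot 7: state 1.
print(initial_state([("r1", 7)] * 3 + [("r2", 7)], n=5))
# No request reaches a majority: state 0.
print(initial_state([("r1", 7), ("r1", 7), ("r2", 7), ("r3", 7)], n=5))
```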

At this point, the binary consensus begins to essentially certify one of the two options. The first option is that a majority of nodes want to put the same request in the same sequence number (i.e., the state of 1). The second option is to certify that there is no such common request in the majority (i.e., the state of 0). For binary consensus, Rabia uses a modified Ben-Or algorithm. Ben-Or consists of two rounds that may repeat multiple times.

In round-1, nodes exchange their state and compute a vote to be either 0 or 1. The vote corresponds to the majority of state values received, so if a node received enough messages to indicate that the majority of nodes are in state 1, then the node will take on the vote value of 1. Similarly, if a majority has state 0, then the node will vote 0. If no majority is reached for any state, the vote is nil. 

Round-2 of Ben-Or exchanges votes between nodes. If the majority of nodes agree on the same non-nil vote, the protocol can terminate. Termination means that the system has agreed to certify the request from the proposal if the consensus value is 1 or to create a NO-OP if the value is 0.
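Both rounds, as summarized above, use the same majority check, so a sketch fits in a few lines. This follows my simplified description rather than the paper's exact thresholds and message handling, and the function names are mine; `None` stands for the nil vote.

```python
def majority_value(values, n):
    """Return 0 or 1 if a majority of the n nodes reported it, else None."""
    majority = n // 2 + 1
    for value in (0, 1):
        if values.count(value) >= majority:
            return value
    return None

def round1_vote(states, n):
    """Round-1: turn the received states into a vote (0, 1, or nil)."""
    return majority_value(states, n)

def round2_decide(votes, n):
    """Round-2: decide if a majority sent the same non-nil vote."""
    return majority_value(votes, n)

n = 3
vote = round1_vote([1, 1, 1], n)                # everyone is in state 1
decision = round2_decide([vote, vote, vote], n)
print(vote, decision)

# With split states there is no majority, so the vote is nil.
print(round1_vote([1, 0], n))
```

A decision of 1 commits the proposed request at that sequence number, and a decision of 0 commits a NO-OP; a `None` from round-2 means another iteration is needed.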

In an ideal situation, all participating nodes will have the same request at the head of their PQs when the propose phase starts. This means that nodes will have the same state at the end of the propose phase, allowing the binary consensus to certify the proposed request at its sequence number in just one round trip (Round-1 + Round-2 of Ben-Or). So the request distribution + proposal + Ben-Or consensus under such an ideal case only takes 4 message exchanges or 2 RTTs. It is way longer than Multi-Paxos’ ideal case of 1 RTT between the leader and the followers, but Rabia avoids having a single leader.

A less ideal situation arises when no majority quorum has the same request at the head of PQ when the proposal starts. Such a case, for example, may happen when the proposal starts before the request has had a chance to replicate from the receiving node to the rest of the cluster. In this case, binary consensus may reach the agreement on state 0 to not certify any operation in that particular sequence number, essentially producing a NO-OP. The authors argue that this NO-OP is a good thing, as it gives enough time for the inflight requests to reach all the nodes in the cluster and get ordered in their respective PQs. As a result, the system will propose the same request in the next sequence number after the NO-OP. Both of these situations constitute a fast path for Ben-Or, as it terminates in just one iteration (of course the latter situation does not commit a request, at least not until the retry with a higher sequence number).

Now, it is worth pointing out that the fast path of one RTT for binary consensus is not always possible, especially in light of failures. If too many nodes have a nil vote, the protocol will not have enough votes agreeing on either state (1 – certify the request, 0 – create a NO-OP), and the Ben-Or process must repeat. In fact, the Ben-Or protocol can repeat voting many times with some random coin flips in between the iterations to “jolt” the cluster into a decision. This jolt is the randomized consensus part. For more information on Ben-Or, Murat’s blog provides ample details. The authors, however, replaced the random coin flip at each node with a random but deterministic coin flip so that each node has the same coin flip value for each iteration. Moreover, the coin flip is only needed at the node if there is no vote received from other nodes in round-1 of Ben-Or; otherwise, the node can assume the state of the vote it received. The whole process can repeat multiple times, so it may not be very fast to terminate. 
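One way to picture a "random but deterministic" coin is a hash over the sequence number and the iteration number, so every node computes the same bit without talking to anyone. The sketch below is only an illustration under that assumption; the seed, the hashing scheme, and the function names are hypothetical and not taken from the paper.

```python
import hashlib
from typing import Optional, Sequence


def common_coin(seq: int, iteration: int, seed: bytes = b"demo-seed") -> int:
    """Return the same pseudo-random bit on every node for a given (seq, iteration)."""
    data = seed + seq.to_bytes(8, "big") + iteration.to_bytes(8, "big")
    return hashlib.sha256(data).digest()[0] & 1


def next_state(votes_seen: Sequence[Optional[int]], seq: int, iteration: int) -> int:
    """Adopt a non-nil vote if the node saw one; flip the shared coin otherwise."""
    for vote in votes_seen:
        if vote is not None:
            return vote
    return common_coin(seq, iteration)
```

Because the coin depends only on values all nodes already share, two nodes that both fall back to the coin in the same iteration start the next iteration with the same state.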

The paper provides more details on the protocol. Additionally, the authors have proved the safety and liveness of the protocol using Coq and Ivy.

The big question is whether we need this more complicated protocol if solutions like Multi-Paxos or Raft work well. The paper argues that Raft and Paxos get more complicated when the leader fails, requiring the leader election, which does not happen in Rabia. Moreover, Paxos/Raft-family solutions also require too many additional things to be bolted on, such as log pruning/compaction, reconfigurations, etc. The argument for Rabia is that all these extra components are easier to implement in Rabia.

Quite frankly, I have issues with these claims, and the paper does not really go deep enough into these topics to convince. For example, one argument is that leader election is complicated and takes time. Surely, not having a leader is good to avoid the leader election and performance penalty associated with it. But this does not come for free in Rabia. The entire protocol has more messages and rounds in the common case than Multi-Paxos or Raft, so it kind of shifts the complexity and cost of having a leader election once in a blue moon to having more code and more communication in the common case. I am not sure it is a good tradeoff. The performance claim of slow leader elections in Paxos/Raft is also shaky, since failures in Rabia can derail the protocol off the fast path. I am not sure whether the impact of operating under failures is comparable with leader-election overheads upon leader failures, and I hope Rabia may have a point here, but the paper provides no evaluation for any kind of failure cases. 

And speaking of evaluations, this was the biggest disappointment for me. The authors claim Rabia compares in performance to Multi-Paxos and EPaxos in 3- and 5-node clusters, with the 3-node setup in the same availability zone allowing Rabia to outperform EPaxos. In fact, the figure below shows that Rabia beats Multi-Paxos all the time. 

But there are a ton of assumptions and tweaking going on to get these results. For example, Rabia needs to have enough time to replicate client requests before starting the propose phase to have a good chance of completing on the fast path. So the testing is done with two types of batching to give the delay needed. The figure mentions the client batching. However, there is also a much more extensive server-side batching, which is mentioned only in the text. Of course, there is nothing wrong with batching, and it is widely used in systems. In all fairness, the paper provides a table with no-batching results, where Multi-Paxos outperforms Rabia fivefold.

The biggest issue is the lack of testing under less-favorable conditions. No evaluation/testing under failures. No testing when the network is degraded and does not operate under the timing conditions expected by the protocol. These issues impact real performance and may create reliability issues. For example, a network degradation may cause Rabia to not use a fast path and consume more resources, reducing its maximum processing capacity. Such a scenario can act as a powerful trigger for a metastable failure.

As usual, we had a nice presentation of the paper in the reading group. Karolis Petrauskas described the paper in great detail:


1) Evaluation. I have already talked about evaluation concerns, and this was one of the discussion topics I brought up during the meeting. 

2) Use of Ben-Or. Ben-Or is an elegant protocol to reach binary consensus, which is not usually useful for solving state machine replication. Traditionally, Multi-Paxos or Raft agree on a value/command and its sequence number, so they need a bit more than just a yes/no agreement. However, Rabia transforms the problem into a series of such yes/no agreements by removing replication and ordering from consensus and doing it a priori. With deterministic timestamp ordering of requests, Rabia just needs to wait for the operation to exist on all/most nodes and agree to commit it at the next sequence number. So the consensus is no longer reached on a value and order, but on whether to commit some command at some sequence number.

3) Practicality. The evaluation suggests that the approach can outperform Multi-Paxos and EPaxos, but whether it is practical remains to be seen. For one, it is important to see how the solution behaves under less ideal conditions. Second, it is also important to see how efficient it is in terms of resource consumption. EPaxos is not efficient despite being fast. The additional message exchanges over Multi-Paxos, Raft, and even EPaxos may cost Rabia on the efficiency side. 

4) Algorithms. The paper provides some nice algorithms that illustrate how the protocol works. However, some of the conditions are unnecessarily confusing. In the same algorithm, the authors use f+1, n-f, and floor(n/2)+1 to designate the majority in an n=2f+1 cluster. Please proofread your algorithms; a bit of consistency can improve readability a lot!
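For readers who want to convince themselves that the three expressions really are the same number when n=2f+1, a quick Python check (the helper name is just for illustration) does the job:

```python
from typing import Tuple


def majority_forms(f: int) -> Tuple[int, int, int]:
    """Return f+1, n-f, and floor(n/2)+1 for a cluster of n = 2f+1 nodes."""
    n = 2 * f + 1
    return (f + 1, n - f, n // 2 + 1)


# All three forms agree for every cluster size of the shape n = 2f+1.
for f in range(1, 6):
    a, b, c = majority_forms(f)
    assert a == b == c
```

The equivalence only holds because n is odd and equal to 2f+1; with an even n or a different relation between n and f, the three expressions can diverge, which is one more reason to pick a single form.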

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Exploiting Nil-Externality for Fast Replicated Storage

The 85th DistSys reading group meeting discussed the SOSP’21 paper “Exploiting Nil-Externality for Fast Replicated Storage” by Aishwarya Ganesan, Ramnatthan Alagappan, Andrea C. Arpaci-Dusseau, and Remzi H. Arpaci-Dusseau. The paper uses an old trick of delaying the execution of some operations to improve the throughput while maintaining strong consistency. Consistency is an externally-observable property, and simple strategies, like batching, can improve the throughput by making all concurrent operations wait a bit and replicate/execute in bulk. Some other solutions may delay some operations in favor of others when conflicts arise. For instance, the PQR approach may delay reads to allow writes to finish first. This paper takes the “delay-some-reads” mechanism one step further to enable clients to directly replicate certain writes/updates to the nodes in the cluster for 1 RTT client-observable latency. At the same time, the leader orders the operations in the background and potentially delays reads for objects that have pending un-ordered, un-executed writes. 

A core observation of the paper is that not all operations expose the system’s state upon acking their completion back to the clients. For example, a simple insert or replace command may only acknowledge its success without exposing any state. This behavior is normal, even expected, since we know what state a system should assume (at least for that particular object) after a successful write. The paper calls these operations nil-external. Since the nil-external command does not return any state, we do not really need to execute it right away, at least not on a critical path, as long as we can ensure that the command won’t be somehow forgotten after acking success to the client. The authors claim that nil-external operations are common in many production-grade systems, presenting an opportunity for improvement. 

Skyros, the system presented in the paper, breaks down replication and execution of nil-external commands. The clients write their nil-external operations directly to the replicas, with each replica placing the command in the pending log, called d-log. Obviously, different nodes may have the same operation in different slots in the d-log due to races with other concurrent clients. From the client’s perspective, once a super-majority of replicas, including a leader replica, has acked the command to the client, it can consider the nil-external operation completed. This is because the operation is replicated and won’t get lost even under failures. However, the command has not been executed yet; moreover, its execution order has not been set either.
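The client-side completion rule can be sketched in a few lines of Python. Note that the supermajority formula f + ceil(f/2) + 1 below is my assumption, in the style of Fast Paxos quorums; it is not stated in this post, but it matches the two data points mentioned here (4 out of 5 nodes, and all 3 out of 3 nodes). The function names and node identifiers are hypothetical.

```python
import math
from typing import Set


def supermajority(n: int) -> int:
    """Assumed supermajority size for n = 2f+1 nodes: f + ceil(f/2) + 1."""
    f = (n - 1) // 2
    return f + math.ceil(f / 2) + 1


def write_completed(acks: Set[str], leader: str, n: int) -> bool:
    """A nil-external write is done once a supermajority that includes the leader acked it."""
    return leader in acks and len(acks) >= supermajority(n)
```

With five nodes, acks from the leader and three followers complete the write, while four follower acks without the leader do not.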

In the “happy case,” the command order mismatch in d-logs between replicas is irrelevant. Skyros is a leader-based protocol, and the leader periodically tells the followers to commit the commands from d-log to a commit log, or c-log, in the leader’s order. This part of the protocol looks like Multi-Paxos or Raft, with the leader essentially replicating only the order of operations and not the operations themselves. Upon committing to c-log, the nodes remove the commands from d-log. This process is illustrated in the figure, which I compiled/borrowed from the authors’ presentation of the paper.
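As a rough illustration of this background ordering step, the follower-side logic could look like the Python sketch below. It assumes the follower already holds every command the leader names (a simplification; a real follower may have missed some), and the function name is made up for this example.

```python
from typing import List


def apply_leader_order(d_log: List[str], c_log: List[str], leader_order: List[str]) -> None:
    """Move commands from d-log to c-log in the order chosen by the leader."""
    for cmd in leader_order:
        d_log.remove(cmd)  # raises ValueError if this follower never saw cmd
        c_log.append(cmd)


# The follower saw "b" before "a", but commits them in the leader's order.
d_log, c_log = ["b", "a", "c"], []
apply_leader_order(d_log, c_log, ["a", "b"])
```

After the call, the follower's c-log holds "a" then "b", and only "c" is still pending in its d-log.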

However, when a leader fails, the system has no authority to order the operations from the d-log. Intuitively, before the leader’s failure, the leader was the only node to receive all client requests. Moreover, the quorum requirement guarantees that the leader receives each request before the operation is acked on the client side. After the system picks a new leader, it needs to establish the order of d-log operations to commit, but it is not safe to go with the order the new leader has. The reason is that we have no guarantee that all the commands in the new leader’s d-log have been added there before the operation was acked on the client. Here is an example: let’s say some operation “a” has reached a leader and 3 other nodes in a 5-node cluster. The client will consider this operation completed. Now consider another client starting a new operation “b” after “a” has been successfully acked. The operation “b” gets to a leader and 3 other nodes; however, it is possible that the set of “3 other nodes” for “b” is not the same as it was for “a.” There can be a node that participated in the quorum for “b,” but not in the quorum for “a.” So the leader (and other nodes in the intersection of quorums for “a” and “b”) will see the order as “a, b”, while the node excluded from quorum “a” may see the order as “b, a” if somehow message “a” was delayed long enough. So now, if the leader fails and the node with the “b, a” d-log gets elected as the leader, it will commit an order that does not respect the real-time order of operations, violating linearizability. 

Naturally, to be safe, Skyros needs to recover the real-time ordering. This is the reason for having a larger “supermajority” quorum. This trick has been pulled from Fast Paxos in several papers/protocols. Essentially, the supermajority quorum allows Skyros to recover the partial order of commands that have a happens-before relationship by simply looking at the majority of nodes in the majority quorum used for recovery. This majority-of-majority setup (i.e., 2 out of 3 nodes in a majority quorum of a 5-node cluster) will have correct hb relations because of the quorum intersections. So, if “a” happened-before “b”, then this partial order will be captured in at least 2 nodes in any 3-node majority quorum in a 5-node setup. 
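The majority-of-majority idea can be sketched in Python as follows. This is a simplified illustration, not the recovery procedure from the paper: it only extracts pairwise "x before y" relations from the d-logs of a recovery quorum, it treats a command missing from a log as ordered later (my assumption), and it skips building the final total order. All names are hypothetical.

```python
from itertools import combinations
from typing import List, Set, Tuple


def before(log: List[str], x: str, y: str) -> bool:
    """True if this d-log orders x before y; a missing y counts as 'later'."""
    if x not in log:
        return False
    return y not in log or log.index(x) < log.index(y)


def recover_edges(d_logs: List[List[str]]) -> Set[Tuple[str, str]]:
    """Keep an order x -> y if a majority of the recovery quorum's d-logs agree on it."""
    need = len(d_logs) // 2 + 1
    commands = sorted(set().union(*d_logs))
    edges = set()
    for x, y in combinations(commands, 2):
        if sum(before(log, x, y) for log in d_logs) >= need:
            edges.add((x, y))
        elif sum(before(log, y, x) for log in d_logs) >= need:
            edges.add((y, x))
    return edges
```

Taking the earlier example, a recovery quorum with d-logs ["b", "a"], ["a", "b"], and ["a"] yields the single relation ("a", "b"), since two of the three logs place "a" first even though the would-be new leader saw "b, a".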

The evaluation is a bit tricky here. When testing with all nil-ext operations, Skyros beats MultiPaxos by a wide margin both in terms of latency and throughput (left-most figure). However, such a workload consists of only writes/updates, so it is not very realistic. Adding some nonnil-ext operations to the mix changes things a bit (second from the left), with Skyros degrading down to Paxos performance when all operations are nonnil-ext. Also interesting to note is that this comparison does not really agree with the nilext-only experiment. See, there batched Paxos was operating at a maximum throughput of over 100k Ops/s, but in the nonnilext experiment, it is doing only 40k Ops/s, allowing the authors to claim a 2x throughput advantage for Skyros. I have a feeling that this experiment was done at some fixed number of clients, and with Skyros having lower nil-ext write latency, the same number of clients can potentially push more operations. The three right-most figures show the performance with reads added to the mix. 

As usual, we had a presentation in our group:


1) Five nodes. The system is described with a 5-node example. It makes sense when we need a supermajority quorum. Three-node clusters are common and tend to provide better performance for MultiPaxos/Raft systems. However, a three-node setup will require a quorum of all nodes as a supermajority, making the system non-tolerant to any machine slowdowns. 

2) Failures? When one node fails, this is not a big problem. A follower failure is masked by the quorum, and a leader failure will require a new leader election; however, the cluster can continue fine afterward with just four nodes. Two out of five nodes failing will prevent any supermajority, so the system needs to have a mechanism to detect this efficiently and fall back to regular Multi-Paxos. Timing out on every operation before falling back is not a performant option, and without this being discussed, the protocol hardly tolerates f nodes failing out of a 2f+1 node cluster.

3) Orchestrating an overload? The read operations have the potential to cause foreground synchronization when a read tries to access an object with an operation still in the d-log. This means that a workload that writes and immediately reads the same object can cause too many of these transitions to the “foreground.” These explicit syncs not only impact the latency of the reads but are also likely to reduce the ordering batches and cost more processing resources, reducing the system’s maximum capacity. So, with a simple workload, it may be possible to overload the system, possibly into a failure. 
