All posts by alekseyc

Reading Group. Darwin: Scale-In Stream Processing

In the 99th reading group meeting, we discussed stream processing. The paper we read, “Darwin: Scale-In Stream Processing” by Lawrence Benson and Tilmann Rabl, argues that many stream processing systems use the hardware relatively inefficiently. These inefficiencies range from the need to ingest large volumes of data, to the requirement to durably store ingested data for fault tolerance, to the generality of the stream-processing frameworks themselves. Because of these inefficiencies, many stream processing systems scale by adding more machines instead of removing the existing bottlenecks. The authors propose a set of methods to address these scalability concerns. The paper refers to these fixes as a “scale-in” approach, where the system tries to optimize and fully use the resources it already has before requiring more servers.

Overall, the paper identifies several areas of concern and improvement. On the framework-generality side of things, the authors discuss query compilation. The idea here is that compiling streaming queries into native, optimized code will be faster and more efficient than running these queries in a more general but interpreted environment. Out of all the identified problems, Darwin seems to solve only this one. Darwin exposes an SQL-like interface to users, creates a query plan that incorporates various optimizations, translates the query plan into C++ code, and finally compiles that C++ code.
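To make the interpreted-versus-compiled contrast concrete, here is a toy sketch in Python (Darwin itself generates and compiles C++). The two-operator plan format, the `interpret` and `compile_plan` helpers, and the field names are all hypothetical. The point is only that the compiled version fixes the shape of the plan once, up front, instead of re-dispatching on every operator for every event.

```python
from typing import Callable, Dict, List, Tuple

Event = Dict[str, int]
Plan = List[Tuple[str, str]]  # hypothetical plan format: (operator, expression)

def interpret(plan: Plan, events: List[Event]) -> list:
    """Generic engine: walks the whole plan and evaluates expressions for every event."""
    out = []
    for event in events:
        keep, value = True, None
        for op, expr in plan:
            if op == "filter" and not eval(expr, {}, event):
                keep = False
                break
            if op == "project":
                value = eval(expr, {}, event)
        if keep:
            out.append(value)
    return out

def compile_plan(plan: Plan, schema: List[str]) -> Callable[[List[Event]], list]:
    """Code generation: emit source specialized to this plan, then compile it once."""
    lines = ["def query(events):", "    out = []", "    for e in events:"]
    for field in schema:  # bind each field to a plain local variable
        lines.append(f"        {field} = e[{field!r}]")
    result = "None"
    for op, expr in plan:
        if op == "filter":
            lines.append(f"        if not ({expr}):")
            lines.append("            continue")
        elif op == "project":
            result = expr
    lines.append(f"        out.append({result})")
    lines.append("    return out")
    namespace: dict = {}
    exec("\n".join(lines), namespace)  # only ever do this with trusted query text
    return namespace["query"]
```

With `plan = [("filter", "price > 100"), ("project", "price * qty")]`, both `interpret(plan, events)` and `compile_plan(plan, ["price", "qty"])(events)` return the same results; the compiled function simply has no per-event plan walking left in it.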

Other bottlenecks/issues the paper identifies are network communication, persistent storage, and recovery. For network communication bottlenecks, the authors propose using… well, faster networks and RDMA. On the persistent storage side of things, the paper claims we need more speed and should consider newer technologies, such as nonvolatile memory (PMem). Finally, for the recovery aspect, the paper is again not very creative and suggests using “modern storage technology to achieve efficient checkpointing.”


1) Limited Implementation & Eval. This is a short paper, so there are only so many details on many of the discussed topics. The paper identifies 4 bottlenecks and provides experimental justifications for these. However, on 3 out of the 4 issues, the solution seems to boil down to “using modern technology.” So, it seems like either 3/4 of the problems have been solved with “modern technology” or that the authors have no good solution to them. 

The limited evaluation is also interesting. When the data is kept in RAM, Darwin shows the same performance as the other systems the authors compare against. This makes us question whether the one remaining problem and its solution (query compilation) is actually a problem in (at least some) other streaming systems.

2) A Few Interesting Numbers. We really liked the motivation for scaling streaming systems the paper uses: Alibaba’s 2020 Singles Day (a big e-commerce sales event) produced 4 billion streamed events per second and required 1.5 million cores of Apache Flink to handle. At such a scale, improving the efficiency of streaming systems is crucial.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly manage Zoom invites to the paper discussions. Please join the Slack group to get involved!

Reading Group. Achieving High Throughput and Elasticity in a Larger-than-Memory Store

The “Achieving High Throughput and Elasticity in a Larger-than-Memory Store” paper by Chinmay Kulkarni, Badrish Chandramouli, and Ryan Stutsman discusses elastic, scalable distributed storage. The paper proposes Shadowfax, an extension to the FASTER single-node KV-store. The particular use case targeted by Shadowfax is the ingestion of large volumes of (streaming) data. The system does not appear to have replication and cannot handle server failures without data loss. However, the authors focus on reconfiguration performance and its impact on the rest of the system, so in case of failures, replacement servers can be added with a minimal performance penalty. Under failure-free conditions, nodes can be added or removed without data loss. Finally, Shadowfax stores data in three different “places” based on how hot the keys are: recently updated keys reside in RAM, colder data gets pushed to the SSD, and finally, the data from the SSD is asynchronously copied to shared cloud storage.

The main problem Shadowfax solves is ingesting lots of data from networked clients. A single server can only do so much, so we really need a system that can partition the keyspace across servers. Shadowfax builds on top of FASTER, a single-node store that uses a lock-free hash table for key indexing and look-up. Shadowfax adds hash-based key partitioning so that each server is responsible for a subset of all keys. This partitioning is stored in ZooKeeper to make the sharding configuration fault-tolerant.
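A minimal sketch of hash-based key partitioning, assuming the keyspace is cut into contiguous ranges of a 64-bit hash space with each range owned by one server. The range layout, the truncated SHA-256 hash, and the names `PartitionMap` and `even_split` are illustrative assumptions, not Shadowfax's actual scheme; the map object merely stands in for the configuration kept in ZooKeeper.

```python
import bisect
import hashlib
from typing import List

HASH_SPACE = 2 ** 64

def key_hash(key: str) -> int:
    """Deterministic 64-bit hash (truncated SHA-256), chosen only for illustration."""
    return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")

class PartitionMap:
    """Hash range -> server map; a stand-in for the sharding config stored in ZooKeeper."""

    def __init__(self, upper_bounds: List[int], servers: List[str]):
        # upper_bounds[i] is the exclusive upper end of server i's hash range.
        assert len(upper_bounds) == len(servers) and upper_bounds[-1] == HASH_SPACE
        self.upper_bounds = upper_bounds
        self.servers = servers

    def owner(self, key: str) -> str:
        # The first range whose upper bound exceeds the key's hash owns the key.
        return self.servers[bisect.bisect_right(self.upper_bounds, key_hash(key))]

def even_split(servers: List[str]) -> PartitionMap:
    """Divide the hash space into equal contiguous ranges, one per server."""
    step = HASH_SPACE // len(servers)
    bounds = [step * (i + 1) for i in range(len(servers) - 1)] + [HASH_SPACE]
    return PartitionMap(bounds, servers)
```

Keeping ranges contiguous means that moving load between servers is just a matter of moving a boundary and publishing the new map.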

A lot of the Shadowfax design stems from two ideas: (1) avoiding coordination and context switching and (2) avoiding unnecessarily costly operations.

For the first point, Shadowfax servers pin processing threads to cores, and each thread picks up requests and dispatches them to the underlying FASTER store shared among all processing threads. As such, the processing threads never coordinate with each other by any means other than the FASTER store. The shared FASTER store uses lock-free hashing to minimize the impact of concurrency. Whenever some coordination is needed, FASTER avoids explicit cross-thread coordination with the help of asynchronous cuts. As different threads may access the same shared data, it is often essential to know when all threads have passed or seen some particular version of the data. For instance, when some record moves from RAM to SSD, all threads must have completed their access to that record in RAM (or agreed not to access it) before the memory can be cleaned up. FASTER achieves this lazily via view or epoch changes. When a thread sees a new epoch and transitions to it, it agrees that it has finished accessing any data in the old epoch. When all threads have moved (at their own pace) to a new epoch, the system can reclaim the memory supporting the older epoch’s data.
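The epoch idea can be modeled in a few lines. This is a single-threaded toy, assuming one counter per thread and a global epoch bumped on every retirement; `EpochManager`, `retire`, and `refresh` are hypothetical names and not FASTER's real API, which relies on atomics and trigger actions.

```python
from typing import Any, List, Tuple

class EpochManager:
    """Toy, single-threaded model of epoch-based reclamation (not FASTER's real API)."""

    def __init__(self, num_threads: int):
        self.global_epoch = 0
        self.thread_epoch = [0] * num_threads  # last epoch each thread has moved to
        self.pending: List[Tuple[int, Any]] = []  # (epoch in which item was retired, item)
        self.freed: List[Any] = []

    def retire(self, item: Any) -> None:
        """Unlink item (e.g., a RAM page just flushed to SSD) and open a new epoch."""
        self.pending.append((self.global_epoch, item))
        self.global_epoch += 1
        self._reclaim()

    def refresh(self, tid: int) -> None:
        """Thread tid moves to the current epoch at its own pace, promising it no
        longer holds references to anything retired in an older epoch."""
        self.thread_epoch[tid] = self.global_epoch
        self._reclaim()

    def _reclaim(self) -> None:
        safe = min(self.thread_epoch)  # every thread has reached at least this epoch
        still_pending = []
        for epoch, item in self.pending:
            if epoch < safe:
                self.freed.append(item)  # no thread can still be looking at it
            else:
                still_pending.append((epoch, item))
        self.pending = still_pending
```

With two threads, a page retired in epoch 0 is freed only after both threads have called `refresh`; neither thread ever waits for the other, which is the "lazy" part.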

On the client side, threads pin to cores as well. Clients maintain sessions with the servers: each client thread opens a session with each server, allowing independent client threads to talk to the same server without any internal coordination. When accessing a key, a client thread uses its session with the partition/server hosting the key and batches the request with other requests from the same session before sending the batch to the server.
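A rough sketch of one client thread with a session (and therefore a separate batch) per server. The `ClientThread` class, the `owner_of` lookup function, the `batch_size` threshold, and the `sent` list standing in for the network are all hypothetical.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

Request = Tuple[str, str, int]  # (operation, key, value)

class ClientThread:
    """One pinned client thread holding its own session, and batch, per server."""

    def __init__(self, owner_of: Callable[[str], str], batch_size: int):
        self.owner_of = owner_of  # key -> server, from the cached partition map
        self.batch_size = batch_size
        self.batches: Dict[str, List[Request]] = defaultdict(list)
        self.sent: List[Tuple[str, List[Request]]] = []  # stand-in for the network

    def upsert(self, key: str, value: int) -> None:
        server = self.owner_of(key)
        batch = self.batches[server]
        batch.append(("upsert", key, value))
        if len(batch) >= self.batch_size:
            self.flush(server)

    def flush(self, server: str) -> None:
        # Ship the whole batch over this thread's session with `server`.
        if self.batches.get(server):
            self.sent.append((server, self.batches.pop(server)))
```

Because each thread owns its batches outright, no locking is needed on the client; the cost is that the number of sessions grows with threads times servers.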

The scheme with sticky sessions works well when the partitions are static. However, in a dynamic setting, a client may send a request to a server that no longer owns the key. This is where the second aspect of the design comes in. The naive solution would be for a server to check ownership for each request, but this would waste CPU cycles, since the vast majority of requests go to the right server. Instead, Shadowfax checks request ownership at the batch level using view numbers and drops outdated batches. Each server has a view number representing its configuration version. Whenever a partitioning change impacts a server, its view increases. The client attaches the view obtained (and cached) from ZooKeeper to each batch, so if the client’s information is stale, its batches will not succeed. Naturally, when this happens, the client refreshes the keyspace mapping and server views from ZooKeeper.
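The batch-level view check might look roughly like this. It is a simplified sketch: a plain dict stands in for ZooKeeper and holds a single server's view, and on rejection the hypothetical `Client` just retries the same server, whereas a real client would also refresh the key mapping and possibly redirect the batch elsewhere.

```python
from typing import Dict, List, Tuple

class Server:
    def __init__(self, view: int):
        self.view = view  # bumped whenever a repartitioning affects this server
        self.store: Dict[str, int] = {}

    def handle_batch(self, batch_view: int, ops: List[Tuple[str, int]]) -> Tuple[str, int]:
        # One ownership check per batch instead of one per request.
        if batch_view != self.view:
            return ("rejected", self.view)
        for key, value in ops:
            self.store[key] = value
        return ("ok", len(ops))

class Client:
    def __init__(self, config: Dict[str, int]):
        self.config = config  # stand-in for the configuration kept in ZooKeeper
        self.cached_view = config["view"]

    def send(self, server: Server, ops: List[Tuple[str, int]]) -> str:
        status, _ = server.handle_batch(self.cached_view, ops)
        if status == "rejected":
            # Stale view: refresh from the config service and retry once.
            self.cached_view = self.config["view"]
            status, _ = server.handle_batch(self.cached_view, ops)
        return status
```

The server pays for a single integer comparison per batch, so the common case (correct routing) stays almost free.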

The migration tries to avoid as much coordination as possible. It leverages the idea of the asynchronous cut to transition between phases of the migration process: once all threads finish a migration phase, they can transition to the next phase. I will skip the detailed explanation of data migration; however, I will mention that it requires a few state transitions on both the source and target nodes, and during the process, some requests may get delayed, as the target machine may not have yet received the data for remapped/repartitioned keys. In short, the source must find the keys to send, tell the target to expect the data migration, and move the keys, while the target needs to accept the keys and eventually start serving them.

Another extension of Shadowfax over the FASTER store is the ability to move data from SSD to shared cloud storage. The main benefit of using cloud storage is the speed of reconfiguration and migration. Shadowfax eventually writes all records from SSD to shared cloud storage. This cloud copy allows the migration procedure to avoid copying the records from SSD and instead migrate the index with pointers to the records in the shared cloud storage.

Regarding performance, Shadowfax seems to provide around 130 million ops/sec on one 64-core server. On 12 servers (and 3 clients), as shown in the figure here, the throughput can go as high as 930M ops/sec. Scale-up procedures also seem to be quick, with little impact on performance. We can see a dip in source throughput, but it is relatively short and lasts well under a minute in all tested scenarios.


1) Fault tolerance? While it is impressive to get up to 130M ops/sec of throughput over TCP (albeit with FPGA network acceleration), I cannot help wondering what kind of applications need such per-node throughput yet do not need data redundancy. If we operate at such speeds, a server reboot can mean millions of lost operations that existed only in RAM.

2) Client Scalability. Shadowfax clients communicate with servers over TCP, but the system assumes a small number of clients/sessions. In fact, the authors perform most evaluations with just one massive client. The issue is that a client needs a session to each server from each of its threads, so a 64-core client will have 64 sessions per server. I am not entirely sure whether having this many sessions will impact performance. The paper achieves 930M ops/sec on a 12-server cluster with 3 clients, but each client is massive, and all three clients combined have 192 sessions on each server. The authors present this result as proof that the servers can handle many client sessions.

One question I have is whether having more, smaller clients will impact performance even with the same number of sessions. Referring to the theme of the first discussion point, what kind of application will have a single client producing 310 (930/3) million events/records/updates each second? Of course, one can argue that these clients are “proxies” for many real clients, like sensors, IoT devices, etc., but that also means these proxy clients will not be able to provide 310M ops/sec, as they need to manage a lot of connections from actual data producers. This, in turn, will require more proxy clients, hence more sessions.

3) Benefits of cloud storage? Shadowfax copies data from SSD to shared cloud storage. However, this cloud storage does not seem to bring many benefits to the system beyond the data migration and recovery use cases. Cloud storage also provides some limited protection from data loss when a server crashes and never comes back. But this protection only covers some data, since “hot” keys in RAM are not written to SSD and consequently not copied to the cloud, leaving the cloud with incomplete or stale data.


Reading Group. Shard Manager: A Generic Shard Management Framework for Geo-distributed Applications

The 97th paper in the reading group was “Shard Manager: A Generic Shard Management Framework for Geo-distributed Applications.” This paper from Facebook talks about a sharding framework used in many of Facebook’s internal systems and applications.

Sharding is a standard way to provide horizontal scalability — systems can break down their data into (semi-) independent chunks and store and process operations on these data chunks on different servers. There are many complex questions about this seemingly simple concept. For instance, how do we split the data or keyspace into good chunks? And once we know how to partition the keyspace, where do we store the shards and perform compute tasks on them? How do applications find the right partitions for their data?

Despite the widespread use of sharding, many large systems handle it in an ad-hoc manner, implementing everything they need from scratch. Obviously, reinventing the bicycle each time can be problematic when you run a big company with hundreds of distributed sharded systems and applications. Shard Manager is Facebook’s solution to the sharding problem that aims to be general enough to be reused by many internal systems. It is a comprehensive framework that can decide how to split the data/keyspace (apps can override this and have their own sharding rules), where to place the resulting shards (apps specify rules or constraints to guide the placement), and how to find them.

Shard Manager aims to be general and accommodate many workloads and deployment patterns observed at Facebook. The paper states that currently, 54% of all sharded applications at Facebook use the framework. Unlike other sharding frameworks that are constrained to a single data center or region, Shard Manager works across regional boundaries. In a traditional sharding framework, if an application needs a copy of a partition in a different region, it must orchestrate this copy’s placement in the regional instance of the sharding platform. This is a problem for availability, scalability, fault tolerance, and cost, as the components managing shard placement and migration are split across different systems. With Shard Manager, a shard’s placement is not confined to a single region, and copies can move between geographic regions, all controlled by one system. Speaking of shard copies, Shard Manager assumes replicated shards and also allows for a “special” primary copy to facilitate leader-based replication approaches.

Of course, all shard placements and movements are constrained based on the application requirements and load balancing needs. The applications set the constraints, some of which are hard and cannot be violated, and some are more like a wish list — good to have but not strictly necessary. The hard constraints ensure the minimum requirement for shard placement, such as the server capacity needed for a shard. Examples of soft constraints or goals are preferred geographical location, the spread of replicas across failure domains, and various load balancing requirements.
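The split between hard constraints and soft goals can be illustrated with a greedy placer: hard constraints filter out ineligible hosts, and soft goals only rank the survivors. This is a hypothetical sketch (the `Host` fields, the priority order of the goals, and the greedy strategy are assumptions), not Shard Manager's allocator, which solves a much richer optimization problem.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Host:
    name: str
    region: str
    fault_domain: str
    free_capacity: int

def place_replicas(shard_size: int, replicas: int, hosts: List[Host],
                   preferred_region: str) -> List[str]:
    """Greedy placement: hard constraints filter candidates, soft goals rank them."""
    chosen: List[Host] = []
    for _ in range(replicas):
        taken = {h.name for h in chosen}
        # Hard constraints: enough free capacity, and at most one replica per host.
        candidates = [h for h in hosts
                      if h.free_capacity >= shard_size and h.name not in taken]
        if not candidates:
            raise RuntimeError("hard constraints cannot be satisfied")
        used_domains = {h.fault_domain for h in chosen}
        # Soft goals, in priority order: new fault domain, preferred region, most headroom.
        best = max(candidates, key=lambda h: (h.fault_domain not in used_domains,
                                              h.region == preferred_region,
                                              h.free_capacity))
        best.free_capacity -= shard_size
        chosen.append(best)
    return [h.name for h in chosen]
```

In this sketch, spreading across failure domains outranks the region preference, so a second replica may land outside the preferred region; a different application could reasonably order the goals the other way.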

To make everything tick, Shard Manager consists of multiple components. The orchestrator component monitors health and resource usage and decides when to change the placement of shards. The allocator component creates new shard-to-server assignments. The allocator also receives input from Twine, Facebook’s cluster manager. This connection allows the orchestrator to make decisions based on planned or anticipated events known to Twine, such as servers shutting down or rebooting for maintenance. Apparently, this is a big deal, as it allows applications to gracefully handle infrastructure maintenance. The paper spends quite some time on graceful shard migration, especially for cases when the primary copy of a shard needs to move to a different server.
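To illustrate why advance notice of maintenance matters, here is a purely hypothetical reaction to a "server X is going down" event: every replica is moved off X ahead of time, and when X holds a primary, a surviving secondary is promoted first rather than having the primary fail abruptly. The `drain_server` function, the assignment layout, and the single `spare` host (assumed to hold none of the affected replicas) are inventions for illustration; the paper's graceful-migration protocol involves more steps.

```python
from typing import Dict, List, Tuple

Assignment = Dict[str, dict]  # shard -> {"primary": str, "secondaries": list of str}

def drain_server(assignments: Assignment, server: str,
                 spare: str) -> List[Tuple[str, str, str, str]]:
    """Hypothetical handler for a planned-maintenance notice about `server`.
    Returns (shard, role, old_host, new_host) moves and updates `assignments`."""
    moves = []
    for shard in sorted(assignments):
        roles = assignments[shard]
        replicas = [roles["primary"]] + roles["secondaries"]
        if server not in replicas:
            continue
        # Survivors keep their order (primary first), and the spare joins at the end.
        remaining = [s for s in replicas if s != server] + [spare]
        if roles["primary"] == server:
            # Hand off the primary role before the host goes away.
            moves.append((shard, "primary", server, remaining[0]))
        else:
            moves.append((shard, "secondary", server, spare))
        roles["primary"], roles["secondaries"] = remaining[0], remaining[1:]
    return moves
```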

Finally, ZooKeeper stores all the important stuff, such as the orchestrator’s state and shard assignments. ZooKeeper also acts as a failure detector for application nodes. On the application side, servers use the Shard Manager (SM) Library to interact with the orchestrator and ZooKeeper. Application clients, on the other hand, interact with Service Discovery to find the required shards. We recently covered the Delos paper, which talks about building control plane storage for Facebook to replace ZooKeeper, so it is likely that ZooKeeper has since been replaced with a Delos-based system.

The scale of Facebook’s operation demands the Shard Manager to be sharded itself. The system is composed of many Mini Shard Managers (Mini-SMs). Each Mini-SM handles multiple partitions, and each partition represents a slice of servers available to some application across many regions.


Reading Group. Solar Superstorms: Planning for an Internet Apocalypse

Our 96th reading group paper was very different from the topics we usually discuss. We talked about the “Solar Superstorms: Planning for an Internet Apocalypse” SIGCOMM’21 paper by Sangeetha Abdu Jyothi.

Now (May 2022), we are slowly approaching the peak of solar cycle 25 (still due in a few years?) as the number of observable sunspots grows. If you ask how sunspots relate to distributed systems, it turns out that there is a pretty significant connection. See, sunspots can produce coronal mass ejections (CMEs) of magnetized and charged particles flying out from the sun, and if our planet happens to be on a collision course with such a CME, we may be up for a nasty surprise. When a CME hits the Earth, we experience a “solar storm.” Earth’s magnetic field protects living organisms from the negative impacts of these charged particles. However, the interaction between the particles and the magnetic field can induce currents in large/long pieces of conductive material on the planet. These conductors act as primitive electric generators in the fluctuating magnetic field of a solar storm.

It happens to be the case that people rely on long, stretched-out conductors all over the Earth: our electric grid. Substantial research has been done on the resilience of the electric grid in the face of solar storms, since the currents induced in wires spanning many hundreds of miles can be high enough to destroy all kinds of electrical equipment and appliances. Our modern communication networks rely on fiber-optic cables, which are nonconductive. However, fiber-optic cables need repeaters to boost the signal along the way; these repeaters need power, so any sufficiently long fiber-optic link also has a wire to deliver power to the repeaters. This is a big problem for the internet: current induced by a solar storm in these wires can destroy repeaters, rendering the entire fiber-optic cable inoperative. This paper studies how resilient our internet infrastructure (mainly the cables/links that make up the backbone of the internet) is in the face of solar superstorms.

Solar superstorms are solar storms of very high intensity. These events are believed to cause significant damage to power and communications infrastructure. Humanity has lived through them just fine in the past, but there has not been a solar superstorm recent enough to stress our grids and especially modern communication networks. The two most recent large events occurred in 1859 and 1921. In both cases, these solar storms caused telegraph service to go down, and the 1921 storm also brought down the only operational undersea communication link of the time. Technology has advanced a lot in the hundred years since the last event, and we ought to know whether the communication infrastructure can survive. An important point from the paper is that the likelihood of a solar superstorm is far from negligible and roughly comparable to that of a “hundred-year flood.”
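To get a feel for what “hundred-year flood” odds mean, read them (as an assumption on my part, not a number quoted from the paper) as roughly a 1% chance per year, with years independent of each other. The chance of at least one such event over n years is then:

$$P(\text{at least one event in } n \text{ years}) = 1 - (1 - p)^{n}, \qquad p \approx 0.01$$

$$n = 10:\ 1 - 0.99^{10} \approx 0.096, \qquad n = 100:\ 1 - 0.99^{100} \approx 0.634$$

So even a “rare” 1%-per-year event has close to a one-in-ten chance of showing up within a decade.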

The paper is not very optimistic about the fate of the internet in the case of a solar superstorm. A significant amount of internet infrastructure resides in the northern hemisphere above 40°N latitude. This happens to be a problem, as the currents induced in conductors increase closer to the poles (this is also why we see auroras in Alaska and not in Hawaii). The long undersea cables are especially susceptible. It turns out that of the links/cables connecting the US and Europe, all but one are in the more vulnerable region above 40°N. Another interesting fact is the disparity between internet infrastructure in vulnerable latitudes and population: we have a lot of infrastructure there and relatively few people. The paper points out Facebook’s infrastructure here as well, as it is predominantly located in the northern parts of the northern hemisphere.

Anyway, the paper suggests building redundant links that connect countries and continents in regions below 40°N. Another suggestion is isolating links in vulnerable regions from other cables further south. For example, if current is induced in an undersea cable during a superstorm, it is important to ensure that the current does not “jump” (at the interconnection point) to another, otherwise unimpacted, cable further south. Another suggestion is powering down the links when a solar storm is coming. Powering off does not stop currents from appearing in the conductors, but it may reduce the damage or failure rate and leave more equipment operational after the storm has passed. The paper has a few more suggestions and a few simulations of different link/cable failure scenarios, but such simulations are very speculative, since we do not know what the failure rates can be for different cables and locations on the globe.

New Reading List: Papers #101-110

Summer term papers are here! The list is below. Also, here is a Google Calendar.

Reading Group. ByShard: Sharding in a Byzantine Environment

Our 93rd paper in the reading group was “ByShard: Sharding in a Byzantine Environment” by Jelle Hellings and Mohammad Sadoghi. This VLDB’21 paper talks about sharded Byzantine systems and proposes an approach that can implement 18 different multi-shard transaction algorithms. More specifically, the paper discusses two-phase commit (2PC) and two-phase locking (2PL) in a Byzantine environment.

As usual, we had a presentation of the paper. Karolis Petrauskas did an excellent job explaining this work: 

The paper states that modern blockchain technology relies on full replication, making the systems slower and harder to scale. 

Sharding is a natural way to solve the problem and has been done countless times in the crash fault-tolerant (CFT) setting. Of course, a sharded system often needs to perform transactions that touch data in more than one shard. The usual way to solve this in CFT is to use some version of 2PC coupled with some concurrency control mechanism, like 2PL. ByShard follows this general recipe, but in the BFT setting, which complicates things a bit. The problem is making 2PC and 2PL work in a Byzantine, adversarial environment without tightly coupling all shards back together into one “megashard.” So, we need a way for shards to communicate reliably.

Let’s look at a transaction lifecycle. When we have a transaction that spans multiple shards, the first step is to deliver this transaction to each shard and check whether a shard can run it. Then, if everything is ok, we need to run the transaction and provide some isolation from other ongoing transactions. ByShard implements all these actions with shard-steps. Each shard-step is a building block of all ByShard protocols and allows the shard to inspect a transaction, make changes to the local state, and send a message to start another shard-step on another shard. Overall, ByShard uses three distinct types of shard-steps: vote-step, commit-step, and abort-step.

The message-sending part is kind of important, as we need this communication to be reliable in the BFT setting. The paper gracefully ignores this problem and points to a few solutions in the literature. In short, ByShard requires a cluster-sending protocol that ensures reliable communication between shards, such that all correct nodes of the receiver shard get the message, all correct nodes of the sender shard get an acknowledgment, and sending requires the sender shard to reach agreement on what to send. The last point ensures that bad actors do not send malicious stuff, and I suspect that the receiver needs a way to check that the received messages were indeed certified by the sender’s consensus.
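One common way for a receiver to check that a message really reflects the sender shard’s consensus (not necessarily what the protocols ByShard cites do) is to wait for identical copies from f+1 distinct sender replicas. With at most f faulty senders, at least one of those copies then comes from a correct replica, and correct replicas only send what their shard agreed on. A minimal receiver-side sketch, with hypothetical names:

```python
from collections import defaultdict
from typing import DefaultDict, Hashable, Set

class ClusterReceiver:
    """Deliver a cross-shard message once f+1 distinct sender replicas vouch for it."""

    def __init__(self, f: int):
        self.f = f  # maximum number of faulty replicas in the sender shard
        self.votes: DefaultDict[Hashable, Set[str]] = defaultdict(set)
        self.delivered: Set[Hashable] = set()

    def on_message(self, sender_replica: str, message: Hashable) -> bool:
        """Return True exactly once: when the message first reaches f+1 vouchers."""
        self.votes[message].add(sender_replica)  # duplicates from one replica count once
        if message not in self.delivered and len(self.votes[message]) >= self.f + 1:
            self.delivered.add(message)
            return True
        return False
```

Signatures or certificates can replace the vote counting, but the counting version shows the core argument without any cryptography.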

Vote-step is used to replicate the transaction between shards. When a shard receives the transaction, it starts the vote-step and checks whether it can proceed with the transaction. The shard may also perform local state changes if needed. At the end of the vote-step, a shard forwards some information to another shard to start a new shard-step. Since we only have three building blocks, the message a vote-step sends can start another vote-step, commit-step, or abort-step at the receiving end. The purpose of commit-step and abort-step is self-evident from their names. One important thing to note about abort-step is that it needs to undo any local changes that a prior vote-step might have made to ensure that the aborted transaction leaves no side effects.

Now we can look at how ByShard composes these three basic steps. The figure above provides a visual illustration of three different ways ByShard runs 2PC. One aspect of the figure that I fail to understand is why not all shards run vote-step and commit-step, and the text does not really provide an explanation.

In the linear orchestration, the transaction progresses from the coordinator one shard at a time. If any shard decides to abort, it needs to start the abort-step and notify all other shards involved in the transaction (or at least all other shards that voted earlier). If a shard decides to commit, it actually starts a vote-step in the next shard. If the vote-step successfully reaches and passes the last shard involved in the transaction, then that last shard can broadcast the commit-step to everyone. Centralized orchestration looks more like the traditional 2PC, and distributed orchestration cuts down on the number of sequential steps even further. The three strategies represent tradeoffs between the latency and number of communication exchanges and shard-steps. 
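A toy model of linear orchestration with the three shard-steps, using account balances as the local state. It assumes no concurrency control (the partial-results variant), and the `Shard` class, the delta-based undo log, and the non-negative-balance check are illustrative choices rather than ByShard’s definitions; consensus and cluster-sending are abstracted away entirely.

```python
from typing import Dict, List, Tuple

class Shard:
    """A shard holding balances; the three methods mimic ByShard's shard-steps."""

    def __init__(self, name: str, balances: Dict[str, int]):
        self.name = name
        self.balances = dict(balances)
        self.applied: Dict[str, Dict[str, int]] = {}  # txid -> deltas applied by vote-step

    def vote_step(self, txid: str, deltas: Dict[str, int]) -> bool:
        """Check whether the transaction can run here; if so, apply it tentatively."""
        if any(self.balances.get(k, 0) + d < 0 for k, d in deltas.items()):
            return False
        for k, d in deltas.items():
            self.balances[k] = self.balances.get(k, 0) + d
        self.applied[txid] = deltas
        return True

    def commit_step(self, txid: str) -> None:
        self.applied.pop(txid, None)  # changes become final; drop the undo info

    def abort_step(self, txid: str) -> None:
        # Compensate this shard's earlier vote-step so the aborted tx leaves no side effects.
        for k, d in self.applied.pop(txid, {}).items():
            self.balances[k] -= d

def linear_orchestration(txid: str, plan: List[Tuple[Shard, Dict[str, int]]]) -> str:
    """Vote-steps visit shards one at a time; success at the last shard triggers commits."""
    voted: List[Shard] = []
    for shard, deltas in plan:
        if not shard.vote_step(txid, deltas):
            for s in voted:  # notify every shard that voted earlier
                s.abort_step(txid)
            return "aborted"
        voted.append(shard)
    for s in voted:
        s.commit_step(txid)
    return "committed"
```

Undoing by subtracting the applied deltas (rather than restoring old snapshots) is what makes the rollback safe even if other transactions touched the same keys in the meantime, which is also exactly how compensations work in the sagas pattern.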

So, with 2PC taken care of, we can briefly discuss concurrency control. ByShard proposes a few different ways to implement it, starting with no concurrency control at all, which allows other transactions to observe partial results. Because abort-step cleans up side effects, if some transaction partly executes and then reaches the abort-step, its execution is undone or rolled back. This reminds me of the sagas pattern. The other solution is to use locks to control isolation. The paper (or the presentation above) has more details on the nuances of locking and acquiring different locks with different types of orchestration. By combining different ways to orchestrate transactions with different ways to execute them, ByShard presents 18 BFT transactional protocols with different isolation and consistency properties.
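For the lock-based variants, a vote-step can additionally try to lock the keys it touches, with commit-step or abort-step releasing them. The sketch below uses a simple no-wait policy (a lock conflict makes the shard vote to abort instead of waiting, which also rules out deadlocks); the policy choice and the `LockTable` name are my assumptions, not details taken from the paper.

```python
from typing import Dict, Iterable

class LockTable:
    """Per-shard exclusive locks taken in vote-step, released in commit/abort-step."""

    def __init__(self):
        self.owner: Dict[str, str] = {}  # key -> txid holding the lock

    def try_lock_all(self, txid: str, keys: Iterable[str]) -> bool:
        keys = list(keys)
        # No-wait: if any key is held by another transaction, vote to abort right away.
        if any(self.owner.get(k, txid) != txid for k in keys):
            return False
        for k in keys:
            self.owner[k] = txid
        return True

    def release_all(self, txid: str) -> None:
        for k in [k for k, t in self.owner.items() if t == txid]:
            del self.owner[k]
```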


1) Comparison with Basil. An obvious discussion point is a comparison with Basil, another transactional sharded BFT system. Basil is a lot closer to the Meerkat solution from the CFT world, while ByShard is a more classical 2PC+2PL approach. In Basil, the degree of fault tolerance is smaller (i.e., it needs clusters of 5f+1 replicas). At the same time, ByShard is a lot more underspecified than Basil. ByShard relies on existing BFT consensus and BFT cluster-sending mechanisms to work, while Basil provides a complete, self-contained solution. On the performance side of things, ByShard requires a lot of steps and a lot of consensus operations across all the involved shards to perform all the shard-steps. This must take a significant performance toll. While it is not correct to compare numbers directly between papers, Basil can run thousands of transactions per second, while ByShard’s throughput is in the single digits. However, it is worth mentioning that ByShard’s experiments included more shards, and ByShard’s transactions also involved a large number of shards.
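To put the fault-tolerance gap in numbers, assume ByShard’s shards run a classic 3f+1 BFT protocol such as PBFT (an assumption on my part; the paper leaves the consensus choice open). For a shard of n replicas, the number of tolerated Byzantine replicas is:

$$f_{3f+1}(n) = \left\lfloor \frac{n-1}{3} \right\rfloor, \qquad f_{5f+1}(n) = \left\lfloor \frac{n-1}{5} \right\rfloor$$

$$n = 16:\quad f_{3f+1} = 5, \qquad f_{5f+1} = 3$$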

2) (Distributed) Sagas Pattern. As I briefly mentioned in the summary above, ByShard, especially with linear orchestration and no isolation, reminds me of the sagas pattern. Distributed sagas are used to orchestrate long-running requests in microservice-type applications. If we squint, we can see each shard as a separate microservice. As vote-steps propagate across shards, they perform local changes. And if an abort is needed, the abort causes these changes to be rolled back. However, when we add isolation to ByShard, the similarities with sagas start to disappear.

3) Performance. An important motivation for sharding is performance; however, it does not seem like ByShard achieves stellar performance here. Of course, sharding is still useful for systems that operate with large amounts of data that otherwise would not fit into a single machine. Nevertheless, without strong performance, a solution like this has very few advantages over not using sharded/partitioned systems at all.


Reading Group. CompuCache: Remote Computable Caching using Spot VMs

In the 92nd reading group meeting, we covered the “CompuCache: Remote Computable Caching using Spot VMs” CIDR’22 paper by Qizhen Zhang, Philip A. Bernstein, Daniel S. Berger, Badrish Chandramouli, Vincent Liu, and Boon Thau Loo.

Cloud efficiency seems to be a popular topic recently. A handful of solutions try to improve the efficiency of the cloud by ratcheting up the utilization of already provisioned hardware and infrastructure. These approaches tend to provide some service using otherwise wasted resources. For instance, we have covered a paper that suggests running serverless computing on harvested VMs that use resources normally wasted/idle in the data center. CompuCache provides a similar utility, but instead of running a pure serverless compute infrastructure, it suggests using Spot VMs to run a compute-oriented caching service. See, a pure caching service is memory-intensive but leaves CPUs relatively underutilized, defeating our goal of using otherwise wasted resources well. So CompuCache is a caching service that can execute stored procedures against the cached data. This is the “Compu” part, and I suppose it enables better CPU utilization of the spot VMs the service runs on.

So, now let’s talk about using spot VMs for storage systems. Spot instances have unpredictable lifespans controlled by the cloud vendor and not the users/engineers. When the resources used by a spot instance are needed to support other services, the cloud vendor evicts the VM with little notice. This fickle nature of spot VMs precludes them from reliably supporting a storage architecture. However, caching may be a somewhat viable option since the master copy of the data is stored on some durable service. Of course, we still want to avoid cache misses as much as possible for performance reasons, so CompuCache needs to have mechanisms to deal with spot VM evictions. 

CompuCache architecture relies on the Cloud VM Allocator component to get the resources to run multiple caches for different applications. The allocator allows client applications to specify the size of cache they need and allocate the resources. The system assigns memory in fixed chunks (1 GB), and these chunks can go to different spot VMs that have the resources. Naturally, if no existing VMs have the resources, the allocator will try to get more VMs running. The clients “see” the allocated memory as an address space, and can write and/or read from memory knowing some address and length of data. 
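A small sketch of allocating a cache in fixed 1 GB chunks across spot VMs, assuming a first-fit policy and that a new VM is simply requested whenever no existing VM has a free gigabyte. The `allocate` function, the VM names, and the `new_vm_gb` size of a freshly requested VM are hypothetical; the real Cloud VM Allocator talks to the cloud provider and likely uses a smarter policy.

```python
import math
from typing import Dict, List, Tuple

CHUNK_GB = 1  # CompuCache hands out memory in fixed 1 GB chunks

def allocate(cache_gb: float, vms: Dict[str, int],
             new_vm_gb: int) -> List[Tuple[int, str]]:
    """First-fit placement of ceil(cache_gb) chunks on spot VMs with spare memory.
    `vms` maps a (hypothetical) VM name to its free GB and is updated in place."""
    placements = []
    added = 0
    for chunk in range(math.ceil(cache_gb / CHUNK_GB)):
        vm = next((name for name, free in vms.items() if free >= CHUNK_GB), None)
        if vm is None:
            vm = f"new-vm-{added}"  # hypothetical: ask the cloud for another spot VM
            added += 1
            vms[vm] = new_vm_gb
        vms[vm] -= CHUNK_GB
        placements.append((chunk, vm))
    return placements
```

Fixed-size chunks keep the bookkeeping trivial: a chunk is the unit of placement, of address translation, and of migration when a VM is evicted.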

The sharded nature of the cache means that the clients need some way to know where their data lives in the system. CompuCache uses a scheme that maps a virtual global cache address space to specific nodes and their local address spaces. The client application constructs and maintains this map based on the results of cache allocation through the Cloud VM Allocator. Moreover, the client application is responsible for distributing this map to the CompuCache instances. When the node composition of the cache changes due to VM evictions or new VM additions, so does the mapping of the virtual address space to instances. As the system evolves, each client application must migrate parts of its cache between VMs. The paper has more details on the procedures involved in data migration due to spot VM deallocation/destruction.
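To make the mapping idea concrete, here is a minimal sketch of what such a client-side map could look like. Only the 1 GB chunk size comes from the paper; the `AddressMap` class and its methods are hypothetical illustrations, not CompuCache's actual data structures. The sketch translates a global address into a (VM, local address) pair, splits an access that crosses a chunk boundary into per-VM pieces, and remaps a chunk after a migration.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

CHUNK_SIZE = 1 << 30  # 1 GiB, matching the paper's fixed 1 GB allocation unit


@dataclass
class AddressMap:
    """Hypothetical client-side map from a global cache address space to VMs.

    chunks[i] = (vm_id, local_base) backs global bytes
    [i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE).
    """
    chunks: List[Tuple[str, int]] = field(default_factory=list)

    def add_chunk(self, vm_id: str, local_base: int) -> None:
        # Called as the allocator hands out another chunk on some spot VM.
        self.chunks.append((vm_id, local_base))

    def translate(self, addr: int) -> Tuple[str, int]:
        idx, offset = divmod(addr, CHUNK_SIZE)
        if idx >= len(self.chunks):
            raise IndexError(f"address {addr} is outside the allocated space")
        vm_id, local_base = self.chunks[idx]
        return vm_id, local_base + offset

    def split(self, addr: int, length: int) -> List[Tuple[str, int, int]]:
        """Break one access into per-VM pieces of (vm_id, local_addr, nbytes)."""
        pieces = []
        while length > 0:
            vm_id, local = self.translate(addr)
            room = CHUNK_SIZE - addr % CHUNK_SIZE  # bytes left in this chunk
            n = min(room, length)
            pieces.append((vm_id, local, n))
            addr += n
            length -= n
        return pieces

    def remap(self, idx: int, vm_id: str, local_base: int) -> None:
        """Point chunk idx at a new VM, e.g. after migrating off an evicted one."""
        self.chunks[idx] = (vm_id, local_base)
```

After every `remap`, the application would still have to push the updated map to the cache nodes, which is exactly the burden discussed below.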

An interesting part of CompuCache is its compute-oriented API. The clients can read and write data directly from the cache, but they can also run stored procedures (sprocs). I suspect the stored procedures allow CompuCache to better utilize the otherwise idle cores of the spot instances. I was under the impression that stored procedures are not a very popular tool, but I may be mistaken here. Anyway, the authors provide another explanation for using stored procedures to interact with CompuCache: latency. See, if we need to read something and then update something else based on the retrieved value using just gets and puts, we pay the price of 2 RTTs (not to mention screwed-up consistency). Using stored procedures, we can achieve this read-then-update pattern in a single sproc call and 1 RTT between the client and the cache. This is not a new idea, and some systems have been suggesting sprocs for atomic access to multiple keys and for latency benefits. But here is a caveat: CompuCache is sharded, and the shards are relatively fine-grained, so the data accessed in a stored procedure may be spread over multiple VMs. This not only negates the latency benefit but also complicates CompuCache. To deal with data on multiple VMs, the system can either retrieve the data to the machine running the sproc or move the computation itself to the next VM. 
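The RTT argument can be illustrated with a toy sketch that simply counts round trips. `CountingCacheClient`, `read_then_update`, and the in-process dictionary standing in for the remote cache are all hypothetical; nothing here reflects CompuCache's real API, and the network is not modeled beyond counting one RTT per call.

```python
from typing import Callable, Dict, Tuple

Sproc = Callable[[Dict[str, int], Tuple], int]


class CountingCacheClient:
    """Hypothetical client of a single cache node; every call costs one RTT."""

    def __init__(self) -> None:
        self.data: Dict[str, int] = {}      # stands in for the remote cache
        self.sprocs: Dict[str, Sproc] = {}  # procedures registered on the node
        self.rtts = 0

    def get(self, key: str) -> int:
        self.rtts += 1
        return self.data.get(key, 0)

    def put(self, key: str, value: int) -> None:
        self.rtts += 1
        self.data[key] = value

    def call(self, name: str, *args) -> int:
        self.rtts += 1  # the whole procedure runs next to the data
        return self.sprocs[name](self.data, args)


def read_then_update(data: Dict[str, int], args: Tuple) -> int:
    """Sproc body executed on the cache node: dst = src + 1."""
    src, dst = args
    data[dst] = data.get(src, 0) + 1
    return data[dst]


client = CountingCacheClient()
client.sprocs["read_then_update"] = read_then_update
client.data["x"] = 41

client.put("y", client.get("x") + 1)  # read, then write: two round trips
two_step_rtts = client.rtts           # 2

client.rtts = 0
client.call("read_then_update", "x", "z")  # same logic as one sproc call
sproc_rtts = client.rtts                   # 1
```

Once `x` and `z` live on different VMs, the single-RTT advantage disappears, which is the sharding caveat above.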

For the evaluation, the authors compared CompuCache with Redis, and CompuCache seems to outperform Redis by a large margin. This is interesting and unexpected, as nothing about using spot instances should be beneficial for performance. Quite the opposite: VM churn and reclamation should make CompuCache more likely to experience cache misses and to spend more time migrating data and rebuilding caches. One explanation is the poor performance of the Redis sproc implementation used in the evaluation, which appears to be at least two orders of magnitude slower than CompuCache’s. The paper says nothing about what CompuCache uses for its sprocs to make them so much faster.


1) Application-driven design. CompuCache places a lot of burden on the application that needs the cache. The “memory management” done by the application can be rather heavy. Any data migration requires remapping the virtual address space to nodes, and clients then need to redistribute this map so that the nodes themselves can find the data they may need to run sprocs. This can make the system brittle if the application mismanages the cache. 

2) API. Another interesting thing to note is the API. CompuCache allocates memory to applications as a chunk of bytes. This chunk represents the virtual memory space that is mapped to multiple nodes/VMs. The API allows reading and writing at some byte offset and length. So there is no key-value abstraction common in other caches; in fact, the application needs to implement a KV API on top if it needs one.
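To give a feel for what “implement a KV API on top” might involve, here is a minimal sketch of a fixed-slot hash table laid over a byte-range read/write interface. `ByteRangeCache` is a local stand-in for the offset/length API, and `SlotKV`, the slot layout, and the size limits are all hypothetical choices for illustration. Deletes are left out because they complicate linear probing.

```python
import hashlib
from typing import Optional, Tuple


class ByteRangeCache:
    """Stand-in for a byte-addressed cache: read/write at an offset and length."""

    def __init__(self, size: int) -> None:
        self.mem = bytearray(size)

    def read(self, offset: int, length: int) -> bytes:
        return bytes(self.mem[offset:offset + length])

    def write(self, offset: int, data: bytes) -> None:
        self.mem[offset:offset + len(data)] = data


KEY_MAX, VAL_MAX = 16, 32
SLOT = 2 + KEY_MAX + VAL_MAX  # key-length byte, value-length byte, key, value


class SlotKV:
    """Hypothetical KV layer: open addressing with linear probing over slots."""

    def __init__(self, cache: ByteRangeCache, n_slots: int) -> None:
        self.cache, self.n_slots = cache, n_slots

    def _home(self, key: bytes) -> int:
        # Stable hash so every client computes the same home slot.
        digest = hashlib.sha256(key).digest()
        return int.from_bytes(digest[:8], "big") % self.n_slots

    def _probe(self, key: bytes) -> Tuple[int, bytes]:
        """Return the slot holding key, else the first empty slot on its path."""
        start = self._home(key)
        for i in range(self.n_slots):
            slot = (start + i) % self.n_slots
            raw = self.cache.read(slot * SLOT, SLOT)
            klen = raw[0]
            if klen == 0 or raw[2:2 + klen] == key:
                return slot, raw
        raise MemoryError("table full")

    def put(self, key: bytes, value: bytes) -> None:
        if not 0 < len(key) <= KEY_MAX or len(value) > VAL_MAX:
            raise ValueError("key or value does not fit in a slot")
        slot, _ = self._probe(key)
        record = (bytes([len(key), len(value)])
                  + key.ljust(KEY_MAX, b"\0") + value.ljust(VAL_MAX, b"\0"))
        self.cache.write(slot * SLOT, record)

    def get(self, key: bytes) -> Optional[bytes]:
        try:
            _, raw = self._probe(key)
        except MemoryError:
            return None  # full table and key absent
        if raw[0] == 0:
            return None  # reached an empty slot: key absent
        vlen = raw[1]
        return raw[2 + KEY_MAX:2 + KEY_MAX + vlen]
```

Every `get` here may touch several slots, so on a sharded cache a single lookup could turn into multiple byte-range reads, possibly on different VMs.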

3) Sprocs vs. serverless functions? One major point of discussion was the comparison with prior work on serverless functions using harvested resources. The compute part of CompuCache looks a lot like a serverless function meant to operate on state directly from the cache. I’d suspect a sproc should also be able to find data in storage in case of a cache miss, but the paper leaves out any details on cache misses. Anyway, a sproc seems a bit more restrictive than a general serverless function. So the question is whether you can run a harvest-VM serverless infrastructure capable of a full-fledged function runtime and use it instead of CompuCache sprocs. Of course, you lose the nice benefit of collocating data and compute, but this benefit is elusive at best when a cache is sharded across many VMs anyway. The problem here is the map of virtual space to nodes: the application knows and maintains it, but a separate serverless compute layer does not, so it won’t have a clue which nodes to contact for data. So we are back to the point above: relying on the client application to manage the cache can be brittle and restrictive. 

4) Sproc performance. The performance evaluation is the weaker side of the paper, mainly because it leaves many things underspecified. Why are Redis sprocs so much slower? How was the comparison done given the difference in cache APIs? What was the implementation of CompuCache? 

5) Failures. Finally, it is hard to discuss any distributed system without talking about failures. CompuCache leaves many details on failure handling out of the paper. It is not clear what sprocs do in case of a cache miss. In fact, it is even unclear what a cache miss is in CompuCache, as it does not store objects and instead provides some memory space to the application. It seems like this is all up to the application: if some memory is unreachable, the application should know how to find that data in a reliable store. This, of course, leaves sprocs vulnerable to failures. A big part of the paper’s fault tolerance discussion is data migration due to spot VM eviction. In the real world, however, servers may fail without notice. 

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the paper discussions. Please join the Slack group to get involved!

Reading Group. Using Lightweight Formal Methods to Validate a Key-Value Storage Node in Amazon S3

For the 90th reading group paper, we did “Using Lightweight Formal Methods to Validate a Key-Value Storage Node in Amazon S3” by James Bornholt, Rajeev Joshi, Vytautas Astrauskas, Brendan Cully, Bernhard Kragl, Seth Markle, Kyle Sauri, Drew Schleit, Grant Slatton, Serdar Tasiran, Jacob Van Geffen, Andrew Warfield. As usual, we have a video:

Andrey Satarin did an excellent write-up on this paper, so I will be short here. This paper discusses the use of formal methods in designing, developing, and maintaining a new key-value storage subsystem in Amazon S3. In my opinion, the core idea and contribution of the paper is establishing engineering processes that ensure that implementation adheres to the specification. 

See, using traditional formal methods is hard in practice. For example, using TLA+ to model the system creates a model that is detached from the implementation. If something in the system changes, both need to be updated. Moreover, updating the TLA+ model requires TLA+ experts, creating a possibility of the model and the code diverging over time. Another problem is using the model to check the implementation: since the two are detached from each other, this becomes rather difficult. 

The paper proposes lightweight formal methods that can be integrated directly into the testing/build framework. The approach requires implementing a reference model for each component that captures the semantics/behavior of the component without any real-world concerns. For example, the model for an LSM tree is a hashmap; both have the same behavior and allow reads and writes of key-value pairs. 

Engineers use these reference models in a couple of ways. First, the models act as mocks for unit tests. This increases the utility of the models and forces the engineers to keep them up to date. But of course, the main purpose of the models is to confirm the behavior of the real implementations. The gist of conformance checking is verifying that the implementation’s behavior is allowed by the model. To do so, the same test, consisting of some sequence of operations, runs against both the implementation and the model. The expectation is that both go through the same states and deliver the same outputs. Below is an image borrowed from the authors’ slides that illustrates the process. 

Conformance checking. The same operations run in both the model and the implementation, expecting the same result in both. This can include running background operations as well, such as garbage collection (GC), that have no impact on the model but may affect the implementation.
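A minimal sketch of this kind of conformance check, under stated assumptions: the reference model is a plain dict (mirroring the hashmap-as-LSM-model example), while `ToyLSM` and `check_conformance` are hypothetical stand-ins, not the S3 code. Randomly generated operation sequences are a crude substitute for the paper’s more careful scenario generation, and `compact` plays the role of a background operation that has no counterpart in the model.

```python
import random
from typing import Callable, Dict, List, Optional


class ToyLSM:
    """Toy 'implementation': a memtable flushed into immutable runs."""

    def __init__(self, memtable_limit: int = 4) -> None:
        self.memtable: Dict[str, Optional[str]] = {}    # None marks a tombstone
        self.runs: List[Dict[str, Optional[str]]] = []  # newest run is last
        self.limit = memtable_limit

    def put(self, key: str, value: str) -> None:
        self.memtable[key] = value
        self._maybe_flush()

    def delete(self, key: str) -> None:
        self.memtable[key] = None
        self._maybe_flush()

    def _maybe_flush(self) -> None:
        if len(self.memtable) >= self.limit:
            self.runs.append(self.memtable)
            self.memtable = {}

    def get(self, key: str) -> Optional[str]:
        if key in self.memtable:
            return self.memtable[key]
        for run in reversed(self.runs):  # newest data wins
            if key in run:
                return run[key]
        return None

    def compact(self) -> None:
        """Background op: merge all runs, keep newest versions, drop tombstones."""
        merged: Dict[str, Optional[str]] = {}
        for run in self.runs:  # oldest first, so newer runs overwrite
            merged.update(run)
        live = {k: v for k, v in merged.items() if v is not None}
        self.runs = [live] if live else []


def check_conformance(make_impl: Callable[[], ToyLSM], seed: int,
                      n_ops: int = 500) -> None:
    rng = random.Random(seed)
    model: Dict[str, str] = {}  # the reference model is just a hashmap
    impl = make_impl()
    keys = [f"k{i}" for i in range(8)]
    for step in range(n_ops):
        op, key = rng.choice(["put", "delete", "get", "compact"]), rng.choice(keys)
        if op == "put":
            value = f"v{step}"
            model[key] = value
            impl.put(key, value)
        elif op == "delete":
            model.pop(key, None)
            impl.delete(key)
        elif op == "compact":
            impl.compact()  # no counterpart in the model
        else:
            assert impl.get(key) == model.get(key), f"seed={seed} step={step} key={key}"
    for key in keys:  # the final states must agree as well
        assert impl.get(key) == model.get(key), f"seed={seed} final key={key}"


for seed in range(20):
    check_conformance(ToyLSM, seed)
```

A bug in `compact`, for example merging runs in the wrong order, shows up as a model/implementation mismatch rather than requiring a hand-written test for that exact case.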

Of course, there are some challenges with running these tests. For instance, it is important to generate interesting scenarios/sequences of operations to test various behaviors. Another challenge is introducing failures into testing. And finally, sequential execution of operations does not provide comprehensive coverage of modern multi-threaded or distributed code, which requires some concurrency testing as well. The paper discusses these scenarios in greater detail. 


1) Reliance on experts in formal methods. A big point made by the paper is about not having to rely on formal methods experts to maintain the models and verification. The paper says that while initially all models were written by experts, at the time of writing, about 18% of the models were written by non-experts. To us, this sounded like both a big and a small number. It is important to let engineers maintain the models and the conformance checking framework, and the number clearly shows that the core engineers are getting on board with the processes involved. At the same time, it is not clear whether a team can completely get rid of expert support. 

2) Importance of processes. As we discussed the reliance on experts and reducing this reliance, it became clear that a big contribution of this paper is about the importance of engineering processes. And it is not just about having some processes/workflows to facilitate formal methods adoption. What is crucial is making these processes scale and thus not require significant additional effort from the engineers. For example, developing models to support formal methods is an extra effort. Using these models as mock components for unit tests amortizes such extra effort into almost no additional work. After all, we need to do unit testing anyway, and using mocks is a common practice.

3) TLA+? It is hard to discuss any formal methods paper in our reading group without some discussion of TLA+. We talked about the difficulty of keeping the models up to date with the implementation. TLA+ does not seem to allow for a low-effort mechanism here: there is a big overhead to having engineering processes/practices that keep the TLA+ model and the implementation coherent.

4) Testing Reading List. Our presenter, Andrey, has compiled an extensive reading list on testing distributed systems. It is most definitely worth checking out. 

5) Codebase size. The authors talk about using these lightweight formal methods on the codebase of about 40,000 lines of code. This is not that much code, to be honest. In fact, this is less code than a handful of my academic projects, not to mention the real software that I have worked on. So it would be interesting to see how these approaches can scale to bigger codebases and bigger teams with more people.


Reading Group. Basil: Breaking up BFT with ACID (transactions)

Our 89th paper in the reading group was “Basil: Breaking up BFT with ACID (transactions)” from SOSP’21 by Florian Suri-Payer, Matthew Burke, Zheng Wang, Yunhao Zhang, Lorenzo Alvisi, and Natacha Crooks. I will make this summary short. We had a quick and improvised presentation as well. Unfortunately, this time around, it was not recorded. 

The system presented in the paper, called Basil, is a sharded BFT transactional system. Basil is also leaderless and overall reminds me of the Tapir/Meerkat line of work carried over into the BFT setting. In these systems, clients do the bulk of the work, which definitely complicates things for BFT protocols: instead of “dumb” clients that can be Byzantine but can do little harm, we now rely on smart clients that can screw things up quite substantially. 

The paper lists a few common assumptions for the system, such as the inability of Byzantine clients to cause a denial-of-service attack and tolerating up to f faulty replicas in each shard of 5f+1 nodes. Also important are the definitions of Byzantine isolation levels. In short, the paper aims to provide serializability, but only for correct clients. So Byzantine participants cannot cause a reordering of the operations observed by the well-behaved clients. 

As I mentioned above, the actual protocol is very similar to Tapir/Meerkat. The clients run interactive transactions by reading from the involved partitions and buffering the writes locally. Once ready to commit, the clients run a two-phase commit protocol. Just like in the CFT counterparts, the transactions are timestamp-ordered by clients, so clients must have synchronized clocks. Since the clients pick the timestamps, a faulty client can choose one far in the future, causing other dependent transactions to abort/hang. To avoid this, Basil replicas have a window of time in which they accept transactions: a replica rejects any transaction whose client-chosen timestamp is too far off from the replica’s own time. 
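The replica-side check is simple enough to sketch. This is a hypothetical illustration, not Basil’s code: `accept_timestamp` and the `window_s` parameter are assumptions, and the sketch rejects timestamps that are too far off in either direction, although only the far-future case matters for the attack described above.

```python
import time
from typing import Optional


def accept_timestamp(client_ts: float, window_s: float,
                     now: Optional[float] = None) -> bool:
    """Hypothetical replica check: is the client's timestamp close to local time?"""
    if now is None:
        now = time.time()  # the replica's own clock
    return abs(client_ts - now) <= window_s


# A Byzantine client picking a timestamp far in the future is simply refused,
# so its transaction never gets prepared and cannot block others.
replica_now = 1_000.0
honest_ok = accept_timestamp(1_000.2, window_s=0.5, now=replica_now)
future_ok = accept_timestamp(1_000_000.0, window_s=0.5, now=replica_now)
```

The catch is the window size: too small and honest clients with slightly skewed clocks get rejected, too large and a faulty client can still stall dependent transactions for the whole window.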

Another BFT issue that arises with running a two-phase commit is preparing a transaction with too many keys and not committing it. Similar to the timestamp problem, this can block any other dependent transactions. To mitigate the issue, the Basil protocol allows other clients to take over the stalled transactions. Such a takeover also solves client crash failure.

The version of the two-phase commit is where most of the BFT magic hides, as this is not your vanilla 2PC. The prepare phase consists of 2 stages: ST1 and ST2. In ST1, the protocol collects the votes to commit or abort from each shard, and in ST2, it makes that vote durable. The latter is needed in case a new client coordinator takes over and reaches a different vote conclusion due to Byzantine actors.

Again in the spirit of Tapir/Meerkat, ST2 is optional if ST1 has completed with a fast unanimous quorum of 5f+1 nodes. The paper contains many interesting details about the stages of the prepare phase. One curious part is that ST2 logs the voting results from all shards in just one shard. Aborts also have a fast and a slow path, and the quorums are smaller for abort decisions than for commit decisions. 
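Here is a rough sketch of the commit side of ST1 for one shard, assuming only the numbers mentioned in this post: shards of 5f+1 replicas, a unanimous fast quorum of 5f+1, and the Commit Quorum of 3f+1 quoted in the discussion below. The function names are hypothetical, and the abort quorums and the waiting logic are deliberately not modeled, so anything short of a commit quorum is just reported as such.

```python
from collections import Counter
from typing import Iterable


def shard_size(f: int) -> int:
    return 5 * f + 1  # replicas per shard to tolerate f Byzantine ones


def commit_quorum(f: int) -> int:
    return 3 * f + 1  # Commit Quorum size quoted from the paper


def classify_st1(votes: Iterable[str], f: int) -> str:
    """Classify one shard's ST1 replies, given as 'commit'/'abort' strings."""
    commits = Counter(votes)["commit"]
    if commits == shard_size(f):
        return "fast-commit"       # unanimous: ST2 can be skipped
    if commits >= commit_quorum(f):
        return "slow-commit"       # quorum without unanimity: must run ST2
    return "no-commit-quorum"      # abort paths and waiting not modeled here


# f = 1: six replicas per shard; one dissenting replica forces ST2.
unanimous = classify_st1(["commit"] * 6, f=1)
one_dissent = classify_st1(["commit"] * 5 + ["abort"], f=1)
```

Note how a single abort vote (or a single slow replica) is enough to push the shard from the fast path to the ST2 slow path.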

The recovery protocol that allows other clients to take over can succeed by simply learning the decision from ST2 through a quorum of replicas in the shard that stored the vote. It is unclear what happens with a fast-path prepare that does not run ST2. However, if the votes for ST2 are divergent (which may happen due to faulty behavior or multiple concurrent repairs), Basil falls back to a leader-based view-change protocol. And, of course, making that view change BFT adds a bit more complexity. 

On the evaluation side, Basil outperforms transactional BFT competitors but remains slower than the CFT counterparts. I also want to point out a low level of fault tolerance — out of six nodes in the cluster, only one node can be byzantine. 


1) Low fault tolerance. Requiring six nodes to tolerate one failure (the 5f+1 configuration) is a rather low fault-tolerance threshold. To double the fault tolerance, we need 11 replicas! For comparison, the HotStuff protocol, which the authors use as a baseline, needs a cluster of size 3f+1. Requiring more nodes also raises some questions about efficiency: while the performance is good, the protocol also needs more resources to achieve it. 

2) More fault tolerance? In a few places in the paper, it is mentioned that up to f votes can be missed due to asynchrony, and another f due to Byzantine behavior: “A unanimous vote ensures that, since correct replicas never change their vote, any client C′ that were to step in for C would be guaranteed to observe at least a Commit Quorum of 3f + 1 Commit votes; C′ may miss at most f votes because of asynchrony, and at most f more may come from equivocating Byzantine replicas.” This suggests that the practical fault tolerance may be better than just f. 

3) Unanimous fast quorum. The unanimous fast quorum is another potential performance problem when things are not going well. A single faulty replica will throw the protocol off fast-path prepares, requiring more resources to prepare each transaction. Not to mention, waiting for a timeout on a faulty replica does not improve the latency. 

4) Questions about recovery. We had some questions about the recovery procedure. It seems like the first step is to try to recover by reading the recorded prepare vote, and if everything is good, simply finish the commit for the transaction. However, it appears that durably recording votes in one place is an optional stage: “If some shards are in the slow set, however, C needs to take an additional step to make its tentative 2PC decision durable in a second phase (ST2).” As a result, under normal conditions, there may not be votes from ST2 to recover from one shard/partition. Does the recovering client then need to contact all partitions of a transaction? 


Reading Group. EPaxos Revisited.

In the 88th DistSys meeting, we discussed the “EPaxos Revisited” NSDI’21 paper by Sarah Tollman, Seo Jin Park, and John Ousterhout. We had an improvised presentation, as I had to step in and talk about the paper on short notice without much preparation. 

EPaxos Background

Today’s paper re-evaluates a state machine replication (SMR) protocol that is very popular in academia: EPaxos. EPaxos solves a generalized consensus problem and avoids ordering all state machine operations. Instead of a single total order of everything, EPaxos relies on a partial order of dependent operations. For instance, if two commands update the same object, then these commands may depend on each other and must be ordered with respect to each other. These dependent operations can happen concurrently, in which case they are conflicting. Not all operations depend on each other or have conflicts. For instance, if operations are commutative, say because they work on different objects, requiring an order between them is overkill. Avoiding ordering operations that do not care about the order enables some parallelism. 
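To make “dependent operations” concrete, here is a minimal sketch assuming a key-value state machine in which two commands must be ordered if they touch the same key and at least one of them writes, while reads of the same key and operations on different keys commute. `Command`, `interferes`, and `dependencies` are hypothetical names for illustration; EPaxos itself only requires some application-defined interference relation.

```python
from dataclasses import dataclass
from typing import Iterable, Set


@dataclass(frozen=True)
class Command:
    cmd_id: str
    op: str   # "read" or "write"
    key: str


def interferes(a: Command, b: Command) -> bool:
    """Order two commands only if they share a key and at least one writes."""
    return a.key == b.key and "write" in (a.op, b.op)


def dependencies(new: Command, seen: Iterable[Command]) -> Set[str]:
    """Ids of previously seen commands the new command must be ordered after."""
    return {c.cmd_id for c in seen if c.cmd_id != new.cmd_id and interferes(new, c)}


seen = [Command("C1", "write", "x"), Command("C2", "read", "x"),
        Command("C3", "write", "y")]
read_x_deps = dependencies(Command("C4", "read", "x"), seen)    # {"C1"}
write_x_deps = dependencies(Command("C5", "write", "x"), seen)  # {"C1", "C2"}
```

The write to `y` never shows up as a dependency of the commands on `x`, which is exactly the parallelism EPaxos is after.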

In practice, things are a bit more complicated. The first problem is figuring out if operations depend on each other and conflict. Of course, one can funnel all commands/operations through a single node, like Raft or Multi-Paxos, and have that node figure out which operations conflict and which do not. However, our goal was to improve parallelism, and with a single node to handle all commands, we are not achieving it, at least not on the scale possible in distributed systems. So, we need different nodes to start operations independently and learn if these operations may conflict with each other. EPaxos does this with its leaderless approach.

Every node in EPaxos can receive a request from a client and replicate it in the cluster, learning of dependencies and possible conflicts in the process. We can call the node driving replication of a command a coordinator. If a quorum of nodes agrees on dependencies for the command, the protocol commits in one round trip. Such single RTT replication is called a fast path, and in EPaxos it requires a fast quorum that can be bigger than the traditional majority quorum in some deployments. However, if nodes disagree about dependencies, there is a conflict, and the protocol may take a slow path. Slow path essentially uses Paxos to replicate and commit the operation with a “merged” list of conflicting dependencies to other nodes. It ensures that conflicts that may have been observed by only a minority of nodes are recorded by a quorum for fault tolerance. 

Let’s look at this a bit deeper. Two commands ‘C1’ and ‘C2’ conflict when some nodes have received them in a different order. For example, one node may say ‘C1’ comes before ‘C2,’ while another node orders ‘C2’ before ‘C1.’ So we express conflicts as a mismatch of dependencies at different nodes. In this example, the mismatch is ‘C1’ depends on ‘C2’ vs. ‘C2’ depends on ‘C1.’ EPaxos needs to ensure that enough nodes know about all the dependencies, so it takes a slow path to commit the command with all the learned dependencies. It is important to understand that not all dependencies are conflicts. For example, if operation ‘C4’ happened so long after operation ‘C3’ that all nodes (or really a quorum of nodes) see that ‘C3’ happened before ‘C4’ in their dependencies, then this is not a conflict, and we can proceed with the fast path. 
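The coordinator’s fast/slow path decision can be sketched as follows. This is a simplification under several assumptions: every command on the same key is treated as a write, the coordinator counts toward the fast quorum in this sketch, and it uses the strict rule for broadcast-to-all deployments discussed later (every follower reply must match the coordinator’s proposal). `local_deps` and `preaccept_round` are hypothetical names, not EPaxos’s message handlers.

```python
from typing import FrozenSet, List, Tuple

Log = List[Tuple[str, str]]  # (command id, key) pairs a replica has seen


def local_deps(log: Log, key: str) -> FrozenSet[str]:
    """Commands this replica knows on the same key (all treated as writes)."""
    return frozenset(cmd_id for cmd_id, k in log if k == key)


def preaccept_round(coord_log: Log, follower_logs: List[Log], key: str,
                    fast_quorum: int) -> Tuple[str, FrozenSet[str]]:
    """Decide fast vs. slow path for a new command on `key`.

    Each follower replies with the coordinator's proposed dependencies plus
    whatever it knows locally; the first fast_quorum - 1 replies are used.
    """
    proposed = local_deps(coord_log, key)
    replies = [proposed | local_deps(log, key)
               for log in follower_logs[:fast_quorum - 1]]
    if len(replies) < fast_quorum - 1:
        raise ValueError("not enough replies for a fast quorum")
    if all(r == proposed for r in replies):
        return "fast", proposed  # everyone agrees: commit in one round trip
    # Mismatch means a conflict: run a Paxos-style Accept with the union.
    return "slow", proposed.union(*replies)


# C3 long before C4: everyone already knows C3, so C4 goes fast.
no_conflict = preaccept_round([("C3", "x")], [[("C3", "x")], [("C3", "x")]],
                              "x", fast_quorum=3)
# A follower knows a command the coordinator missed: slow path with the union.
conflict = preaccept_round([], [[("C0", "x")], []], "x", fast_quorum=3)
```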

Now let’s take a look at a few examples. The figure above, which I borrowed from the revisited paper, has three distinct operation examples. In example-1, node ‘A’ is the coordinator for the ‘x=5’ operation, and it knows of no dependencies for the object. It reaches out to a quorum, and the followers also know of no dependencies. Coordinator ‘A’ concludes the operation in the fast path. The same would have happened if the dependencies known to ‘A’ matched the followers’; since this is a no-conflict situation, ‘A’ can commit in the fast path. In example-2, node ‘E’ knows of no dependencies for the ‘x=9’ command, but one node in the quorum has recorded a dependency in the form of another operation on object ‘x’. This dependency mismatch between replicas is a conflict, and ‘E’ takes the slow path to record the combined observation. Note that initially, only replicas ‘A-C’ had the dependency, but after the Accept message by ‘E’, both ‘D’ and ‘E’ also have it. Performing the slow path ensures that at least a quorum of nodes knows the dependency. Finally, example-3 has node ‘A’ as the coordinator again. It knows a previous command as a dependency, but the followers know of additional dependencies. Even though there seems to be a conflict, all the followers agree on dependencies, so the protocol can sometimes proceed in the fast path. The exact conditions that determine whether node ‘A’ can take the fast path depend on how ‘A’ communicates with the rest of the replicas. I will discuss this later. 

There are a few caveats to the process. First of all, when the coordinator learns of dependencies, it must learn the whole chain of dependencies to make sure the new command is properly ordered and executed at the proper time. So, over time the dependency lists get larger and larger. At some point, it will make no sense to remember old commands that were executed a long time ago on all nodes as conflicts, so the conflict and dependency tracking system must have a way to prune the no longer needed stuff. Another issue is the need to wait for all dependencies to execute before executing the new operation. As a result, the acts of committing an operation and applying it to the state machine can be quite far apart in time. 

One detail missing from my simplified explanation so far is the need to construct dependency graphs for the execution of operations. See, the fast path and slow path modes are all about committing the operations and their dependencies. We have not really ordered the operations yet, and the dependency graph helps us establish the actual execution order. Each operation is a vertex in a graph, and dependencies between operations are the edges. So to execute an operation, we first add it to the graph. Then we add the operations it depends on and connect the operation to each of its dependencies. Then, for each new vertex, we also look at its dependencies and add them. The process continues until there is nothing more to add to the graph. In the resulting dependency graph, concurrent operations may appear to depend on each other, creating loops or cycles. The execution order is then determined by finding all strongly connected components (SCCs), which represent individual operations, conflict loops, or even groups of loops, sorting these SCCs in inverse topological order, and executing the SCCs in that order. When executing each strongly connected component, we deterministically order its commands and execute all non-executed operations in increasing sequence number order. The sequence number is maintained precisely to break such ties and order operations within the same SCC. The paper has more details about the sequence number. As you can imagine, dealing with the graph is quite expensive, so it is even more important to prune all old dependencies. 
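The execution step can be sketched with Tarjan’s SCC algorithm, which conveniently emits a component only after every component reachable from it. With edges pointing from a command to its dependencies, that is exactly the inverse topological order described above. The input format (`deps`, `seq`) and the function names are assumptions for illustration; ties on equal sequence numbers are broken by command id, which is our choice, not necessarily EPaxos’s.

```python
from typing import Dict, Iterable, List, Set


def tarjan_sccs(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """SCCs of the graph command -> dependency, dependencies emitted first."""
    index: Dict[str, int] = {}
    low: Dict[str, int] = {}
    stack: List[str] = []
    on_stack: Set[str] = set()
    sccs: List[List[str]] = []
    counter = 0

    def visit(v: str) -> None:
        nonlocal counter
        index[v] = low[v] = counter
        counter += 1
        stack.append(v)
        on_stack.add(v)
        for w in sorted(deps.get(v, ())):  # sorted for deterministic output
            if w not in index:
                visit(w)
                low[v] = min(low[v], low[w])
            elif w in on_stack:
                low[v] = min(low[v], index[w])
        if low[v] == index[v]:  # v is the root of a component
            scc = []
            while True:
                w = stack.pop()
                on_stack.discard(w)
                scc.append(w)
                if w == v:
                    break
            sccs.append(scc)

    for v in sorted(deps):
        if v not in index:
            visit(v)
    return sccs


def execution_order(deps: Dict[str, Set[str]], seq: Dict[str, int],
                    executed: Iterable[str] = ()) -> List[str]:
    """Execute SCCs in order; inside an SCC, go by sequence number."""
    done = set(executed)
    order: List[str] = []
    for scc in tarjan_sccs(deps):
        for cmd in sorted(scc, key=lambda c: (seq[c], c)):
            if cmd not in done:
                order.append(cmd)
                done.add(cmd)
    return order


# C1 and C2 conflict (each depends on the other); C1 also depends on C0,
# and C3 depends on C1.
deps = {"C0": set(), "C1": {"C0", "C2"}, "C2": {"C1"}, "C3": {"C1"}}
seq = {"C0": 1, "C1": 3, "C2": 2, "C3": 4}
order = execution_order(deps, seq)  # ["C0", "C2", "C1", "C3"]
```

The recursion depth grows with the length of dependency chains, which is one more reason the pruning mentioned above matters.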

Finally, one of the biggest caveats to this leaderless fast-path process is one that is typical in distributed systems. Of course, I am talking about handling failures. In particular, the protocol needs to ensure that failures do not erase a fast-path decision. This is tricky due to concurrency, as two or more coordinators may run conflicting operations. Such concurrency is like attempting to place two different commands in the same execution slot or position. If we use a majority decision to see which command was successful in that particular position, then we risk creating an ambiguous situation even when just one node fails. For instance, if a command ‘C1’ has three votes and ‘C2’ has two in a cluster of five nodes, then a failure of one ‘C1’ voter creates ambiguity, as both ‘C1’ and ‘C2’ have the same number of remaining votes. The Fast Paxos paper describes the problem well, and the solution is to use a bigger fast quorum to avoid such ambiguity. 

The EPaxos protocol uses a larger fast-path quorum as well. However, the original paper proposes some optimizations that allow a majority fast quorum in configurations of three and five nodes (technically, it is an \(f+\lfloor\frac{f+1}{2}\rfloor\) fast quorum in a cluster of \(2f+1\) nodes). Of course, these optimizations do not come entirely for free, as the need to solve the ambiguity problem remains. The optimized recovery needs only \(\lfloor\frac{f+1}{2}\rfloor\) responses and some additional information about the dependencies available at the nodes. The whole set of optimizations is hard to summarize, so I refer interested readers to the paper, Section 4.4.
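A few lines of arithmetic show why the optimized fast quorum coincides with a majority only for three and five nodes. The sketch below just evaluates the formulas quoted above for a cluster of 2f+1 nodes; the function name and dictionary keys are ours.

```python
def epaxos_quorums(f: int) -> dict:
    """Quorum sizes for N = 2f + 1 replicas, using the formulas quoted above."""
    return {
        "nodes": 2 * f + 1,
        "majority": f + 1,
        "optimized_fast": f + (f + 1) // 2,
        "recovery_threshold": (f + 1) // 2,
    }


for f in range(1, 4):
    q = epaxos_quorums(f)
    same = "yes" if q["optimized_fast"] == q["majority"] else "no"
    print(f"N={q['nodes']}: majority={q['majority']}, "
          f"fast={q['optimized_fast']}, fast == majority? {same}")
# N=3: majority=2, fast=2, fast == majority? yes
# N=5: majority=3, fast=3, fast == majority? yes
# N=7: majority=4, fast=5, fast == majority? no
```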

Now let me return to the example-3 fast path again. Previously I said that the quorum needs to agree on dependencies, but also showed example-3. In that example, the coordinator disagrees with followers. However, the coordinator still takes a fast path as long as all the followers in the quorum agree. Unfortunately, this is not the case in optimized EPaxos, at least not always. In fact, the paper says that the followers must have the same dependencies as the coordinator for the fast path if the coordinator communicates with all nodes and not just an exact quorum of nodes. Moreover, the followers must record having matching dependencies with the coordinator. So, example-3 works when the coordinator only contacts enough nodes to form a quorum. Intuitively, succeeding in this configuration means that not only enough nodes matched the dependencies, but all other nodes do not even learn about the operation from the coordinator. Upon recovery, this restricted communication gives additional information, as the recovery protocol can know what nodes were part of the initial fast quorum. In practice, however, this creates problems, as it is harder to get a fast quorum to agree, as the coordinator needs to be lucky to pick exact nodes that agree on the dependencies. Moreover, just one node failure results in failed fast quorum and forces retry, further contributing to tail latency. 

As such, practical implementations broadcast messages to all nodes and expect the fastest to reply to form the quorum. In the broadcast-to-all communication, example-3 in the figure will not work. 

EPaxos Revisited

The revisited paper brings a few important contributions to EPaxos. First, it conducts a re-evaluation of the protocol and makes an excellent comparison to the Multi-Paxos baseline. I really like the re-evaluation part of this paper! 

Unlike the original EPaxos paper, the revisited paper reports performance for a much wider range of conflict rates. The paper uses a more realistic read/write workload with different read-to-write ratios. The authors also change the conflict workload to a more realistic scenario. In the original paper, conflicts were simulated by having concurrent commands access the same predefined key, a “hot spot” method. The revisited paper no longer focuses on just one key and instead simulates conflicts by selecting keys from a Zipfian distribution. Finally, the results are presented far more visually: the figures capture the performance differences between EPaxos and Multi-Paxos in every region and tested scenario. I was surprised to see how bad EPaxos tail latency is, even given the expectation that it has more performance variability. Below is the evaluation figure for latency in a five-region deployment. It may appear confusing at first, but reading the caption and staring at it for a minute was all I needed to understand it.
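For readers unfamiliar with Zipfian workloads, here is a rough Python sketch of how skewed key selection turns into conflicts. The key count and the skew parameters `s` below are hypothetical, not the values used in the revisited paper; `pairwise_conflict_prob` simply computes the chance that two independently issued commands pick the same key.

```python
import random


def zipf_probs(n_keys, s):
    """P(key k) proportional to 1 / (k + 1) ** s; s = 0 gives a uniform distribution."""
    weights = [1.0 / (k + 1) ** s for k in range(n_keys)]
    total = sum(weights)
    return [w / total for w in weights]


def pairwise_conflict_prob(probs):
    """Probability that two independently drawn commands touch the same key."""
    return sum(p * p for p in probs)


def sample_keys(n_commands, probs, seed=0):
    """Draw keys for a stream of commands (seeded for reproducibility)."""
    rng = random.Random(seed)
    return rng.choices(range(len(probs)), weights=probs, k=n_commands)


if __name__ == "__main__":
    for s in (0.0, 0.7, 0.99):
        probs = zipf_probs(1_000, s)
        print(f"s={s}: pairwise conflict probability = {pairwise_conflict_prob(probs):.4f}")
    print(sample_keys(10, zipf_probs(1_000, 0.99)))
```

With `s = 0` the keys are uniform and the conflict probability is 1/1000; raising `s` concentrates traffic on a few hot keys and raises it, without pinning every conflict to a single predefined key as the hot-spot method does.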

Another important part of the re-evaluation is addressing the commit vs. execute latency problem. As mentioned in the summary above, execution can lag significantly behind commitment. Yet, the original EPaxos paper measured only commit latency. The figure below shows the difference.
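Why execution lags commitment can be seen with a small model: a command can only execute after it commits and after every command it depends on has executed. The Python sketch below is a simplification under stated assumptions: the dependency graph is acyclic (real EPaxos executes strongly connected components, so cycles are allowed), execution itself takes no time, and the commit times are made up. `execution_times` is a hypothetical helper.

```python
from functools import lru_cache


def execution_times(commit_time, deps):
    """Earliest time each command can execute (hypothetical, acyclic simplification).

    commit_time: dict cmd -> time the command committed.
    deps: dict cmd -> iterable of commands it depends on.
    Real EPaxos executes strongly connected components of the dependency graph;
    this sketch assumes the graph has no cycles.
    """
    @lru_cache(maxsize=None)
    def exec_time(cmd):
        # A command cannot execute before it commits, nor before all its dependencies execute.
        waits = [exec_time(d) for d in deps.get(cmd, ())]
        return max([commit_time[cmd], *waits])

    return {cmd: exec_time(cmd) for cmd in commit_time}


if __name__ == "__main__":
    commit = {"A": 10, "B": 40, "C": 95}  # hypothetical commit times
    deps = {"A": ["B"], "B": ["C"]}       # A depends on B, B depends on C
    for cmd, t in sorted(execution_times(commit, deps).items()):
        print(cmd, "committed at", commit[cmd], "executed at", t)
```

Here A commits at time 10 but cannot execute until time 95, when the last command in its dependency chain commits; a benchmark that stops the clock at commit would miss that 85-unit gap.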

The second contribution is a bit shaky. EPaxos performance degrades as the conflict rate rises. Naturally, finding a way to reduce conflicts can help avoid the expensive slow path, which, in turn, improves both latency and throughput. The paper proposes a technique the authors call Timestamped-Ordered Queuing (TOQ) to reduce conflicts. Recall that the EPaxos coordinator sends its dependencies to the followers, and the followers reply with their updated dependencies. Whether the algorithm takes a fast path depends on a few conditions, but the basic rule is that all followers in the quorum must have the same dependencies. TOQ exploits this rule by adding a delay at each follower to increase the chance that followers end up with the same conflicts/dependencies. Concurrent conflicting operations may reach the quorum replicas in different orders, and such out-of-order delivery requires a slow path to resolve. But if followers wait a bit to receive these concurrent messages and then order them deterministically, the follower replicas in the quorum end up with the same dependencies/conflicts.
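The core TOQ idea, holding conflicting messages briefly and then processing them in a deterministic order, can be sketched in Python. This is an illustration under assumptions, not the paper's implementation: each command is assumed to carry a coordinator-assigned timestamp (how the revisited paper chooses it is not covered here), `TOQFollower` and `deps_on_arrival` are hypothetical names, and a command's dependencies are simply all earlier commands on the same key.

```python
import heapq


def deps_on_arrival(arrivals):
    """Baseline follower: compute dependencies in message-arrival order.

    arrivals: list of (timestamp, cmd_id, key) in the order they reached this follower.
    """
    seen, deps = {}, {}
    for _, cmd_id, key in arrivals:
        earlier = seen.setdefault(key, [])
        deps[cmd_id] = frozenset(earlier)  # conflicts with everything already seen on this key
        earlier.append(cmd_id)
    return deps


class TOQFollower:
    """Hypothetical TOQ-style follower: hold each command until its timestamp, then
    process held commands in (timestamp, cmd_id) order."""

    def __init__(self):
        self._pending = []  # min-heap of (timestamp, cmd_id, key)
        self._seen = {}     # key -> commands already processed on that key
        self.deps = {}      # cmd_id -> dependency set computed at release time

    def receive(self, timestamp, cmd_id, key):
        heapq.heappush(self._pending, (timestamp, cmd_id, key))

    def tick(self, now):
        # Release every command whose timestamp has passed, in deterministic order.
        while self._pending and self._pending[0][0] <= now:
            _, cmd_id, key = heapq.heappop(self._pending)
            earlier = self._seen.setdefault(key, [])
            self.deps[cmd_id] = frozenset(earlier)
            earlier.append(cmd_id)


if __name__ == "__main__":
    a, b = (10, "a", "x"), (12, "b", "x")  # two concurrent writes to key "x"
    f1_order, f2_order = [a, b], [b, a]     # the followers see them in opposite orders
    print(deps_on_arrival(f1_order) == deps_on_arrival(f2_order))

    f1, f2 = TOQFollower(), TOQFollower()
    for msg in f1_order:
        f1.receive(*msg)
    for msg in f2_order:
        f2.receive(*msg)
    f1.tick(now=20)
    f2.tick(now=20)
    print(f1.deps == f2.deps)
```

Without the hold, the two followers report different dependencies; with it, both report a → {} and b → {a}. In this sketch, a message that arrives after its timestamp has already passed is processed late and out of order, so a shorter hold time lowers latency but makes such late arrivals, and thus conflicts, more likely.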

This waiting does not change the dependencies the coordinator has already sent to the followers, and this is where my problem with the revisited paper lies. The TOQ optimization relies on the coordinator’s ability to send a different list of dependencies than the followers report and still proceed with the fast path, as in example-3. However, this is only possible when we use unoptimized EPaxos with a larger fast quorum or make the coordinator use “thrifty” mode by contacting only a quorum of nodes instead of all nodes. In both cases, we lose something. Larger quorums increase latency and decrease the number of failures a system can sustain before it must take the slow path for every command. The thrifty optimization can increase tail latency substantially, as a single failed node may prevent the coordinator from gathering a quorum of votes, requiring a timeout and retry. Considering that delaying the followers’ replies already adds extra latency, the TOQ optimization becomes a bit questionable in practice.
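The costs of these two options can be put into rough numbers. The Python sketch below assumes the unoptimized fast quorum is \(2f\) replicas, as in the basic protocol of the original EPaxos paper, and models each replica as independently unresponsive with a hypothetical probability `p_down`; it does not model slow-but-alive replicas or timeout lengths. All function names are hypothetical.

```python
from math import comb


def fast_quorum(n, optimized):
    """Fast-quorum size (including the coordinator) for n = 2f + 1 replicas.

    Assumption: unoptimized EPaxos uses 2f; optimized uses f + floor((f + 1) / 2).
    """
    f = (n - 1) // 2
    return f + (f + 1) // 2 if optimized else 2 * f


def tolerated_failures(n, q):
    """Replicas that can be down while a fast quorum of size q is still reachable."""
    return n - q


def p_quorum_thrifty(q, p_down):
    """Thrifty: contact exactly q - 1 followers; every one of them must answer."""
    return (1 - p_down) ** (q - 1)


def p_quorum_broadcast(n, q, p_down):
    """Broadcast: contact all n - 1 followers; any q - 1 answers suffice."""
    others = n - 1
    return sum(comb(others, k) * (1 - p_down) ** k * p_down ** (others - k)
               for k in range(q - 1, others + 1))


if __name__ == "__main__":
    n, p_down = 5, 0.01  # hypothetical: 5 replicas, each unresponsive 1% of the time
    for optimized in (False, True):
        q = fast_quorum(n, optimized)
        print(f"optimized={optimized}: fast quorum={q}, "
              f"failures tolerated={tolerated_failures(n, q)}, "
              f"thrifty={p_quorum_thrifty(q, p_down):.4f}, "
              f"broadcast={p_quorum_broadcast(n, q, p_down):.6f}")
```

With five replicas, the larger fast quorum (4) survives one failure versus two for the optimized one (3), and a thrifty coordinator assembles its quorum noticeably less often than one that broadcasts, because any single unresponsive member of its chosen quorum forces a timeout and retry.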

The paper studies how delaying the followers affects latency and proposes some compromise solutions that reduce the delay at the expense of some conflicts. On the evaluation side, the paper says it did not enable the thrifty optimization, so for TOQ to be correct, it must have used a larger fast quorum. However, the paper does not mention this, so it is impossible to know whether the authors evaluated TOQ on a correct system configuration.

Discussion

1) Correctness. In my presentation, I focused a lot on the correctness of example-3, where, in an optimized EPaxos fast quorum, the coordinator communicates a different dependency list than the followers have. I said that this is incorrect according to the original paper. My lack of preparation for the meeting showed: I was wrong.

Correctness, in this case, is not necessarily violated, but this largely depends on the “version” and configuration of EPaxos. According to the paper, in thrifty mode, where the coordinator contacts only the number of nodes required to reach a quorum, the protocol works as shown in the figure, so there are no safety issues. Without thrifty mode, however, safety breaks. It breaks because we are running a smaller fast quorum (optimized EPaxos) but do not record enough information to resolve the ambiguity, which may arise after failures, about whether the fast path was taken. Non-thrifty optimized EPaxos is the configuration in which followers must agree with the coordinator’s dependencies and record this agreement. In the discussion, we looked at some examples of safety violations caused by this problem.

Interestingly enough, I am not sure that the authors of the revisited paper fully understood the implications of thrifty/non-thrifty and other optimizations for safety. In the EPaxos overview figure, the paper explicitly states: “for simplicity, we only show messages to the necessary quorum,” suggesting that the authors do not assume thrifty mode here, which makes example-3 incorrect.

So, where did this example come from? The authors borrow a figure from the original EPaxos presentation. In that presentation, Iulian Moraru explicitly stated that the coordinator talks to a majority of replicas only (3 out of 5).

2) TOQ Benefits. In light of the above, TOQ benefits become a bit more questionable because we are not sure whether the experiments ran on a correct and safe configuration of EPaxos. At the same time, it is worth mentioning that the Tempo paper, which we will discuss later this year, uses an approach similar to TOQ. We also discussed the Accord protocol, which uses a similar technique, so TOQ is not without merit and appears to be valued elsewhere in the literature.

3) Bounding Dependency Chains. This is an important improvement to EPaxos introduced in this paper. It prevents dependency chains from growing indefinitely under certain conditions; such uncontrolled growth can prevent execution and stall the protocol. We spent quite a bit of time trying to figure out exactly how the proposed solution works, and we think the paper could have been clearer here.

4) Appendix. The appendix has lots of additional performance data that is worth checking. 

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites. Please join the Slack group to get involved!