
Reading Group Special Session: Scalability and Fault Tolerance in YDB

YDB is an open-source Distributed SQL Database. YDB is used as an OLTP Database for mission-critical user-facing applications. It provides strong consistency and serializable transaction isolation for the end user. One of the main characteristics of YDB is scalability to very large clusters together with multitenancy, i.e., the ability to provide isolated environments for users in a single cluster. In this talk, we will cover two layers of YDB, the Tablet and BlobStorage layers, which together provide fault tolerance, scalability, and user isolation.

Tablet is a very lightweight component that implements distributed consensus. It is a building block for user data storage, schema manager, tablet manager, system views, and many other components. A single YDB cluster is known to have more than a million tablets, and a single YDB node can serve up to 10K tablets. Tablet implements a distributed consensus protocol over a shared log provided by the BlobStorage layer.

The BlobStorage layer is a fault-tolerant key-value append-only database. It implements a specific API for logging, reading and writing blobs, garbage collection, and quorum management. We believe that the protocol between Tablet and BlobStorage is quite simple and easier for listeners to understand than Paxos or Raft. We also illustrate the flexibility of YDB in real-life use cases like migrating some parts of the cluster from one availability zone to another without a maintenance window or storing some data blobs on fast media like SSD and others on HDD.

Speaker: Andrey is the head of the YDB project (https://ydb.tech). He works at Yandex as a Head of Department. He also has a long history at Yandex as a member of the Web Crawler Team, a member of the Infrastructure team, one of the creators of Yandex Cloud, and now part of the CloudIL Team. Andrey holds a Ph.D. from Moscow State University.

When: Wednesday, August 8th at 2:00 pm EST

Where: Zoom

Reading Group. Darwin: Scale-In Stream Processing

In the 99th reading group meeting, we discussed stream processing. The paper we read, “Darwin: Scale-In Stream Processing” by Lawrence Benson and Tilmann Rabl, argues that many stream processing systems are relatively inefficient in utilizing the hardware. These inefficiencies range from the need to ingest large volumes of data, to the requirement of durably storing ingested data for fault tolerance, to the generality of the stream-processing frameworks themselves. Due to these inefficiencies, many stream processing systems scale by adding more machines instead of solving the existing bottlenecks. The authors propose a set of methods to address the scalability concerns. The paper refers to these fixes as a “scale-in” approach, where the system tries to optimize and fully use the resources it already has before requiring more servers.

Overall, the paper identifies several areas of concern and improvement. On the framework generality side of things, the authors discuss query compilation. The idea here is that compiling streaming queries into native, optimized code will be faster and more efficient than having these queries run in a more general but interpreted environment. Out of all identified problems, Darwin seems to solve only this one. Darwin exposes an SQL-like interface to users, creates a query plan that incorporates various optimizations, translates the query plan into C++ code, and finally compiles the generated C++.
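To make the query-compilation idea a bit more concrete, here is a toy sketch in Python rather than C++. It is not Darwin's code generator: the query format (a filter expression and a projection expression over a record `r`) and the names `interpret` and `compile_query` are made up for illustration. The sketch contrasts a generic pipeline that calls two functions per record with source code generated for one fused loop and compiled once.

```python
# Toy illustration of query compilation (hypothetical; not Darwin's actual design).
# A "query" is a filter expression and a projection expression over a record `r`.

def interpret(records, filter_fn, map_fn):
    """Generic path: two indirect calls per record."""
    return [map_fn(r) for r in records if filter_fn(r)]

def compile_query(filter_expr, map_expr):
    """Generate source for a single fused loop, then compile it once."""
    source = (
        "def compiled(records):\n"
        "    out = []\n"
        "    for r in records:\n"
        f"        if {filter_expr}:\n"
        f"            out.append({map_expr})\n"
        "    return out\n"
    )
    namespace = {}
    exec(source, namespace)  # only acceptable here because the expressions are trusted
    return namespace["compiled"]

events = [{"user": i % 3, "price": i * 10} for i in range(6)]
query = compile_query("r['price'] >= 20", "(r['user'], r['price'])")
compiled_result = query(events)
interpreted_result = interpret(
    events, lambda r: r["price"] >= 20, lambda r: (r["user"], r["price"])
)
```

Both paths return the same rows; the compiled one simply avoids the per-record indirection, which is the kind of overhead a native code generator would remove far more aggressively.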

Other bottlenecks/issues the paper identifies are network communication, persistent storage, and recovery. For network communication bottlenecks, the authors propose using… well faster networks and RDMA. On the persistent storage side of things, the paper claims we need more speed and should consider newer technologies, like nonvolatile memory (PMem). Finally, for the recovery aspect, the paper is again not very creative and suggests using “modern storage technology to achieve efficient checkpointing.”


1) Limited Implementation & Eval. This is a short paper, so there are only so many details on many of the discussed topics. The paper identifies 4 bottlenecks and provides experimental justifications for these. However, on 3 out of the 4 issues, the solution seems to boil down to “using modern technology.” So, it seems like either 3/4 of the problems have been solved with “modern technology” or that the authors have no good solution to them. 

The limited evaluation is also interesting. After all, in RAM, Darwin shows the same performance as the other systems the authors compare against. This makes us question whether the remaining 1 problem and solution (query optimization) is also actually a problem in (at least some) other streaming systems.

2) Few Interesting Numbers. We really liked the motivation for scaling streaming systems the paper uses: Alibaba’s 2020 Singles Day (a big e-commerce sales event) produced 4 billion streamed events per second, which required 1.5 million cores of Apache Flink to handle. At such a scale, improving the efficiency of streaming systems is crucial.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Shard Manager: A Generic Shard Management Framework for Geo-distributed Applications

The 97th paper in the reading group was “Shard Manager: A Generic Shard Management Framework for Geo-distributed Applications.” This paper from Facebook talks about a sharding framework used in many of Facebook’s internal systems and applications.

Sharding is a standard way to provide horizontal scalability — systems can break down their data into (semi-) independent chunks and store and process operations on these data chunks on different servers. There are many complex questions about this seemingly simple concept. For instance, how do we split the data or keyspace into good chunks? And once we know how to partition the keyspace, where do we store the shards and perform compute tasks on them? How do applications find the right partitions for their data?

Despite the widespread use, many large systems handle sharding in an ad-hoc manner, implementing everything they need from scratch. Obviously, reinventing the wheel each time can be problematic when you run a big company with hundreds of distributed sharded systems and applications. Shard Manager is Facebook’s solution to the sharding problem that aims to be general enough to be reused by many internal systems. It is a comprehensive framework that can decide how to split the data/keyspace (apps can override this and have their own sharding rules), where to place the resulting shards (apps specify rules or constraints to guide the placement), and how to find them.

Shard Manager aims to be general and accommodate many workloads and deployment patterns observed at Facebook. The paper states that currently, 54% of all sharded applications at Facebook use the framework. Unlike other sharding frameworks that are constrained to a single data center or region, Shard Manager works across regional boundaries. In a traditional sharding framework, if an application needs a copy of a partition in a different region, it must orchestrate this copy’s placement in the regional instance of the sharding platform. This is a problem for availability, scalability, fault-tolerance, and cost, as the components managing shard placement and migration are separated between different systems. With Shard Manager, the shard’s placement is not confined to a single region, and copies can move between the geographical regions, all controlled by one system. Speaking of shard copies, Shard Manager assumes replicated shards and also allows for a “special” primary shard to facilitate leader-based replication approaches.

Of course, all shard placements and movements are constrained based on the application requirements and load balancing needs. The applications set the constraints, some of which are hard and cannot be violated, and some are more like a wish list — good to have but not strictly necessary. The hard constraints ensure the minimum requirement for shard placement, such as the server capacity needed for a shard. Examples of soft constraints or goals are preferred geographical location, the spread of replicas across failure domains, and various load balancing requirements.
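The split between hard constraints and soft goals can be sketched as "filter, then rank". The snippet below is only an illustration of that idea and not Shard Manager's allocator: the `Server` fields, the `place_replicas` function, the greedy strategy, and the use of a region as the failure domain are all assumptions of mine.

```python
# Hypothetical sketch of hard constraints vs. soft goals; not Shard Manager's allocator.
from dataclasses import dataclass

@dataclass
class Server:
    name: str
    region: str          # assumption: a region doubles as the failure domain
    free_capacity: int

def place_replicas(shard_size, replicas, preferred_region, servers):
    """Greedily pick servers: hard constraints filter, soft goals rank."""
    chosen = []
    for _ in range(replicas):
        used_regions = {s.region for s in chosen}
        # Hard constraints: enough capacity, and never two replicas on one server.
        candidates = [
            s for s in servers
            if s.free_capacity >= shard_size and s not in chosen
        ]
        if not candidates:
            raise RuntimeError("hard constraints cannot be satisfied")

        def score(s):
            # Soft goals, in priority order: spread across regions,
            # prefer the requested region, then favor the emptiest server.
            return (s.region not in used_regions,
                    s.region == preferred_region,
                    s.free_capacity)

        best = max(candidates, key=score)
        best.free_capacity -= shard_size
        chosen.append(best)
    return [s.name for s in chosen]

servers = [
    Server("a1", "us-east", 10),
    Server("a2", "us-east", 50),
    Server("b1", "us-west", 40),
    Server("c1", "eu", 5),
]
placement = place_replicas(shard_size=8, replicas=3,
                           preferred_region="us-east", servers=servers)
```

Here `c1` is never considered because it fails the capacity constraint, while the third replica lands in an already used region because the spread goal is only a preference. A real allocator would presumably solve this as a global optimization rather than greedily.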

To make everything tick, Shard Manager consists of multiple components. The orchestrator component is a monitor for health and resource usage; it makes the decisions to change the placement of shards whenever needed. The allocator component creates new shard-to-server assignments. Allocator also receives input from Twine, Facebook’s cluster manager. This connection allows the orchestrator to make decisions based on planned or anticipated events known to Twine, such as servers shutting down or rebooting for maintenance. Apparently, this is a big deal, as it allows applications to gracefully handle infrastructure maintenance. The paper spends quite some time talking about graceful shard migration, especially for cases when a primary copy of a shard needs to move to a different server.

Finally, ZooKeeper stores all the important stuff, such as the orchestrator’s state and shard assignments. ZooKeeper also acts as a failure detector for application nodes. On the application side, servers use the Shard Manager (SM) Library to interact with the orchestrator and ZooKeeper. Application clients, on the other hand, interact with Service Discovery to find the required shards. We recently covered the Delos paper that talks about building control plane storage for Facebook to replace ZooKeeper, so it is likely that the actual ZooKeeper has been replaced with a Delos-based system.

The scale of Facebook’s operation demands the Shard Manager to be sharded itself. The system is composed of many Mini Shard Managers (Mini-SMs). Each Mini-SM handles multiple partitions, and each partition represents a slice of servers available to some application across many regions.


Reading Group. ByShard: Sharding in a Byzantine Environment

Our 93rd paper in the reading group was “ByShard: Sharding in a Byzantine Environment” by Jelle Hellings and Mohammad Sadoghi. This VLDB’21 paper talks about sharded byzantine systems and proposes an approach that can implement 18 different multi-shard transaction algorithms. More specifically, the paper discusses two-phase commit (2PC) and two-phase locking (2PL) in a byzantine environment.

As usual, we had a presentation of the paper. Karolis Petrauskas did an excellent job explaining this work: 

The paper states that modern blockchain technology relies on full replication, making the systems slower and harder to scale. 

Sharding is a natural way to solve the problem and has been done countless times in the crash fault tolerance setting. Of course, a sharded system often needs to perform transactions that touch data in more than one shard. The usual way to solve this in CFT is to use some version of 2PC coupled with some concurrency control mechanism, like 2PL. ByShard follows this general recipe, but in the BFT setting, which complicates things a bit. The problem is making 2PC and 2PL work in a byzantine, adversarial environment without tightly coupling all shards back together into one “megashard.” So, we need a way to communicate between shards in a reliable way. 

Let’s look at a transaction lifecycle. When we have a transaction that spans multiple shards, the first step is to deliver this transaction to each shard and check whether a shard can run it. Then, if everything is ok, we need to run the transaction and provide some isolation from other ongoing transactions. ByShard implements all these actions with shard-steps. Each shard-step is a building block of all ByShard protocols and allows the shard to inspect a transaction, make changes to the local state, and send the message to start another shard-step on another shard. Overall, ByShard uses three distinct types of shard-steps: vote-step, commit-step, and abort-step. 

The message sending part is kind of important, as we need this communication to be reliable in the BFT setting. The paper gracefully ignores this problem and points to a few solutions in the literature. In short, ByShard requires a cluster-sending protocol that ensures reliable communication between shards, such that all correct nodes of the receiver shard get the message, all correct nodes of the sender shard get an acknowledgment, and sending requires the sender shard to reach an agreement on what to send. The last point ensures that bad actors do not send malicious stuff, and I suspect that on the receiver side there needs to be a way to check that the received messages were indeed certified by the sender’s consensus.

Vote-step is used to replicate the transaction between shards. When a shard receives the transaction, it starts the vote-step and checks whether it can proceed with the transaction. The shard may also perform local state changes if needed. At the end of the vote-step, a shard forwards some information to another shard to start a new shard-step. Since we only have three building blocks, the stuff vote-step sends can start another vote-step, commit-step, or abort-step at the receiving end. The purpose of commit-step and abort-step is self-evident from their names. One important thing to note on abort-step is that it needs to undo any local changes that a prior vote-step might have made to ensure that the aborted transaction leaves no side effects.

Now we can look at how ByShard composes these three basic steps. The figure above provides a visual illustration of three different ways ByShard runs 2PC. One aspect of the figure that I fail to understand is why not all shards run vote-step and commit-step, and the text does not really provide an explanation.

In the linear orchestration, the transaction progresses from the coordinator one shard at a time. If any shard decides to abort, it needs to start the abort-step and notify all other shards involved in the transaction (or at least all other shards that voted earlier). If a shard decides to commit, it actually starts a vote-step in the next shard. If the vote-step successfully reaches and passes the last shard involved in the transaction, then that last shard can broadcast the commit-step to everyone. Centralized orchestration looks more like the traditional 2PC, and distributed orchestration cuts down on the number of sequential steps even further. The three strategies represent tradeoffs between the latency and number of communication exchanges and shard-steps. 
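The linear flow described above can be sketched in a few lines. This is a single-process toy and not ByShard itself: consensus inside a shard and the cluster-sending protocol between shards are not modeled at all, and the `Shard` class, the balance-based constraint, and `run_linear` are names and details I made up for illustration.

```python
# Hypothetical, single-process sketch of linear orchestration with shard-steps.
# Consensus inside a shard and cluster-sending between shards are NOT modeled.

class Shard:
    def __init__(self, name, balances):
        self.name = name
        self.balances = dict(balances)
        self.pending = {}   # txn id -> (account, delta) applied by a vote-step
        self.log = []       # shard-steps executed, kept for inspection

    def vote_step(self, txn_id, account, delta):
        """Check the local constraint; apply the change tentatively on success."""
        self.log.append("vote")
        new_balance = self.balances.get(account, 0) + delta
        if new_balance < 0:
            return False
        self.balances[account] = new_balance
        self.pending[txn_id] = (account, delta)
        return True

    def commit_step(self, txn_id):
        self.log.append("commit")
        self.pending.pop(txn_id, None)

    def abort_step(self, txn_id):
        """Undo whatever a prior vote-step changed locally."""
        self.log.append("abort")
        if txn_id in self.pending:
            account, delta = self.pending.pop(txn_id)
            self.balances[account] -= delta

def run_linear(txn_id, steps):
    """steps: list of (shard, account, delta), visited one shard at a time."""
    voted = []
    for shard, account, delta in steps:
        if not shard.vote_step(txn_id, account, delta):
            for earlier in voted:      # notify the shards that voted earlier
                earlier.abort_step(txn_id)
            return "aborted"
        voted.append(shard)
    for shard in voted:                # the last shard broadcasts the commit-step
        shard.commit_step(txn_id)
    return "committed"

s1 = Shard("s1", {"alice": 50})
s2 = Shard("s2", {"bob": 10})
ok = run_linear("t1", [(s1, "alice", -30), (s2, "bob", +30)])
bad = run_linear("t2", [(s2, "bob", +100), (s1, "alice", -100)])
```

The second transaction fails its vote-step on `s1`, so the abort-step on `s2` rolls back the tentative credit to `bob`. Between the vote and the abort, that credit was visible, which is exactly the partial-result exposure of running without isolation.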

So with 2PC taken care of, we can briefly discuss the concurrency control. ByShard proposes a few different ways to implement it, starting with no concurrency control, thus allowing observation of partial results. Because of the side effect cleaning ability of abort-step, if some transaction partly executes and then reaches the abort-step, then its execution will be undone or rolled back. This reminds me of the sagas pattern. The other solution is to use locks to control isolation. The paper (or the presentation above) has more details on the nuances of locking and requiring different locks with a different type of orchestration. By combining different ways to orchestrate the transactions with different ways to execute them, ByShard presents 18 BFT transactional protocols with different isolation and consistency properties. 


1) Comparison with Basil. An obvious discussion is a comparison with Basil, another transactional sharded BFT system. Basil is a lot closer to the Meerkat solution from the CFT world, while ByShard is a more classical 2PC+2PL approach. In Basil, the degree of fault tolerance is smaller (i.e., it needs 5f+1 clusters). At the same time, ByShard is much more underspecified than Basil. ByShard relies on existing BFT consensus and BFT cluster-broadcast mechanisms to work, while Basil provides a complete and contained solution. On the performance side of things, ByShard requires a lot of steps and a lot of consensus operations across all the involved shards to do all of the shard-steps. This must take a significant performance toll. While it is not correct to compare numbers directly between papers, Basil can run thousands of transactions per second, while ByShard’s throughput is in single digits. However, it is worth mentioning that ByShard’s experiments included more shards; ByShard’s transactions also involved a large number of shards.

2) (Distributed) Sagas Pattern. As I briefly mentioned in the summary above, ByShard, especially with linear orchestration and no isolation, reminds me of the sagas pattern. Distributed sagas are used to orchestrate long-running requests in microservice-type applications. If we squint our eyes, we can see each shard as a separate microservice. As vote-steps propagate across shards, they perform local changes. And if an abort is needed, the abort causes these changes to be rolled back. However, when we add isolation to ByShard, the similarities with sagas start to disappear.

3) Performance. An important motivation for sharding is performance; however, it does not seem like ByShard achieves stellar performance here. Of course, sharding is still useful for systems that operate with large amounts of data that otherwise would not fit into a single machine. Nevertheless, without strong performance, a solution like this has very few advantages over not using sharded/partitioned systems at all.


Reading Group. Pegasus: Tolerating Skewed Workloads in Distributed Storage with In-Network Coherence Directories

Hard to imagine, but the reading group just completed the 45th session. We discussed “Pegasus: Tolerating Skewed Workloads in Distributed Storage with In-Network Coherence Directories,” again from OSDI’20. Pegasus is one of those systems that are very obvious in hindsight. However, this “obviousness” is deceptive: Dan Ports, one of the authors behind the paper who joined the discussion, mentioned that the project started in 2017, so it took quite a bit of time from start to publication, with a lot of things considered and tried before zeroing in on what is in the paper.

Pegasus is a replication system for load balancing and throughput scalability in heavily skewed workloads. Consider a workload with a handful of “hot” objects. These hot objects may have so much usage that they overwhelm their respective servers. Naturally, this limits the overall throughput because the system is now capped by servers at their maximum capacity/utilization. The solution is to replicate these hot objects to many servers and allow clients to access them from multiple replicas. However, as soon as we have a replicated scenario, we start running into consistency issues. Strongly consistent systems often degrade in performance with the addition of more replicas due to the synchronization overheads. This is what makes Pegasus rather unique: it scales for load balancing through replication while remaining strongly consistent. The key enabler of this is the smart Top of Rack (ToR) switch that handles all the traffic in the server rack. This switch acts as the “source of synchrony” in the rack, and it does so at the packet’s line speed.

In Pegasus, the data is assigned to servers in the rack using a consistent hashing mechanism, allowing clients to send the requests directly to servers that own the data. However, all these requests go through the ToR switch, which can inspect the packets and make some additional routing decisions for “hot” objects. Let’s consider a write request for such a high-demand object. The ToR switch inspects a packet, and if it is for a “hot” key, it sends the write message to some larger and potentially different set of servers in the rack, essentially increasing the replication factor and rotating the responsible servers. Once the servers ack the write completion, the ToR switch sees the acks and records these servers into its coherency directory as servers with the latest copy of the data. The read requests have a similar rerouting fate: if a read is for a hot object, instead of going to the default server, the ToR switch sends it to one of the replicas from its coherency directory. The paper has more details on implementing this coherency directory and keeping track of the recent progress using a simple versioning mechanism.
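Here is how I picture the directory logic, as a rough Python sketch. It is a simplification and not the Pegasus data-plane program: the `ToRSwitch` class and its method names are invented, the byte-sum hash is a stand-in for consistent hashing, each hot write goes to a single random server rather than a set, and the versioning rule (a newer acked version resets the holder set, an equal version extends it) is my assumption about how a simple versioning mechanism could work.

```python
# Hypothetical sketch of an in-switch coherence directory; a simplification,
# not the Pegasus data-plane program.
import random

class ToRSwitch:
    def __init__(self, servers, hot_keys, seed=0):
        self.servers = list(servers)
        self.hot_keys = set(hot_keys)
        self.next_version = 0
        self.directory = {}   # hot key -> (version, servers holding that version)
        self.rng = random.Random(seed)

    def home_server(self, key):
        # Stand-in for consistent hashing (assumption: a stable toy hash).
        return self.servers[sum(key.encode()) % len(self.servers)]

    def route_write(self, key):
        """Return (server, version) for a write request."""
        self.next_version += 1
        if key in self.hot_keys:
            # Hot writes may land on any server, not just the home server.
            return self.rng.choice(self.servers), self.next_version
        return self.home_server(key), self.next_version

    def on_write_ack(self, key, server, version):
        """Record which servers hold the latest acknowledged copy."""
        if key not in self.hot_keys:
            return
        current, holders = self.directory.get(key, (0, set()))
        if version > current:
            self.directory[key] = (version, {server})   # newer copy wins
        elif version == current:
            holders.add(server)                          # another up-to-date replica

    def route_read(self, key):
        if key in self.hot_keys and key in self.directory:
            _, holders = self.directory[key]
            return self.rng.choice(sorted(holders))
        return self.home_server(key)

switch = ToRSwitch(["s0", "s1", "s2", "s3"], hot_keys={"k_hot"})
target, ver = switch.route_write("k_hot")
switch.on_write_ack("k_hot", target, ver)
read_target = switch.route_read("k_hot")
cold_target = switch.route_read("k_cold")
```

Reads of the hot key only ever go to servers recorded as holding the latest acknowledged version, while cold keys bypass the directory entirely and go to their home server.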

The end result is awesome! Just by replicating a handful of objects in skewed workloads (~16 objects out of a million in the paper), Pegasus achieves load balancing and high throughput beating in-network caching in almost all scenarios. There are a few other advantages to Pegasus that are missing in other SOTA solutions: the ability to store larger objects (not evaluated), and tolerance of workloads with different read-write ratios (evaluated extensively).

Finally, I have not touched on a few other important pieces of the system: figuring out which keys are hot and fault-tolerance. For measuring the key temperature, the Pegasus statistics engine samples some packets and determines the frequency of keys in the samples to gauge how hot each key is. For fault-tolerance, the system uses chain replication across racks for durability.
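The sampling idea is simple enough to sketch. The snippet below is a generic illustration and not the Pegasus statistics engine; the function `estimate_hot_keys`, the sampling rate, and the synthetic workload are all assumptions.

```python
# Hypothetical sketch of sampling-based hot-key detection.
import random
from collections import Counter

def estimate_hot_keys(requests, sample_rate, top_k, seed=0):
    """Count keys in a random sample of requests and return the top_k keys."""
    rng = random.Random(seed)
    counts = Counter()
    for key in requests:
        if rng.random() < sample_rate:   # inspect only a fraction of packets
            counts[key] += 1
    return [key for key, _ in counts.most_common(top_k)]

# Synthetic skewed workload: one key receives most of the traffic.
workload = ["hot"] * 9000 + [f"k{i}" for i in range(1000)]
random.Random(1).shuffle(workload)
hot = estimate_hot_keys(workload, sample_rate=0.1, top_k=1)
```

Because the hot key dominates the traffic, even a 10% sample identifies it reliably, while the counting cost stays at a fraction of the request rate.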

As always, we have our presentation of the paper by A. Jesse Jiryu Davis:


This time around we had Dan Ports join us to answer the questions about the paper, so this turned out to be a nice discussion despite a slightly lower than expected attendance. 

1) Simple API. Currently, Pegasus supports a simple API with reads and simple destructive writes (i.e., each write is an unconditional overwrite of the previous copy of the data). The main reason for this is how the writes are structured in Pegasus. The system is very nimble and quickly adjustable: it picks write servers on the fly as the write request “goes through” the switch. This means that a write should be able to just complete on the new server. However, if the write operation is, for example, a conditional write or an update, then such an update also needs to have the previous version of the object, which may be missing on the new server. We have spent some time discussing workarounds for this, and they surely seem possible. But the solution also introduces additional communication, which adds both more load to the servers and more latency for operations. And since we are dealing with a subset of objects that already generate the most load in the system, adding anything more to it must be avoided as much as possible. The cost of supporting these more complex APIs will also differ for various read-write ratios.

2) Comparison with caching. Another big discussion was around using caching to load balance the system. As Dan pointed out, caches are good when they are faster than storage, but for super-fast in-memory storage, it is hard to make a cache faster. NetCache (one of the systems used for comparison in the paper) does provide a faster cache by placing it in the network switch. It has several downsides: it handles only small objects, consumes significant switch resources, and does not work well for write workloads (this is a read-through cache, I think). Of course, it is possible to make it a write-through cache as well to optimize for write workloads, but it still does not solve other deficiencies and adds even more complexity to the system. We also touched on the more complicated fault-tolerance of cached systems. The disparity between the load that the cache and the underlying systems can take can create situations when the underlying systems get overrun upon cache failure or excessive cache misses.

3) Chain replication. Since Pegasus replicates for scalability, it needs a separate mechanism to handle fault-tolerance. The paper suggests using a chain replication approach, where racks are the chain nodes. Only the tail rack serves reads; the writes, however, must be applied in all racks as the write operation propagates through the chain. One question we had is why not use something like CRAQ to allow other racks to serve reads, but the reality is that this is simply not needed. The chances that an object can become so skewed and hot that it needs more than a rack worth of servers are very slim, so there is no need to complicate the protocol. Another question I have now but forgot to ask during the discussion is what happens to hot writes as they go through the chain. If non-tail racks only use the default server for writes on “hot” keys (instead of, let’s say, a round-robin or random node), then this server may get overwhelmed. But I think it is trivial to pick a random server for the hot object on each write at the non-tail racks.

4) Zipfian distribution and workload skewness. Pegasus needs to load-balance fewer keys for a less skewed Zipfian distribution. This was an interesting and a bit counter-intuitive observation at first glance, since one can intuitively expect the more skewed distribution to require more load-balancing. However, a higher-alpha Zipfian distribution has objects that are more heavily skewed, but not necessarily a larger number of skewed objects than a lower-alpha one. Fewer highly skewed objects mean less load-balancing.

5) Virtualization of top-of-rack switches. One important question about the technology that enables Pegasus is the virtualization of these smart ToR switches. Currently, these switches are not virtualized, so one needs to have bare-metal access to a switch to deploy the code. Virtualization of such a switch may make the technology available to cloud users. I think this would be a huge boost to the overall state of distributed computing at the datacenter level. I would be speculating here, but I think a lot depends on the willingness of manufacturers to provide such support, especially given the relatively limited capabilities of the hardware right now. Of course, the virtualization should not add a significant latency penalty to the users, and most importantly should not add any penalty to non-users (applications/systems that reside in the same rack but do not use the extended capabilities of the switches). Couple all these with the risks of running users’ code on the hardware that handles all the traffic in the rack, and we also need to worry about user isolation/security more than ever. However, as wishful as it is, it is quite probable that these smart switches will not make their way to the public cloud any time soon. This gives large cloud vendors an edge since they can benefit from the technology in their internal systems. Smaller service providers that rely on the cloud, however, will have to find a way to compete without access to this state-of-the-art technology. Aside from my extremely high-level speculations, some smart people actually go deeper into the topic.

6) There were a few other minor topics discussed, and jokes were thrown here and there. For example, Dan explains Pegasus with cat pictures.


Reading Group. Autoscaling Tiered Cloud Storage in Anna.

This week we looked at “Autoscaling Tiered Cloud Storage in Anna.” This is the second Anna paper. The first one introduces Anna Key-Value store, and the second paper talks about various “cloud-native” improvements. The presentation by Michael Whittaker is available here:

Short Summary

The Anna Architecture

Anna is an eventually consistent key-value data store, where each value is a lattice. Anna can tweak some consistency guarantees by changing the type of lattice stored in the value. For example, it can be a Last Write Wins (LWW) system that merges operations based on the definition of the last writer (i.e., a timestamp), or it can use some CRDT lattice to preserve all updates/operations. The weak consistency of Anna means that it is relatively easy to add and remove nodes to scale the system. The Autoscaling part takes advantage of that and dynamically adjusts the number of replicas responsible for each key to match the workload. Moreover, Autoscaled Anna has multiple usage tiers for keys based on the usage frequency. The paper discusses “hot” and “cold” keys, but more tiers are possible. Such tiering allows using heterogeneous hardware to better balance the cost and performance of Anna, as “cold” keys may run on cheaper hardware backed by SSDs, while “hot” keys may be stored in RAM on more expensive VMs. Anna has rules for data transitioning between tiers and for controlling the degree of replication. For example, there is a minimum replication factor for durability, and there are usage thresholds for promoting and demoting keys to/from hot/cold tiers.
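To illustrate what "each value is a lattice" means in practice, here is a minimal sketch of two lattice types with a merge operation. This is not Anna's API: the class names `LWW` and `GSet` and the tie-breaking rule on equal timestamps are my own assumptions. The point is that merge is commutative, associative, and idempotent, so replicas can exchange updates in any order and still converge.

```python
# Hypothetical sketch of lattice values with a merge operation; not Anna's API.
from dataclasses import dataclass

@dataclass(frozen=True)
class LWW:
    """Last-writer-wins register: the larger (timestamp, value) pair wins."""
    timestamp: int
    value: str

    def merge(self, other):
        # Ties on the timestamp are broken by value so merge stays commutative.
        return max(self, other, key=lambda x: (x.timestamp, x.value))

@dataclass(frozen=True)
class GSet:
    """Grow-only set: merge is set union, so no update is lost."""
    items: frozenset

    def merge(self, other):
        return GSet(self.items | other.items)

a, b = LWW(1, "x"), LWW(2, "y")
s, t = GSet(frozenset({"x"})), GSet(frozenset({"y"}))
```

With `LWW`, merging `a` and `b` in either order keeps only the later write, while merging `s` and `t` keeps both elements. Swapping the lattice type changes what survives concurrent updates without touching the replication machinery.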


We have spent quite some time talking about this paper, as there is a lot of information to digest. Below are a few of the bigger discussion points:

1) Apache Cassandra comparison. This came up a few times as lacking, as the system is somewhat similar to Cassandra, at least when you consider the LWW lattice. There are many differences too, especially in the context of autoscaling. Cassandra uses a statically configured replication factor and lacks any autoscaling. Cassandra’s DHT model is pretty rigid and resists scaling: adding nodes requires considerable ring rebalancing. Also, only one node can be added/removed from the ring at once to avoid membership and ring partitioning issues. Anna has no such problem and can adjust more quickly and dynamically. Additionally, in all fairness, the original Anna paper compares against Cassandra.

2) Breaking Anna. Part of the discussion was around breaking the autoscaling and putting Anna in a bad/less-performant state. One example would be using a workload that changes at about the same rate as Anna can adjust. For example, if we hit some keys above the threshold to promote from the “cold” tier to “hot”, we can cause a lot of data movement as these keys potentially migrate or are added to the memory-tier hardware. At the same time, we can reduce the usage, causing the system to demote these keys and experience the associated overheads. This is like putting a workload in resonance with Anna’s elastic abilities, and it reminds me of the famous Tacoma Narrows bridge collapse.

3) Possible consistency guarantees. One of the benefits of Anna is that some guarantees can be tweaked based on the lattice used for the value. Part of the discussion focused on how much consistency we can get purely out of these lattices, and how much may necessitate more drastic architectural changes. It is pretty straightforward to understand that Anna won’t ever provide strong consistency, but what about some other guarantees? For example, the original Anna paper claims the ability to have causal consistency, although it uses more complicated lattices, and keeps vector clocks for maintaining causality, not only making the system more complicated but also potentially harming the performance (something noted by the authors themselves).

4) Comparison fairness. Some points were mentioned about the evaluation fairness. It appears that some systems compared against are significantly more capable and/or may provide better guarantees (DynamoDB, Redis). It is worth noting, however, that there are not that many, if any, direct competitors, so these comparisons are still very valuable.

5) Cloud-Native. This appears to be one of the “most” cloud-native database papers, as it talks about some practical aspects of designing and running a cloud-native database. One aspect of this “cloud-nativeness” is the focus on elasticity and scalability happening automatically. Another one is the cost-performance consideration in how the system scales with regard to workload changes. We noted some other, much older papers that fit our definition of “cloud-native” papers. One example was Yahoo’s PNUTS paper, which describes a global system with some capability to adjust to workloads and selective replication.

6) Scheduling/resource allocation. One of the challenging aspects of Anna with autoscaling is figuring out the best key “packing” or “binning” given multiple tiers and pricing constraints. This is similar to the knapsack and bin packing problems, both of which are NP-hard in their general optimization forms. So this resource management/scheduling will become a big problem at production scale, when we may have thousands of machines. At the same time, task packing problems at the datacenter level are well studied, for example in Google’s Borg cluster management system or its derivative, Kubernetes.
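
To make the bin packing analogy in point 6 concrete, here is a minimal sketch of the classic first-fit-decreasing heuristic applied to placing keys on nodes of a single tier. This is not Anna’s placement policy; the function name, the key sizes, and the single-capacity model are assumptions made purely for illustration.

```python
def first_fit_decreasing(key_sizes, node_capacity):
    """Greedy heuristic: place the largest keys first, each into the first
    node that still has room. Returns one list of keys per node used.

    key_sizes: dict mapping a (hypothetical) key name to its size.
    node_capacity: capacity of every node, in the same units as the sizes.
    """
    nodes = []  # each node is {"free": remaining capacity, "keys": [...]}
    # Sort by size descending; break ties by key name for determinism.
    for key, size in sorted(key_sizes.items(), key=lambda kv: (-kv[1], kv[0])):
        if size > node_capacity:
            raise ValueError(f"key {key!r} does not fit on any node")
        for node in nodes:
            if node["free"] >= size:
                node["keys"].append(key)
                node["free"] -= size
                break
        else:
            # No existing node has room, so "provision" a new one.
            nodes.append({"free": node_capacity - size, "keys": [key]})
    return [node["keys"] for node in nodes]


placement = first_fit_decreasing({"a": 5, "b": 4, "c": 3, "d": 2, "e": 1}, 8)
print(placement)
```

A heuristic like this runs quickly but gives no optimality guarantee, and a real tiered store would also have to account for access frequency, replication, and per-tier pricing.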


Our reading group takes place over Zoom every Wednesday at 3:30pm EST. We have a Slack group where we post papers, hold discussions, and most importantly manage Zoom invites to the paper discussions. Please join the Slack group to get involved!

PigPaxos: continue devouring communication bottlenecks in distributed consensus.


This is a short follow-up to Murat’s PigPaxos post. I strongly recommend reading it first as it provides full context for what is to follow. And yes, it also includes the explanation of what pigs have to do with Paxos.

Short Recap of PigPaxos.

In our recent SIGMOD paper we looked at the bottlenecks of consensus-based replication protocols. One of the more obvious observations was that in protocols relying on a single “strong” leader, that leader is overwhelmed with managing all the communication. The goal of PigPaxos is to give the leader a bit more breathing room to do the job of a leader, and not talk as much. To that end, we replaced the direct communication pattern between leader and followers with a two-hop pattern in which a leader talks to a small subset of randomly picked relay nodes, and the relays in turn communicate with the rest of the cluster. PigPaxos also uses relays to aggregate the replies together before returning them to the leader. On each communication step, PigPaxos uses a new set of randomly picked relay nodes to both spread the load evenly among the followers and to tolerate failures.
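
The two-hop pattern described above can be sketched in a few lines. This is an illustration only, not the PigPaxos implementation: the round-robin partitioning into relay groups, the helper names, and the simple message count (one request and one aggregated reply per relay) are all assumptions made for the example.

```python
import random


def split_into_relay_groups(followers, num_groups):
    """Partition the followers into num_groups groups (round-robin)."""
    return [followers[i::num_groups] for i in range(num_groups)]


def pick_relays(groups, rng):
    """Pick a fresh random relay in every group for one communication step."""
    return [rng.choice(group) for group in groups]


def leader_message_count(num_nodes, num_groups=None):
    """Messages the leader sends plus receives in one round.

    With direct communication the leader exchanges a request and a reply
    with each of the num_nodes - 1 followers. With relays it exchanges one
    request and one aggregated reply with each relay.
    """
    if num_groups is None:
        return 2 * (num_nodes - 1)
    return 2 * num_groups


rng = random.Random(7)
followers = [f"node{i}" for i in range(1, 25)]  # 24 followers + 1 leader
groups = split_into_relay_groups(followers, 3)
print(pick_relays(groups, rng))        # a new set of relays on every call
print(leader_message_count(25))        # direct fan-out
print(leader_message_count(25, 3))     # two-hop pattern with 3 relays
```

Under these assumptions, the leader of a 25-node cluster handles 48 messages per round with direct communication and 6 with three relays, and the relay count stays the same as the cluster grows.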


By randomly rotating the relays, enforcing timeouts, and applying some other optimizations on how many nodes to wait for at each relay node, we can provide adequate performance even in the event of node crashes or network partitions. The fault tolerance limit of PigPaxos is similar to that of Paxos: up to a minority of nodes may fail with the system still making some (limited if implemented naively) progress.

Some More Results

In the original PigPaxos post, we did not talk about scaling to super large clusters. Well, I still do not have that data available, but following in the footsteps of our SIGMOD work, we have developed a performance model that, hopefully, is accurate enough to show some expected performance at a bigger scale.

Performance model of PigPaxos for clusters of 25 and 99 nodes with 3 relay groups.

Network uniformity is not a requirement for PigPaxos. In fact, it is perfectly ok to have some links slower than others. However, some arrangement of relay groups may be required to get the best performance when links between nodes have different speeds or capacities. The most pronounced real-world example of this non-uniformity is a wide area network. When we deployed real, not simulated, PigPaxos in such a geo-distributed environment, it no longer had the disadvantage of higher latency, as the latency became dominated by the much slower geo-links. We took advantage of the natural division between fast and slow links, and made all nodes in each region part of the same relay group. Another advantage of this setup is the reduced amount of cross-region traffic, as data moves to each region only once regardless of how many replicas are there.
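
The cross-region traffic argument can be checked with a small counting sketch. It assumes a hypothetical mapping from node names to regions and counts only the leader’s outgoing messages in one round; it is an illustration, not measured data from the deployment.

```python
from collections import defaultdict


def relay_groups_by_region(node_regions):
    """Group nodes so that every region forms its own relay group."""
    groups = defaultdict(list)
    for node, region in sorted(node_regions.items()):
        groups[region].append(node)
    return dict(groups)


def cross_region_sends(node_regions, leader):
    """Count the leader's outgoing cross-region messages in one round.

    'direct' sends one message to every node outside the leader's region;
    'regional_relays' sends one message per remote region, and the relay
    there forwards it over fast local links.
    """
    leader_region = node_regions[leader]
    remote_nodes = [n for n, r in node_regions.items() if r != leader_region]
    remote_regions = {node_regions[n] for n in remote_nodes}
    return {"direct": len(remote_nodes), "regional_relays": len(remote_regions)}


# Hypothetical 9-node deployment across three regions.
node_regions = {
    "a1": "A", "a2": "A", "a3": "A",
    "b1": "B", "b2": "B", "b3": "B",
    "c1": "C", "c2": "C", "c3": "C",
}
print(relay_groups_by_region(node_regions))
print(cross_region_sends(node_regions, leader="a1"))
```

In this example the leader sends 6 cross-region messages with direct communication and 2 with one relay group per region, and the second number does not grow when more replicas are added to a region.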


On the fault tolerance front, relay nodes definitely introduce more ways for the protocol to stumble. A crash of a relay node makes the entire relay group unavailable for that communication attempt. A crash of a non-relay node causes a timeout, which may add to the operation latency. The core principle behind PigPaxos’ fault tolerance is to repeat failed communication with a new configuration of relay nodes. Eventually, the configuration will be favorable enough to make progress, given that a majority of nodes are up. However, this process can be slow when many nodes have crashed, so some orthogonal optimizations can help. For example, it is worth remembering which nodes are temporarily down and not using them as relays or otherwise expecting them to reply on time. Another approach is to reduce the wait quorum of the relay group to tolerate stragglers, or even use overlapping groups for communication redundancy. However, even with all these ad-hoc optimizations turned off, PigPaxos can still mask failures originating in a minority of relay groups without much impact on performance. For example, in the experiment below we have one relay group experiencing a failure on every operation for 10 seconds without much detriment to overall performance.
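
The retry principle can be illustrated with a toy simulation. This is a simplified sketch and not the PigPaxos code: it assumes fixed round-robin relay groups, treats a crashed relay as losing its whole group for that attempt, ignores timeouts and all the optimizations mentioned above, and uses made-up helper names.

```python
import random


def split_into_groups(followers, num_groups):
    """Partition the followers into num_groups relay groups (round-robin)."""
    return [followers[i::num_groups] for i in range(num_groups)]


def attempt(groups, alive, rng):
    """One communication attempt; returns the number of acks the leader sees."""
    acks = 1  # the leader counts its own vote
    for group in groups:
        relay = rng.choice(group)  # new random relay on every attempt
        if relay in alive:
            # The relay aggregates acks from the live nodes of its group,
            # including itself.
            acks += sum(1 for node in group if node in alive)
        # If the relay has crashed, the whole group is lost for this attempt.
    return acks


def replicate(followers, num_groups, alive, rng, max_attempts=20):
    """Retry with fresh relays until a majority acks; return attempts used,
    or None if no attempt succeeded."""
    groups = split_into_groups(followers, num_groups)
    majority = (len(followers) + 1) // 2 + 1
    for attempt_no in range(1, max_attempts + 1):
        if attempt(groups, alive, rng) >= majority:
            return attempt_no
    return None


followers = [f"node{i}" for i in range(24)]      # 25-node cluster with leader
groups = split_into_groups(followers, 3)
alive = set(followers) - set(groups[0])          # one whole relay group is down
print(replicate(followers, 3, alive, random.Random(1)))
```

With one of three relay groups entirely down, the remaining two groups plus the leader still form a majority (17 of 25), so the first attempt succeeds, which matches the observation that failures confined to a minority of relay groups are masked.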


Why Scaling to This Many Nodes?

One of the most important questions about PigPaxos is “why?” Why do you need this many nodes in Paxos? Well, the answer is not simple and consists of multiple parts:

  • Because we can!
  • Because now we can tolerate more nodes crashing.
  • Because now we can make services like ZooKeeper, or even databases, scale for reads just by adding more nodes. ZooKeeper reads are served from a single node, and so are reads in many databases that provide some relaxed consistency guarantees.
  • Because it allows bigger apps with more parties that require consensus, and it is all done by a single protocol.



One Page Summary. Aegean: Replication beyond the client-server model


Nested Services
Nested microservices. One service may act as a client for another.

This paper builds on a key observation about the operation of complex distributed applications. Namely, microservice-style applications rarely follow a simple client-server architecture, where a client makes a request and the server (or servers) responds to it. Instead, many applications use a nested approach, where clients communicate with some service, and the service itself acts as a client for one or more other nested services. This nesting often presents some challenges with traditional replication protocols, like primary-backup or Paxos-based RSM replication. For instance, when a service is replicated for durability, it becomes more difficult to preserve the correctness of nested requests: in case of a service failure, the information on whether the nested request was issued or returned may have been lost without some additional safeguards, making it difficult to track whether nested requests need to be reissued or otherwise recovered. The authors also claim that the existing approaches, like Paxos, suffer from a performance penalty when dealing with nested calls, since the replicated service needs to block and wait for nested calls to resolve. I personally do not buy the latter issue too much, as many existing replication solutions, even Paxos-based ones, try to take advantage of parallelism whenever possible, by either using a pipelined approach or concurrently operating on independent requests or data in different conflict domains.

Aegean Shim
A shim layer in front of service B that collects a majority of (duplicate) requests coming from A before passing these requests to B.

To counter the problems with nested request calls and responses, Aegean proposes to use a shim layer sitting next to each replicated service. When one service creates a nested request to the other service, it talks to the shim layer instead of the nested service directly. The shim layer runs at each replica of a replicated service and collects the requests coming from the caller service (it assumes each replica of the caller will send a request). The shim passes the request to the nested service replica only upon collecting it from a majority of the caller’s replicas, ensuring that the caller has sufficiently replicated the nested request. The replica can then process/replicate requests. Similarly, when the nested service generates the response, the shim layer broadcasts the response to all replicas of the caller service, keeping track of which caller replicas have received the response and resending it as needed. Additionally, to ensure response durability, every replica of the caller service sends an ack to the other replicas, and only acts on the response from a nested call when it itself receives a majority of such acks (including its own). This ensures that the responses to a nested call have been logged by at least a majority of replicas in a service. All these shim layers and the extra steps for response durability create a lot more message exchange in the system, which undoubtedly will impact the performance.
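
The request-collection side of the shim can be sketched as follows. This is a toy model based only on the description above and not Aegean’s actual code: the class name, the request identifiers, and the forwarding callback are all hypothetical, and response broadcasting and acks are left out.

```python
class RequestShim:
    """Sits in front of one replica of the nested service and forwards a
    nested request only after a majority of caller replicas have sent it."""

    def __init__(self, num_caller_replicas, forward):
        self.majority = num_caller_replicas // 2 + 1
        self.forward = forward    # callback into the nested service replica
        self.senders = {}         # request_id -> set of caller replica ids
        self.forwarded = set()    # request ids already passed on

    def on_request(self, request_id, caller_replica, payload):
        """Record one copy of a request; return True if this copy caused
        the request to be forwarded."""
        seen = self.senders.setdefault(request_id, set())
        seen.add(caller_replica)  # duplicates from the same replica are ignored
        if len(seen) >= self.majority and request_id not in self.forwarded:
            self.forwarded.add(request_id)
            self.forward(request_id, payload)
            return True
        return False


delivered = []
shim = RequestShim(3, lambda rid, payload: delivered.append((rid, payload)))
print(shim.on_request("req-1", "A1", "put x=1"))  # one copy, not a majority
print(shim.on_request("req-1", "A2", "put x=1"))  # majority reached, forwarded
print(shim.on_request("req-1", "A3", "put x=1"))  # late duplicate, ignored
print(delivered)
```

Forwarding exactly once per request identifier is what lets the nested service treat the replicated caller as a single logical client.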

Another aspect of the paper deals with speculative execution of some requests, as these also introduce problems in the context of nested microservices, as speculative state may leak and get exposed to other services in the nesting chain. Aegean solves the problem of speculation by using barriers before the speculative state may become visible and resolves the speculation by reset and reply if replicas arrived at a different state.

Aegean Performance Evaluation

To solve the performance issues with sequential Paxos, Aegean proposes to use a pipelined approach, which is definitely not new. For example, our Paxi from a few years back is a pipelined implementation of many consensus protocols. The authors claim that Aegean has decent performance, although I find the evaluation a bit lacking. The main comparison is against sequential (not pipelined) Paxos, and Aegean is doing well in this setting. However, even the authors admit that a large portion of the difference is due to the pipelining, raising the question of whether the performance comparison is fair in the first place.

Overall, I enjoyed the discussion of the problems that replication causes in nested microservice architectures, but I am not sure I am too excited about the solution. The solution is a solid one for sure, but it appears very piecewise, with every piece specifically targeting a sub-problem, so it lacks a certain elegance (which is not a bad thing at all for a solid practical approach to a problem). The evaluation is the part that raises the most questions for me, ranging from claims that non-Byzantine-tolerant Paxos and PBFT have similar throughput, to picking inherently weak baselines for evaluation, like non-pipelined Paxos.