All posts by alekseyc

Winter Term Reading Group Papers: ##121-130

Our winter set of papers! The schedule is also in our Google Calendar.

Fall Term Reading Group Papers: ##111-120

Below is a list of papers for the fall term of the distributed systems reading group. The list is also on the reading group’s Google Calendar.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites for the paper discussions. Please join the Slack group to get involved!

Reading Group Special Session: Scalability and Fault Tolerance in YDB

YDB is an open-source Distributed SQL Database. YDB is used as an OLTP database for mission-critical, user-facing applications. It provides strong consistency and serializable transaction isolation for the end user. One of the main characteristics of YDB is scalability to very large clusters together with multitenancy, i.e., the ability to provide isolated user environments within a single cluster. In this talk, we will cover two layers of YDB, the Tablet and BlobStorage layers, which together provide fault tolerance, scalability, and user isolation.

Tablet is a very lightweight component that implements distributed consensus. It is a building block for user data storage, the schema manager, the tablet manager, system views, and many other components. A single YDB cluster is known to have more than a million tablets, and a single YDB node can serve up to 10K tablets. Tablet implements a distributed consensus protocol over a shared log provided by the BlobStorage layer.

The BlobStorage layer is a fault-tolerant, append-only key-value database. It implements a specific API for logging, blob reading and writing, garbage collection, and quorum management. We believe that the protocol between Tablet and BlobStorage is quite simple and understandable to listeners compared to Paxos or Raft. We also illustrate the flexibility of YDB in real-life use cases, like migrating parts of the cluster from one availability zone to another without a maintenance window, or storing some data blobs on fast media like SSDs and other blobs on HDDs.
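
To make the division of responsibilities a little more concrete before the talk, here is a minimal, hypothetical sketch of the Tablet/BlobStorage split described above. The class and method names are my own illustration and not the actual YDB API; the real BlobStorage layer is distributed, replicated, and quorum-based rather than a single in-memory object.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional

# Hypothetical sketch of the Tablet <-> BlobStorage contract (not the real YDB API).
# BlobStorage plays the role of a fault-tolerant, append-only blob store plus a shared
# log; a Tablet is a lightweight consensus/state-machine unit built on top of it.

@dataclass
class BlobStorage:
    blobs: Dict[str, bytes] = field(default_factory=dict)
    log: list = field(default_factory=list)

    def write_blob(self, blob_id: str, data: bytes) -> None:
        assert blob_id not in self.blobs, "append-only: blobs are never overwritten"
        self.blobs[blob_id] = data

    def read_blob(self, blob_id: str) -> Optional[bytes]:
        return self.blobs.get(blob_id)

    def append_log(self, tablet_id: int, record: bytes) -> int:
        # The shared log is what the Tablet layer's consensus runs over.
        self.log.append((tablet_id, record))
        return len(self.log) - 1  # log sequence number

    def collect_garbage(self, keep_from: int) -> None:
        # Records no longer referenced by any tablet can be reclaimed.
        self.log = self.log[keep_from:]

@dataclass
class Tablet:
    tablet_id: int
    storage: BlobStorage
    state: Dict[str, bytes] = field(default_factory=dict)

    def apply(self, key: str, value: bytes) -> None:
        # Log the mutation through BlobStorage first, then apply it to local state.
        self.storage.append_log(self.tablet_id, key.encode() + b"=" + value)
        self.state[key] = value
```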

Speaker: Andrey is the head of the YDB project (https://ydb.tech). He works at Yandex as a Head of Department. He also has a long history at Yandex as a member of the Web Crawler team, a member of the Infrastructure team, one of the creators of Yandex Cloud, and now a part of the CloudIL team. Andrey holds a Ph.D. from Moscow State University.

When: Wednesday, August 10th at 2:00 pm EST (6 pm UTC)

Where: Zoom

Metastable Failures in the Wild

Metastable failures in distributed systems are failures that “feed” and strengthen their own “failed” condition. The main characteristic of a metastable failure is a positive feedback loop that keeps the system in a degraded/failed state. These failures are hard to spot, as they always start with some other distraction — some trigger event that nudges the system over its operating limit or capacity. However, fixing the trigger is not enough, and engineers realize this too late — so late that sometimes it takes days to stabilize the system and return to the pre-failure level of operation.

I already blogged about this topic last year in the context of our original metastable failures paper from HotOS’21. Now we have the “improved and extended” version out at OSDI’22, and oh boy, we have a lot of juicy stuff there. That failure that took days to recover? I did not make it up. My students spent quite some time going through hundreds, if not thousands, of publicly available post-mortem reports for different systems to identify a handful of impactful metastable failures. We are sure there are more metastable failures out there lurking in these bug and failure reports. In fact, we had significantly more candidate failures, but many post-mortems simply do not have enough information to be absolutely sure metastability played a role.

Anyway, above is a table from the paper that lists examples of metastable failures in the wild. Some last for an hour, and some take days to resolve. Many examples are due to retries and retry-induced load amplification. See, when a system becomes slow (e.g., because of a server failure or a transient network issue), latency on some requests is likely to grow, and it may reach the point at which the client application gives up on the request and retries. Retries are great for dealing with transient issues, but they create more load for the system by potentially repeating the same request over and over again. When the system is already overloaded due to the ongoing issue, retries do not solve any real problem and only create more unnecessary load. More load means higher queuing times, higher latency, and more retries. This vicious cycle can continue even after the original trigger issues are fixed, and, in many cases, there is no escape from this except load shedding.
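
To make the feedback loop concrete, here is a tiny, back-of-the-envelope simulation of the retry cycle. It is not from the paper — the capacity, arrival rate, timeout, and trigger numbers are made up purely for illustration — but it shows how a temporary capacity loss plus an aggressive retry policy can leave a backlog that keeps growing even after the trigger is gone.

```python
# Toy model of retry-induced load amplification (all numbers are made up).
# Each tick the system serves up to `cap` requests; any queued request that has
# waited at least `timeout` ticks gets retried by its client, adding extra load.

def simulate(capacity=100, arrivals=80, timeout=3, trigger_at=10, trigger_len=5, ticks=30):
    queue = []  # enqueue timestamps of outstanding requests
    for t in range(ticks):
        # Capacity-decreasing trigger: half the capacity disappears for a few ticks.
        cap = capacity // 2 if trigger_at <= t < trigger_at + trigger_len else capacity

        queue.extend([t] * arrivals)                           # new offered load
        retries = sum(1 for enq in queue if t - enq >= timeout)
        queue.extend([t] * retries)                            # retries amplify the load

        served = min(cap, len(queue))
        queue = queue[served:]                                 # FIFO service
        print(f"t={t:2d} cap={cap:3d} retries={retries:4d} backlog={len(queue):5d}")

simulate()
```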

We managed to reproduce a toy example of retry-induced failure using MongoDB. It is a bit concerning that we were able to do so at a small scale, but our experience also showed that not all hope is lost. See, as it turns out, modern systems are good at load shedding — when a request queues up for too long and is too old by the time the server can handle it, such an old request can be dropped without wasting further resources. Our experiment here had to use a very aggressive retry policy to overrun the load-shedding mechanisms of the underlying database. That being said, sometimes the difference between a failure and a recovery can come down to a minuscule difference in the trigger. The figure here shows a failure when the trigger took out 80% of CPU capacity, and a successful recovery when the trigger reduced CPU capacity by only 78%.
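
The load shedding we had to overcome can be sketched as a simple deadline check at dequeue time: if a request has already waited longer than the client is willing to wait, the server drops it instead of doing work nobody will consume. This is a generic sketch of the idea, not MongoDB's actual mechanism.

```python
import time
from collections import deque

# Generic sketch of deadline-based load shedding (not MongoDB's actual implementation).
# Requests carry a deadline; anything already past its deadline when a worker picks it
# up is dropped, since the client has most likely timed out and retried by then.

class SheddingQueue:
    def __init__(self):
        self.q = deque()

    def enqueue(self, request_id, deadline_s, now=None):
        now = time.monotonic() if now is None else now
        self.q.append((request_id, now + deadline_s))

    def dequeue(self, now=None):
        """Return the next still-useful request, shedding expired ones."""
        now = time.monotonic() if now is None else now
        while self.q:
            request_id, deadline = self.q.popleft()
            if now <= deadline:
                return request_id
            # Expired: doing the work now would only waste capacity.
        return None
```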

Now, of course, retries are not the only cause of the self-feeding failure loops, but retries are something most engineers are familiar with, so the post-mortem reports tend to have more details on that topic. Potentially, this is one reason for observing so many retry-induced metastable failures in the wild. However, we saw other feedback loops. For example, one of the failures in the list was caused by excessive logging on the error path, which in turn caused the system to slow down and have more requests enter that same error path.

Seeing the examples in the real world and reproducing them is important. Our HotOS paper claimed that reproducibility at a small scale is often challenging. In this new paper, we have a handful of metastable failure scenarios reproduced at a scale of at most a few VMs. However, a good chunk of the paper is about generalizing what we have learned from the observations. We developed a “reasoning framework” for metastability and metastable failures.

The two important characteristics of metastable failures are triggers and amplification effects. On the trigger side, we can have load-spiking triggers and capacity-decreasing triggers. The former trigger type temporarily increases the load on the system above some pre-trigger level. An example of this can be higher than expected demand from users. The latter trigger type degrades the system’s serving capacity. An example of this trigger is a server failure that takes some capacity out.

Similarly to triggers, there are two types of amplification effects: workload amplification and capacity degradation amplification. Both feedback loops kick in after a system enters an overloaded state — having insufficient serving capacity for the current offered load. Workload amplification seems to be the more common type in the examples we have found so far. It creates even more load in response to an overload. The vicious retry loop from earlier is an example of workload amplification. The other type, capacity degradation amplification, reduces the serving capacity of the system in response to an overload. This can happen, for instance, in systems and applications that rely on caching, when losing some cache results in a downstream overload that either prevents the cache from being rebuilt or outright causes more cached items to expire and go stale.

The figure above shows how these two types of triggers and two types of amplification mechanisms can combine. Also, an important point to note — it is possible to have a combination of more than one trigger and more than one amplification mechanism, something that we have seen in the wild. As for whether a system actually experiences a metastable failure, a lot depends on how the triggers and amplification mechanisms interact and how much load was offered to the system before the failure sequence began.

Small triggers that do not last long or do not have a big enough magnitude (i.e., a trigger that does not overload the system too much) may be less likely to cause an issue. On the other hand, a trigger that lasts an hour and takes out half of the servers is a much bigger problem. Of course, the magnitude of the trigger is not an absolute metric — your system may be fine even when you take out half the servers if it was operating at 25% of its maximum capacity before the trigger event took place.

Triggers are not metastable failures. They are failures in their own right that can cause availability and performance issues for clients or users, but they are only the beginning of the metastable failure sequence. The amplification mechanisms are the feedback loops that start after the system is overloaded due to a trigger. Amplification leads to a metastable failure once the system’s overload has been amplified so much that even fixing the trigger (i.e., restoring lost capacity or resolving the load spike) does not solve the problem. The figure marks those metastable failure points. Naturally, the type of amplification matters a lot. A system with “slow” amplification loops that add little overload over time is better off than a system with “fast” amplification loops. In both cases, we can have a metastable failure, but a slow amplification mechanism gives us more time to deal with the trigger.

Anyway, what are the lessons here? For one, I cannot stress enough the importance of understanding the mechanisms involved so that we can operate our systems better. Maybe it is not a good idea to squeeze every single bit of capacity from the system/hardware? Having some slack resources allows us to absorb smaller triggers and gives us time before the system enters a metastable failure for bigger triggers. The same goes for making systems with predictable performance characteristics. If some code path is significantly slower than another, a sudden switch to the slower path can act as a trigger, since it is essentially a capacity degradation.

Load shedding is the solution to metastable failures. All the recovery efforts we saw included some load shedding mechanisms. After all, to bring the system back into operation, its load needs to decrease sufficiently to stop the amplification. But we do not need to save load shedding only for recovery efforts after it is already too late. More aggressive load shedding done ahead of time, or early after the trigger overloads the system, can save us from a bigger failure — it is better to lose some requests and piss off some clients/users than to lose the entire system and impact everyone. Here I can also lump in load prioritization — prioritize the work that raises the system’s goodput, if possible.

Another point is to account for the human factor — about half of the observed failures in the wild had some direct human involvement, such as rolling out bad configuration or buggy code. It is important to test and deploy configuration the same way as software. This is something that many systems still seem to neglect. Proper testing of new code is paramount, including stress testing. And, of course, do not deploy on Friday… we have seen a failure in which a weekend load/traffic reduction masked increased resource usage of a new version deployed on Friday. The system failed after the load returned on Monday.

Another interesting observation is the “fix-to-break” behaviors. On a few occasions, engineers increased the severity of the amplification mechanism (feedback loop) after fixing an outage, leading to a more severe failure in the future. I refer to this as a “fix-to-break” pattern; it is a good illustration of why we need to understand metastability in order to avoid causing failures.

As always, the paper has way more details. We talk a lot about definitions and formalism of metastable states and failures. We have a handful of distinct reproductions with triggers and amplifications caused by retries, cache failures, and garbage collections. And finally, we include some real-world first-hand details on metastable failures at a large internet company.

Reading Group. Darwin: Scale-In Stream Processing

In the 99th reading group meeting, we discussed stream processing. The paper we read, “Darwin: Scale-In Stream Processing” by Lawrence Benson and Tilmann Rabl, argues that many stream processing systems are relatively inefficient in utilizing the hardware. These inefficiencies stem from several sources, ranging from the need to ingest large volumes of data, to the requirement of durably storing ingested data for fault tolerance, to the generality of the stream-processing frameworks themselves. Due to these inefficiencies, many stream processing systems scale by adding more machines instead of solving the existing bottlenecks. The authors propose a set of methods to address the scalability concerns. The paper refers to these fixes as a “scale-in” approach, where the system tries to optimize and fully use the resources it already has before requiring more servers.

Overall, the paper identifies several areas of concern and improvement. On the framework generality side of things, the authors discuss query compilation. The idea here is that compiling streaming queries into native, optimized code will be faster and more efficient than having these queries run in a more general but interpreted environment. Of all the identified problems, Darwin seems to solve only this one. Darwin exposes an SQL-like interface to users, creates a query plan that incorporates various optimizations, translates the query plan into C++ code, and finally compiles the C++ code.
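
As a rough illustration of the query-compilation idea (and nothing more — this is not Darwin's code generator), here is a toy translator that turns a single filter predicate from a declarative query into specialized C++ source that could then be compiled and run over batches of events.

```python
# Toy illustration of compiling a declarative streaming filter into C++ source.
# It only shows the general idea of replacing a generic, interpreted operator with
# specialized compiled code; Darwin's real query plans and codegen are far richer.

def compile_filter(column: str, op: str, value: int) -> str:
    """Emit C++ for a query like: SELECT * FROM stream WHERE <column> <op> <value>."""
    return f"""
#include <vector>

struct Event {{ long {column}; }};

// Specialized filter generated for this one query.
std::vector<Event> run_filter(const std::vector<Event>& batch) {{
    std::vector<Event> out;
    out.reserve(batch.size());
    for (const auto& e : batch) {{
        if (e.{column} {op} {value}) out.push_back(e);
    }}
    return out;
}}
"""

print(compile_filter("price", ">", 100))
```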

Other bottlenecks/issues the paper identifies are network communication, persistent storage, and recovery. For network communication bottlenecks, the authors propose using… well, faster networks and RDMA. On the persistent storage side of things, the paper claims we need more speed and should consider newer technologies, like nonvolatile memory (PMem). Finally, for the recovery aspect, the paper is again not very creative and suggests using “modern storage technology to achieve efficient checkpointing.”

Discussion

1) Limited Implementation & Eval. This is a short paper, so there are only so many details on many of the discussed topics. The paper identifies 4 bottlenecks and provides experimental justifications for these. However, on 3 out of the 4 issues, the solution seems to boil down to “using modern technology.” So, it seems like either 3/4 of the problems have been solved with “modern technology” or that the authors have no good solution to them. 

The limited evaluation is also interesting. After all, when running in RAM, Darwin shows the same performance as the other systems the authors compare against. This makes us question whether the one remaining problem and its solution (query compilation) is actually a problem in (at least some) other streaming systems.

2) A Few Interesting Numbers. We really liked the motivation for scaling streaming systems the paper uses — Alibaba’s 2020 Singles’ Day (a big e-commerce sales event) produced 4 billion streamed events per second and required 1.5 million cores of Apache Flink to handle. At such a scale, improving the efficiency of streaming systems is crucial.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites for the paper discussions. Please join the Slack group to get involved!

Reading Group. Achieving High Throughput and Elasticity in a Larger-than-Memory Store

The “Achieving High Throughput and Elasticity in a Larger-than-Memory Store” paper by Chinmay Kulkarni, Badrish Chandramouli, and Ryan Stutsman discusses elastic, scalable distributed storage. The paper proposes Shadowfax, an extension to the FASTER single-node KV-store. The particular use case targeted by Shadowfax is the ingestion of large volumes of (streaming) data. The system does not appear to have replication and cannot handle server failures without data loss. However, the authors focus on reconfiguration performance and its impact on the rest of the system. So, in case of failures, replacement servers can be added with a minimal performance penalty. Under failure-free conditions, nodes can be added or removed without data loss. Finally, Shadowfax stores data in three different “places” based on how hot the keys are. Recently updated keys reside in RAM, colder data gets pushed to the SSD, and finally, the data from the SSD is asynchronously copied to shared cloud storage.

The main problem Shadowfax solves is ingesting lots of data from networked clients. A single server can only do so much, so we really need a system that can partition the keyspace across the servers. Shadowfax builds on top of FASTER, a single-node store that leverages a lock-free hash table to support key indexing/look-up. Shadowfax adds hash-based key partitioning to ensure each server is responsible for a subset of all keys. This partitioning is stored in ZooKeeper for fault tolerance of the sharding configuration.

A lot of the Shadowfax design stems from two ideas: (1) avoiding coordination and context switching and (2) avoiding unnecessarily costly operations.

For the first point, Shadowfax servers pin processing threads to cores, allowing each thread to pick up some requests and dispatch them to the underlying FASTER store shared among all processing threads. As such, the processing threads never coordinate with each other by any means other than the FASTER store. The shared FASTER store uses lock-free hashing to minimize the impacts of concurrency. Whenever some coordination is needed, FASTER avoids explicit cross-thread coordination with the help of asynchronous cuts. As different threads may access the same shared data, it is often essential to know when all threads have passed or seen some particular version of the data. For instance, when some record moves from RAM to SSD, all threads must have completed their access to that record in RAM (or agreed not to access it) before the memory can be cleaned up. FASTER achieves this lazily via view or epoch changes. When a thread sees a new epoch and transitions to it, it agrees that it has finished accessing any data in the old epoch. When all threads have moved (at their own pace) to the new epoch, the system can reclaim the memory supporting the older epoch’s data.
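
Here is a highly simplified sketch of that epoch/asynchronous-cut idea — my own illustration, not FASTER's code (which is lock-free, while this sketch cheats with a lock for brevity). Each thread publishes the epoch it currently operates in, and a cleanup action registered against epoch e runs only after every thread has advanced past e.

```python
import threading

# Minimal sketch of epoch-based reclamation ("asynchronous cuts"); not FASTER's code.
# Actions registered against epoch e (e.g., freeing the RAM copy of a record that
# moved to SSD) run only once every thread has entered an epoch newer than e.

class EpochManager:
    def __init__(self, num_threads):
        self.global_epoch = 1
        self.thread_epochs = [1] * num_threads  # last epoch each thread entered
        self.pending = []                       # list of (epoch, cleanup callback)
        self.lock = threading.Lock()            # simplification; FASTER avoids locks

    def bump_epoch(self, cleanup):
        """Start a new epoch; run `cleanup` once the old epoch is safe to reclaim."""
        with self.lock:
            self.pending.append((self.global_epoch, cleanup))
            self.global_epoch += 1

    def enter(self, thread_id):
        """Called by worker threads at operation boundaries: adopt the newest epoch."""
        with self.lock:
            self.thread_epochs[thread_id] = self.global_epoch
            safe = min(self.thread_epochs)  # every thread has moved past safe - 1
            ready = [cb for e, cb in self.pending if e < safe]
            self.pending = [(e, cb) for e, cb in self.pending if e >= safe]
        for cb in ready:
            cb()  # e.g., reclaim the old in-memory record
```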

On the client side, threads are pinned to cores as well. Clients maintain sessions with the servers. Each client thread opens a session with each server, allowing independent client threads to talk to the same server without any internal coordination. When accessing a key, a client thread uses its session for the partition/server hosting that key and batches the request with other requests from the same session before sending it over to the server.

The scheme with sticky sessions works well when the partitions are static. However, in a dynamic situation, it may be possible that a client sends a request to a server that no longer serves a key. This is where the second aspect of the design comes in. The naive solution would be for a server to check ownership for each request, but this would be a waste of CPU cycles for the vast majority of requests. Instead, Shadowfax checks request ownership at the batch level using view numbers and drops outdated batches. Each server has a view number representing its configuration version. Whenever there is a partitioning change impacting a server, its view increases. The client attaches the view obtained (and cached) from ZooKeeper to a batch, so if the client’s information is stale, its batches will not succeed. Naturally, when this happens, the client refreshes the keyspace mapping and server views from ZooKeeper.
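
A minimal sketch of this batch-level ownership check could look like the following. The class names and the `directory` dictionary standing in for ZooKeeper are my simplifications, not the paper's actual interfaces.

```python
# Simplified sketch of batch validation via view numbers (names are illustrative).
# The client caches {partition: (server, view)}; a server rejects a whole batch
# tagged with a stale view instead of checking ownership per request.

class Server:
    def __init__(self, view=1):
        self.view = view   # bumped whenever this server's ownership changes
        self.store = {}

    def apply_batch(self, batch_view, ops):
        if batch_view != self.view:
            return "STALE_VIEW"          # client must refresh its mapping
        for key, value in ops:
            self.store[key] = value
        return "OK"

class Client:
    def __init__(self, directory):
        self.directory = directory       # stand-in for ZooKeeper
        self.mapping = dict(directory)   # cached {partition: (server, view)}

    def send_batch(self, partition, ops):
        server, view = self.mapping[partition]
        if server.apply_batch(view, ops) == "OK":
            return "OK"
        # Stale view: refresh from the directory and retry the batch once.
        self.mapping[partition] = self.directory[partition]
        server, view = self.mapping[partition]
        return server.apply_batch(view, ops)

# Usage: the client's cached view is stale after a partitioning change.
s = Server(view=2)
c = Client({0: (s, 2)})
c.mapping[0] = (s, 1)                    # simulate a stale cache entry
print(c.send_batch(0, [("k", "v")]))     # -> OK after one refresh
```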

The migration tries to avoid as much coordination as possible. It leverages the idea of the asynchronous cut to transition between phases of the migration process — once all threads finish a migration phase, they can transition to the next phase. I will skip the detailed explanation of data migration; however, I will mention that it requires a few state transitions on both the source and target nodes, and during the process, some requests may get delayed, as the target machine may not yet have received the data for remapped/repartitioned keys. In short, the source must find the keys to send, tell the target to expect the data migration, and move the keys, while the target needs to accept the keys and eventually start serving them.

Another extension of Shadowfax over the FASTER store is the ability to move data from SSD to shared cloud storage. The main benefit of using cloud storage is the speed of reconfiguration and migration. Shadowfax eventually writes all records from SSD to shared cloud storage. This cloud copy allows the migration procedure to avoid copying the records from SSD and instead migrate the index with pointers to the records in the shared cloud storage.

Regarding performance, Shadowfax seems to provide around 130 million ops/sec on one 64-core server. On 12 servers (and 3 clients) as shown in the figure here, the throughput can go as high as 930M ops/sec. Scale-up procedures also seem to be quick with little impact on performance. We can see a dip in source throughput, but it is relatively short and lasts way under a minute in all tested scenarios.

Discussion

1) Fault tolerance? While it is impressive to get up to 130M ops/sec of throughput over TCP (albeit with FPGA network acceleration), I cannot stop wondering what kind of applications need such per-node throughput but do not need data redundancy. If we operate at such speeds, a server reboot can mean millions of lost operations that existed only in RAM.

2) Client Scalability. Shadowfax clients communicate with servers over TCP, but the system assumes a small number of clients/sessions. In fact, the authors perform most evaluations with just one massive client. What happens is that a client needs a session to each server from each of its threads. So a 64-core client will have 64 sessions per server. I am not entirely sure if having this many sessions will impact the performance. The paper achieves 930M ops/sec in 12 server clusters with 3 clients, but each client is massive, and all three clients combined have 192 sessions on each server. The authors claim this result as proof that the servers can handle many client sessions. 

One question I have is whether having more, smaller clients would impact the performance even if the total number of sessions stays the same. Referring to the theme of the first discussion point, what kind of application will have a single client producing 310 (930/3) million events/records/updates each second? Of course, an argument can be made that these clients are “proxies” for many real clients, like sensors, IoT devices, etc., but that would also mean that these proxy clients will not be able to provide 310M ops/sec, as they need to manage a lot of connections from the actual data producers. This, in turn, will require more proxy clients, and hence more sessions.

3) Benefits of cloud storage? Shadowfax uses shared cloud storage to copy data from SSD. However, this cloud storage does not seem to bring many benefits to the system, except for the data migration and recovery use case. Cloud storage also provides some limited protection from data loss when a server crashes and never reboots. But this protection only covers some data since “hot” keys in RAM are not written to SSD and consequently not copied to the cloud, leading to the cloud alone having incomplete or stale data. 

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites for the paper discussions. Please join the Slack group to get involved!

Reading Group. Shard Manager: A Generic Shard Management Framework for Geo-distributed Applications

The 97th paper in the reading group was “Shard Manager: A Generic Shard Management Framework for Geo-distributed Applications.” This paper from Facebook talks about a sharding framework used in many of Facebook’s internal systems and applications.

Sharding is a standard way to provide horizontal scalability — systems can break down their data into (semi-) independent chunks and store and process operations on these data chunks on different servers. There are many complex questions about this seemingly simple concept. For instance, how do we split the data or keyspace into good chunks? And once we know how to partition the keyspace, where do we store the shards and perform compute tasks on them? How do applications find the right partitions for their data?

Despite the widespread use, many large systems handle sharding in an ad-hoc manner, implementing everything they need from scratch. Obviously, reinventing the wheel each time can be problematic when you run a big company with hundreds of distributed sharded systems and applications. Shard Manager is Facebook’s solution to the sharding problem that aims to be general enough to be reused by many internal systems. It is a comprehensive framework that can decide how to split the data/keyspace (apps can override this and have their own sharding rules), where to place the resulting shards (apps specify rules or constraints to guide the placement), and how to find them.

Shard Manager aims to be general and accommodate many workloads and deployment patterns observed at Facebook. The paper states that currently, 54% of all sharded applications at Facebook use the framework. Unlike other sharding frameworks that are constrained to a single data center or region, Shard Manager works across regional boundaries. In a traditional sharding framework, if an application needs a copy of a partition in a different region, it must orchestrate this copy’s placement in the regional instance of the sharding platform. This is a problem for availability, scalability, fault tolerance, and cost, as the components managing shard placement and migration are split between different systems. With Shard Manager, a shard’s placement is not confined to a single region, and copies can move between geographical regions, all controlled by one system. Speaking of shard copies, Shard Manager assumes replicated shards and also allows for a “special” primary shard to facilitate leader-based replication approaches.

Of course, all shard placements and movements are constrained based on the application requirements and load balancing needs. The applications set the constraints, some of which are hard and cannot be violated, and some are more like a wish list — good to have but not strictly necessary. The hard constraints ensure the minimum requirement for shard placement, such as the server capacity needed for a shard. Examples of soft constraints or goals are preferred geographical location, the spread of replicas across failure domains, and various load balancing requirements.
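
To illustrate the difference between hard constraints and soft goals, here is a hypothetical placement sketch: hard constraints filter out infeasible servers entirely, while soft constraints only rank the remaining candidates. This is my illustration, not Shard Manager's actual placement algorithm.

```python
# Hypothetical hard-vs-soft constraint placement (not Shard Manager's real algorithm).

def place_shard(shard, servers):
    # Hard constraint: a server without enough capacity is disqualified outright.
    feasible = [s for s in servers if s["free_capacity"] >= shard["size"]]
    if not feasible:
        return None  # no valid placement exists

    # Soft constraints: preferences that only affect ranking among feasible servers.
    def score(server):
        s = 0
        if server["region"] == shard["preferred_region"]:
            s += 10                                   # locality preference
        if server["failure_domain"] not in shard["used_domains"]:
            s += 5                                    # spread replicas across domains
        s -= server["load"]                           # basic load balancing
        return s

    return max(feasible, key=score)

servers = [
    {"name": "a", "free_capacity": 50, "region": "us-east", "failure_domain": "r1", "load": 3},
    {"name": "b", "free_capacity": 10, "region": "us-west", "failure_domain": "r2", "load": 1},
    {"name": "c", "free_capacity": 80, "region": "us-west", "failure_domain": "r1", "load": 7},
]
shard = {"size": 20, "preferred_region": "us-west", "used_domains": {"r1"}}
print(place_shard(shard, servers)["name"])  # "c": "b" is preferred but lacks capacity
```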

To make everything tick, Shard Manager consists of multiple components. The orchestrator component is a monitor for health and resource usage; it makes the decisions to change the placement of shards whenever needed. The allocator component creates new shard-to-server assignments. Allocator also receives input from Twine, Facebook’s cluster manager. This connection allows the orchestrator to make decisions based on planned or anticipated events known to Twine, such as servers shutting down or rebooting for maintenance. Apparently, this is a big deal, as it allows applications to gracefully handle infrastructure maintenance. The paper spends quite some time talking about graceful shard migration, especially for cases when a primary copy of a shard needs to move to a different server.

Finally, ZooKeeper stores all the important stuff, such as the orchestrator’s state and shard assignments. ZooKeeper also acts as a failure detector for application nodes. On the application side, servers use the Shard Manager (SM) Library to interact with the orchestrator and ZooKeeper. Application clients, on the other hand, interact with Service Discovery to find the required shards. We recently covered the Delos paper that talks about building control plane storage for Facebook to replace ZooKeeper, so it is likely that the actual ZooKeeper has been replaced with a Delos-based system.

The scale of Facebook’s operation demands the Shard Manager to be sharded itself. The system is composed of many Mini Shard Managers (Mini-SMs). Each Mini-SM handles multiple partitions, and each partition represents a slice of servers available to some application across many regions.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites for the paper discussions. Please join the Slack group to get involved!

Reading Group. Solar Superstorms: Planning for an Internet Apocalypse

Our 96th reading group paper was very different from the topics we usually discuss. We talked about the “Solar Superstorms: Planning for an Internet Apocalypse” SIGCOMM’21 paper by Sangeetha Abdu Jyothi.

Now (May 2022), we are slowly approaching the peak of solar cycle 25 (still due in a few years?) as the number of observable sunspots grows. If you ask how sunspots relate to distributed systems, it turns out that there is a pretty significant connection. See, sunspots can produce coronal mass ejections (CMEs) of magnetized and charged particles flying out from the sun, and if our planet happens to be on a collision course with such a CME, we may be up for a nasty surprise. When a CME hits the Earth, we experience a “solar storm.” Earth’s magnetic field protects living organisms from the negative impacts of these charged particles. However, the interaction between the particles and the magnetic field can produce current in large/long pieces of conductive material on the planet. These conductors act as primitive electric generators in the fluctuating magnetic field of a solar storm.

It happens to be the case that people rely on long, stretched-out conductors all over the Earth — our electric grid. Substantial research has been done on the resilience of the electric grid in the face of solar storms, since the currents produced in wires spanning many hundreds of miles can be high enough to destroy all kinds of electrical equipment and appliances. Our modern communication networks rely on fiber-optic cables, which are nonconductive. However, fiber-optic cables need repeaters to boost the signal along the way; these repeaters need power, so any substantially long fiber-optic link also has a piece of wire to deliver power to the repeaters. This is a big problem for the internet — current induced by the solar storm in these wires can destroy repeaters, rendering the entire fiber-optic cable non-operative. This paper studies how resilient our internet infrastructure (mainly the cables/links that make up the backbone of the internet) is in the face of solar superstorms.

Solar superstorms are solar storms of very high intensity. These events are believed to cause significant damage to power and communications infrastructure. Humanity has lived through them just fine in the past, but there have not been any solar superstorms recent enough to stress our grids and especially our modern communication networks. The two most recent large events occurred in 1859 and 1921. In both cases, these solar storms caused telegraph service to go down, and the 1921 storm also brought down the only operational undersea communication link of the time. Technology has advanced a lot in the hundred years since the last event, and we ought to know whether the communication infrastructure can survive. An important point to mention from the paper is that the likelihood of a solar superstorm happening is far from negligible and is roughly comparable to that of a “hundred-year flood.”

The paper is not very optimistic about the fate of the internet in the case of a solar superstorm. A significant amount of internet infrastructure resides in the northern hemisphere above 40°N latitude. This happens to be a problem, as the currents induced in conductors increase as we get closer to the poles (this is also the reason we see auroras in Alaska and not Hawaii). The long undersea cables are especially susceptible. It turns out that of the links/cables connecting the US and Europe, all but one happen to be in the more vulnerable region above 40°N. Another interesting fact is the disparity between internet infrastructure in vulnerable latitudes and the population — we have a lot of infrastructure there and not as many people. The paper calls out Facebook here as well, as its infrastructure is predominantly located in the northern parts of the northern hemisphere.

Anyway, the paper suggests building redundant links that connect countries and continents in regions below 40°N. Another suggestion is the isolation of links in vulnerable regions from other cables that lie further south. For example, if a superstorm induces current in an undersea cable, it is important to ensure that the current does not “jump” (at the interconnection point) to another, otherwise unimpacted, cable further south. Another suggestion is powering down the links when a solar storm is coming. The power-off mode does not stop currents from appearing in the conductors, but it may reduce the damage or failure rate and leave more equipment operational after the storm has passed. The paper has a few more suggestions and a few simulations to try different scenarios of link/cable failures, but such simulations are very speculative, since we do not know what the failure rate would be for different cables and locations on the globe.

New Reading List: Papers #101-110

Summer term papers are here! The list is below. Also, here is a Google Calendar.

Reading Group. ByShard: Sharding in a Byzantine Environment

Our 93rd paper in the reading group was “ByShard: Sharding in a Byzantine Environment” by Jelle Hellings and Mohammad Sadoghi. This VLDB’21 paper talks about sharded byzantine systems and proposes an approach that can implement 18 different multi-shard transaction algorithms. More specifically, the paper discusses two-phase commit (2PC) and two-phase locking (2PL) in a byzantine environment.

As usual, we had a presentation of the paper. Karolis Petrauskas did an excellent job explaining this work: 

The paper states that modern blockchain technology relies on full replication, making the systems slower and harder to scale. 

Sharding is a natural way to solve the problem and has been done countless times in the crash fault tolerance (CFT) setting. Of course, a sharded system often needs to perform transactions that touch data in more than one shard. The usual way to solve this in CFT is to use some version of 2PC coupled with some concurrency control mechanism, like 2PL. ByShard follows this general recipe, but in the BFT setting, which complicates things a bit. The problem is making 2PC and 2PL work in a byzantine, adversarial environment without tightly coupling all shards back together into one “megashard.” So, we need a reliable way to communicate between shards.

Let’s look at a transaction lifecycle. When we have a transaction that spans multiple shards, the first step is to deliver this transaction to each shard and check whether a shard can run it. Then, if everything is ok, we need to run the transaction and provide some isolation from other ongoing transactions. ByShard implements all these actions with shard-steps. Each shard-step is a building block of all ByShard protocols and allows the shard to inspect a transaction, make changes to the local state, and send the message to start another shard-step on another shard. Overall, ByShard uses three distinct types of shard-steps: vote-step, commit-step, and abort-step. 

The message sending part is kind of important, as we need this communication to be reliable in the BFT setting. The paper gracefully ignores this problem and points to a few solutions in the literature. In short, ByShard requires a cluster-sending protocol that ensures reliable communication between shards, such that all correct nodes of the receiver shard get the message, all correct nodes of the sender shard get an acknowledgment, and sending requires the sender shard to reach an agreement on what to send. The last point ensures that bad actors do not send malicious stuff, and I suspect that on the receiver side there needs to be a way to check that the received messages were indeed certified by the sender’s consensus.

Vote-step is used to replicate the transaction between shards. When a shard receives the transaction, it starts the vote-step and checks whether it can proceed with the transaction. The shard may also perform local state changes if needed. At the end of the vote-step, a shard forwards some information to another shard to start a new shard-step. Since we only have three building blocks, the stuff a vote-step sends can start another vote-step, commit-step, or abort-step at the receiving end. The purpose of the commit-step and abort-step is self-evident from their names. One important thing to note about the abort-step is that it needs to undo any local changes that a prior vote-step might have made to ensure that the aborted transaction leaves no side effects.
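
Roughly, the three building blocks can be sketched as follows. This is my paraphrase of the paper's description: in the real protocol, each shard-step is an agreed-upon decision of the whole shard's BFT replica group, and the hand-off between shards goes through the cluster-sending protocol rather than a local call.

```python
# Rough sketch of ByShard's three shard-step building blocks (my paraphrase, not the
# paper's pseudocode). Each step would really be a consensus decision within a shard.

class Shard:
    def __init__(self, name, data):
        self.name = name
        self.data = data       # local key -> value state
        self.undo_log = {}     # txn_id -> {key: previous value}

    def vote_step(self, txn_id, writes, condition):
        """Check the transaction locally and tentatively apply local changes."""
        if not condition(self.data):
            return "ABORT"     # e.g., a constraint check failed
        self.undo_log[txn_id] = {k: self.data.get(k) for k in writes}
        self.data.update(writes)
        return "COMMIT"        # forward the next shard-step to another shard

    def commit_step(self, txn_id):
        """Make the tentative changes permanent."""
        self.undo_log.pop(txn_id, None)

    def abort_step(self, txn_id):
        """Undo a prior vote-step so the aborted transaction leaves no side effects."""
        for key, old in self.undo_log.pop(txn_id, {}).items():
            if old is None:
                self.data.pop(key, None)
            else:
                self.data[key] = old
```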

Now we can look at how ByShard composes these three basics steps. The figure above provides a visual illustration of three different ways ByShard runs 2PC. One aspect of the figure that I fail to understand is why not all shards run vote-step and commit-step, and the text does not really provide an explanation.

In the linear orchestration, the transaction progresses from the coordinator one shard at a time. If any shard decides to abort, it needs to start the abort-step and notify all other shards involved in the transaction (or at least all other shards that voted earlier). If a shard decides to commit, it actually starts a vote-step in the next shard. If the vote-step successfully reaches and passes the last shard involved in the transaction, then that last shard can broadcast the commit-step to everyone. Centralized orchestration looks more like the traditional 2PC, and distributed orchestration cuts down on the number of sequential steps even further. The three strategies represent tradeoffs between latency and the number of communication exchanges and shard-steps.
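
Continuing the sketch above (and again only as illustrative pseudologic, not the paper's exact protocol), linear orchestration chains vote-steps one shard at a time; an abort anywhere rolls back the shards that already voted, and only after the last shard votes does everyone run the commit-step.

```python
# Linear orchestration over the Shard sketch above (illustrative only).
def run_linear(txn_id, steps):
    """steps: ordered list of (shard, writes, condition), starting at the coordinator."""
    voted = []
    for shard, writes, condition in steps:
        if shard.vote_step(txn_id, writes, condition) == "ABORT":
            for earlier in voted:          # notify earlier shards to undo their changes
                earlier.abort_step(txn_id)
            return "ABORTED"
        voted.append(shard)
    for shard in voted:                    # the last shard's vote triggers the commits
        shard.commit_step(txn_id)
    return "COMMITTED"

# Example: move 10 units from an account on shard A to an account on shard B.
a = Shard("A", {"alice": 30})
b = Shard("B", {"bob": 5})
txn = [
    (a, {"alice": 20}, lambda d: d["alice"] >= 10),  # debit only if funds suffice
    (b, {"bob": 15}, lambda d: True),                # credit always succeeds
]
print(run_linear("t1", txn))                         # -> COMMITTED
```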

So with 2PC taken care of, we can briefly discuss the concurrency control. ByShard proposes a few different ways to implement it, starting with no concurrency control at all, thus allowing observation of partial results. Because of the side-effect-cleaning ability of the abort-step, if some transaction partly executes and then reaches the abort-step, its execution will be undone or rolled back. This reminds me of the sagas pattern. The other solution is to use locks to control isolation. The paper (or the presentation above) has more details on the nuances of locking and the different locks required with different types of orchestration. By combining different ways to orchestrate the transactions with different ways to execute them, ByShard presents 18 BFT transactional protocols with different isolation and consistency properties.

Discussion

1) Comparison with Basil. An obvious discussion is a comparison with Basil, another transactional sharded BFT system. Basil is a lot closer to the Meerkat solution from the CFT world, while ByShard is a more classical 2PC+2PL approach. In Basil, the degree of fault tolerance is smaller (i.e., it needs 5f+1 clusters). At the same time, ByShard is a lot more underspecified than Basil. ByShard relies on existing BFT consensus and BFT cluster-broadcast mechanisms to work, while Basil provides a complete and contained solution. On the performance side of things, ByShard requires a lot of steps and a lot of consensus operations across all the involved shards to do all of the shard-steps. This must take a significant performance toll. While it is not correct to compare numbers straight between papers, Basil can run thousands of transactions per second, while ByShard’s throughput is in the single digits. However, it is worth mentioning that ByShard’s experiments included more shards, and ByShard’s transactions also involved a large number of shards.

2) (Distributed) Sagas Pattern. As I briefly mentioned in the summary above, ByShard, especially with linear orchestration and no isolation, reminds me of the sagas pattern. Distributed sagas are used to orchestrate long-running requests in microservice-type applications. If we squint our eyes, we can see each shard as a separate microservice. As vote-steps propagate across shards, they perform local changes. And if an abort is needed, the abort causes these changes to be rolled back. However, when we add isolation to ByShard, the similarities with sagas start to disappear.

3) Performance. An important motivation for sharding is performance; however, it does not seem like ByShard achieves stellar performance here. Of course, sharding is still useful for systems that operate with large amounts of data that otherwise would not fit into a single machine. Nevertheless, without strong performance, a solution like this has very few advantages over not using sharded/partitioned systems at all.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites for the paper discussions. Please join the Slack group to get involved!