Tag Archives: fault-tolerance

Reading Group Special Session: Scalability and Fault Tolerance in YDB

YDB is an open-source Distributed SQL Database. YDB is used as an OLTP Database for mission-critical user-facing applications. It provides strong consistency and serializable transaction isolation for the end user. One of the main characteristics of YDB is scalability to very large clusters together with multitenancy, i.e. ability to provide an isolated user environment for users in a single cluster. In this talk, we will cover two layers of YDB – Tablet and BlobStorage layers that together provide fault tolerance, scalability, and user isolation.

Tablet is a very lightweight component that implements distributed consensus. It is a building block for user data storage, schema manager, tablet manager, system views, and many other components. A single YDB cluster is known to have more than a million tablets, a single YDB node can serve up to 10K tablets. Tablet implements distributed consensus protocol over a shared log provided by the BlobStorage layer.

The BlobStorage layer is a fault-tolerant key-value append-only database. It implements a specific API for logging, blobs reading and writing, garbage collection, and quorum management. We believe that the protocol between Tablet and BlobStorage is quite simple and understandable by listeners compared to Paxos or Raft. We also illustrate the flexibility of YDB in real-life use cases like migrating some parts of the cluster from one availability zone to another without a maintenance window or storing some data blobs on fast media like SSD and another blob on HDD.

Speaker: Andrey is the head of the YDB project (https://ydb.tech). He works in Yandex as a Head of Department. He also has a long story in Yandex as a member of the Web Crawler Team, a member of the Infrastructure team, one of the creators of Yandex Cloud, and now the part of CloudIL Team. Andrey holds a Ph.D. from Moscow State University.

When: Wednesday, August 10th at 2:00 pm EST (6 pm UTC)

Where: Zoom

Reading Group. Solar Superstorms: Planning for an Internet Apocalypse

Our 96th reading group paper was very different from the topics we usually discuss. We talked about the “Solar Superstorms: Planning for an Internet Apocalypse” SIGCOMM’21 paper by Sangeetha Abdu Jyothi.

Now (May 2022), we are slowly approaching the peak of solar cycle 25 (still due in a few years?) as the number of observable sunspots grows. If you ask how the sunspots relate to the distributed systems, it turns out that there is a pretty significant connection. See, sunspots can produce coronal mass ejections (CMEs) of magnetized and charged particles flying out from the sun, and if our planet happens to be on a collision course with such CME, we may be up for a nasty surprise. When a CME hits the Earth, we experience a “solar storm.” Earth’s magnetic field protects living organisms from the negative impacts of these charged particles. However, the interaction between the particles and the magnetic field can produce current in large/long pieces of conductive materials on the planet. These conductors act as primitive electric generators in the fluctuating magnetic field of a solar storm.

It happens to be the case that people rely on long stretched out conductors all over the Earth — our electric grid. Substantial research has been done on the resilience of the electric grid in the face of solar storms since the currents produced in wires spanning many hundreds of miles can be substantially high to destroy all kinds of electrical equipment and appliances. Our modern communication networks rely on fiber-optic cables, which are nonconductive. However, fiber-optic cables need repeaters to boost the signal along the way; these repeaters need power, so any substantially long fiber-optics link also has a piece of wire to deliver power to the repeaters. This is a big problem for the internet — current induced by the solarstorm in these wires can destroy repeaters, rendering the entire fiber-optic cable non-operative. This paper studies how resilient our internet infrastructure (mainly the cables/links that make up the backbone of the internet) is in the face of solar superstorms.

Solar superstorms are solar storms of very intensity. These events are believed to cause significant damage to power and communications infrastructure. Humanity has lived through them just fine in the past, but there have not been any solar superstorms recently enough to stress our grids and especially modern communication networks. The two most recent large events occurred in 1859 and 1921. In both cases, these solar storms caused telegraph service to go down, and the 1921 storm also brought down the only operational undersea communication link of the time. Technology has advanced a lot in the hundred years since the last event, and we ought to know whether the communication infrastructure can survive. An important point to mention from the paper is that the likelihood of a solar superstorm happening is far from negligible and roughly compare to a “hundred-year flood.”

The paper is not very optimistic about the fate of the internet in the case of a solar superstorm. A significant amount of internet infrastructure resides in the northern hemisphere above 40°N latitude. This happens to be a problem, as the currents induced in conductors increase as we get closer to the poles (this is also a reason we see auroras in Alaska and not Hawaii). The long under-sea cables are especially susceptible. It turns out that out of links/cables connecting the US and Europe, all but one happen to be in the more vulnerable region above 40°N. Another interesting fact is the disparity between internet infrastructure in vulnerable latitudes and the population — we have way a lot of infrastructure there and not as many people. The paper points out Facebook’s infrastructure here as well, as its infrastructure is predominantly located in the northern parts of the northern hemisphere.

Anyway, the paper suggests building redundant links that connect countries and continents in regions below 40°N. Another suggestion is the isolation of links that are in vulnerable regions from other cables that are more south. For example, if an undersea cable induces current due to a superstorm, it is important to ensure that current does not “jump” (at the interconnection point) to another, otherwise unimpacted, cable further south. Another suggestion is powering down the links when the solar storm is coming. The power-off mode does not stop currents from appearing in the conductors, but it may reduce the damage or failure rate and leave more equipment operational after the storm has passed. The paper has a few more suggestions and a few simulations to try different scenarios of link/cable failures, but such simulations are very speculative since we do not know what the failure rate can be in different cables and locations on the globe.

Reading Group. In Reference to RPC: It’s Time to Add Distributed Memory

Our 70th meeting covered the “In Reference to RPC: It’s Time to Add Distributed Memory” paper by Stephanie Wang, Benjamin Hindman, and Ion Stoica. This paper proposes some improvements to remote procedure call (RPC) frameworks. In current RPC implementations, the frameworks pass parameters to function by value. The same happens to the function return values. Passing data by value through copying works terrific for small parameters and returns. Furthermore, since the caller and callee have independent copies of the data, there are no side effects when one copy is modified or destroyed. Things start to become less than ideal when the data is large and gets reused in multiple RPC calls. 

For example, if a return value of one function needs to propagate to a handful of consecutive calls, we can end up performing many wasteful data copies. The wastefulness occurs as data transfers back to the caller and later gets copied multiple times to new RPCs. The obvious solution is to avoid unnecessary copying and pass the data within the RPC framework by reference using shared memory. In fact, according to the paper, many applications resort to doing just this with distributed key-value stores. Before invoking a remote function, a caller can save the parameters to the shared KV-store, and pass the set of keys as parameters to the RPC. Managing memory at the application level, however, has some problems. For example, the application must manage memory reclamation and cleanup objects no longer needed. Another concern is breaking the semantics of RPCs, as shared mutable memory can have side effects between different components of the system. 

Instead of an ad-hoc application-level solution, the paper proposes a managed approach to shared memory in RPCs. The improved framework preserves the original RPC semantics while enabling shared memory space and facilitating automatic memory management and reclamation. The paper suggests having immutable shared memory where values remain read-only after an initial write. This shared memory system will have a centralized scheduler for RPCs and a memory manager to keep track of memory usage and active memory references. With a centralized memory management approach, the RPC framework can do a few cool things. For instance, the memory manager knows when a reference is passed between nodes and when it goes out of scope on all functions/clients. This global memory knowledge allows the memory manager to perform garbage collection on values no longer needed. The paper describes this in a bit more detail, including the APIs to facilitate global memory tracking. Another benefit of the RPC framework with a centralized scheduler and memory manager is locality awareness. With information about data location in the cluster, the system can collocate function invocation on the machines that already have the values needed. 

The paper mentions that the proposed vision is already somewhat used in many specialized systems, such as Ray (Ray is from the same team as the paper) or Distributed TensorFlow. However, these systems are far from general and often used as parts of larger software stacks. The challenge with this new improved RPC framework is to make it general enough to span across the systems to allow better integration and more efficient resource use. I think of this as an RPC-as-a-service kind of vision that facilitates efficiency, interoperability and provides some blanket guarantees on communication reliability. 

As always, the paper has a lot more details than I can cover in the summary. We also have the presentation video from the reading group:

Discussion

1) Other Distributed Systems Frameworks. Frameworks for building distributed systems seem to gain more and more popularity these days. We were wondering how these frameworks address some of the same issues, such as copying data between components and managing locality. This is especially relevant for actor-style frameworks. We had some feedback on the Orleans framework. Since locality is important for performance, frameworks tend to move actors to the data. It often seems to be cheaper to move a small executable component than large chunks of data. More specialized data processing systems have been using these tricks for some time and can move both data and compute tasks around for optimal performance. 

2) Fault Tolerance. Fault tolerance is always a big question when it comes to backbone distributed systems like this one. With stateful components and shared memory subsystems, the improved RPC framework may be more susceptible to failures. While well-known and traditional techniques can be used to protect the state, such as replicating the shared memory, these will make the system more complicated and may have performance implications. Some components, like scheduler and memory manager, are on the critical path of all function invocations. Having strongly consistent replication there may not be the best choice for scalability and performance. The other option is to expose errors to the application and let them deal with the issues like in more traditional RPC scenarios or lower-level communication approaches. However, due to the size and complexity of the improved system, it would have been nice to have some failure masking. We were lucky to ask Stephanie Wang, the author of the paper, about fault tolerance, and she suggests RPC clients handle errors by reties, however, she also mentioned that it is still an open question about how much overall fault tolerance and fault masking is needed in a system like this one. 

3) Programming Model vs. Implementation. Another important discussion question was on the vision itself. The paper mentions a few specialized systems that already have many of the features described. This made us wonder what is the paper proposes then? and what are the challenges? It appeared to us that the paper’s vision is for a general and interoperable RPC system to act as a glue for larger software stacks. Stephanie provided some of her first-hand feedback here that I will quote directly:

“In my opinion, Ray already has (most of) the right features for the programming model but not the implementation. One of the main implementation areas that the paper discusses is distributed memory management. Actually, I don’t think that any of the systems mentioned, including Ray, do this super well today. For example, I would say that Distributed TensorFlow is quite sophisticated in optimizing memory usage, but it’s not as flexible as pure RPC because it requires a static graph. The second thing that I think is still missing is interoperability, as you said. I believe that this will require further innovations in both programming model and implementation.”

Reading Group

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

Metastable Failures in Distributed Systems

Metastability is a stable state of a dynamical system other than the system’s state of least energy.

Wikipedia

Distributed systems often fail spectacularly and unpredictably. They are a cause for a headache and sleepless on-call nights for way too many engineers. And this is despite lots of efforts to understand the failures, and all the tools and “best practices” we have to contain and/or prevent them. 

Today I want to talk about failures that may occur on a nearly healthy system with no perceived major bugs or design flaws. We describe this type of failure in our HotOS’21 paper. This work is in collaboration with Nathan Bronson, Abutalib Aghayev, and Timothy Zhu. It is largely based on Nathan’s observations and experiences.

So how does a “healthy” system fail? Naturally, something needs to happen for a failure to occur. We call this something a trigger. Lots of things can act as a trigger (GC, network blips, software pushes, configuration changes, load spikes, etc.). Triggers are impossible to avoid in our multi-tenant unreliable systems. 

Triggers cause backlogs or errors and make the system temporarily less happy. In some cases, however, they cause the system to spiral into the abyss and practically halt any useful work or goodput. When the trigger is removed (it was transient, or removed by the engineers), the system does not recover by itself. So we end up in a peculiar situation when the condition that caused a failure is no longer present, all system’s components seem to be functioning correctly, all servers/nodes are up, the code is working, but nothing useful is actually getting done and goodput remains negligible. 

We call this failure pattern a metastable failure, or rather we say that a system has entered a metastable failure state. The culprit behind metastable failures is a sustaining effect that prevents the system from leaving a bad/failed state even after the initial trigger is removed.

To understand more about the importance of triggers and sustaining effects, we need to look at how most distributed systems are deployed. Resource utilization is a major factor for large systems, as achieving better utilization can save a lot of money at scale. This means that systems often have little extra resources to spare when some unexpected load increase occurs. These systems operate in a metastable state that we call a metastable vulnerable state, as the system is vulnerable to a failure if enough of a trigger disturbs its stability.

Of course, having some unanticipated extra load applied to the system is not sufficient for a metastable failure to occur. In fact, even if the load pushes the system to run at its maximum capacity (or tries to run above the capacity), we should only see an increase in latency, and maybe some goodput degradation. When the extra load is removed, the system should return back down to normal operation all by itself. 

In metastable failure, the extra unanticipated load activates a positive feedback loop that creates more load. This positive feedback loop is the sustaining effect that prevents the system from recovering when the initial trigger is resolved. We say that a system is in a metastable vulnerable state when it is possible to activate such a sustaining effect loop using a strong enough trigger. This state typically occurs at the higher system utilization, as the system has fewer spare resources to absorb the extra load of the trigger without activating the positive feedback mechanism. Opposite of metastable vulnerable state is a stable state, where the positive feedback loop is not possible, allowing the system to recover by itself when the extra load is removed. Note that a system in a stable state may still have a sustaining effect mechanism, but it is not strong enough to feed on itself and diminishes over time when the extra load is removed. 

Let’s look at a hypothetical idealized example of a metastable failure. Consider a web application that uses a database capable of serving up to 300 queries per second (QPS). If more than 300 QPS are tried, the latency goes up by an order of magnitude, causing the web application to retry once after a 1-second timeout (when a retry fails, the system errors out). The web servers for the application produce a 280 QPS workload, which falls nicely below the database’s serving capacity.

Now let’s say there was a network glitch lasting a few seconds between the web servers and the database. After the network is fixed, all the delayed packets reach the database, resulting in a flooding manner, exceeding the database’s capacity of 300 QPS. This, in turn, increases the latency above the retry threshold at the web application, causing any new, post-network-glitch queries to retry as well. So now we have a backlog of queries from the network glitch and an effective new query load of 560 QPS (280 queries from the workload + 280 query retries). With such a high and sustained load, the database will not return to a normal state all by itself. This results in a maxed-out system that clears 300 QPS of load (smaller than the amplified load) and produces very little goodput since most of the queries take too long to process and get timed-out or discarded by the time the database returns them. This can continue indefinitely until either the offered workload reduces to under 150 QPS or a retry policy is changed/suspended.

Our simple example illustrates key metastable concepts and principles. In a stable state, a system can (gradually) return to normal operation. In the example, workload under 150 QPS is stable, since even with the workload amplification caused by the retry policy, the database will receive fewer new requests than it can handle each second, allowing it to gradually clear up the backlog and return to normal operation. While the system is clearing the backlog, its goodput, however, may remain compromised. Systems often operate in the vulnerable state because it seems from all metrics like the stable state is wasting resources, and the vulnerability is not known. This, however, leaves very little spare resources to absorb the extra load spikes caused by the trigger events. Once a system enters into a metastable failure state, it will feed onto itself until there is some sort of intervention. This intervention is costly, as it generally requires either reducing the offered load on the system or finding ways to break the positive feedback loops.

Another possible intervention approach is to increase the capacity of the system to push it into a stable state, but this can be challenging since the triggers are unanticipated. Such unpredictability makes it impossible to reconfigure to add capacity beforehand. And once the system is already in a metastable failure state, it may not have any resources to spare to run the reconfiguration procedure. In fact, a reconfiguration may even increase the workload amplification and make the whole situation worse.

In the paper, we discuss a few other hypothetical and anecdotal scenarios that are a bit more complicated. But the problem is not anecdotal by any means, as there are quite a few failure cases that surely look a lot like metastable failures “in the wild”:

  • Google App Engine Incident #19007. Trigger: Configuration change. Sustaining Effect: cascading load amplification. Fix: Reduce traffic level.
  • Amazon SimpleDB Service Disruption. Trigger: power loss crashing multiple servers. Sustaining Effect: load amplification due to a timeout. Fix: Change in timeout policy & introduction of additional capacity.
  • Cassandra Overload because of hint pressure + MVs. Trigger: rolling restart. Sustaining Effect: Few nodes could not catch up with hinted hand-off, preventing them from fully joining, causing the system to generate more hints. Fix: Policy change – disable hinted handoff.

With metastable failures affecting real systems, we need to have more understanding of the problem and processes involved to develop better coping and prevention strategies. A recurring pattern in our experience is that changes meant to improve the common case behavior of a system tend to increase the strength of the sustaining effects. Fast paths, caches, retries, failover, load balancing, and autoscaling all make the failure state less resource-efficient relative to the normal state, which makes the feedback loop worse. Beware of very high cache hit rates!

Metastable failures are over-represented in large site outages because the strength of the sustaining effect depends on scale. For example, if the feedback loop requires overloading a network fabric then small-scale stress tests will never trigger it. The sustaining effect may also act as a means of contagion so that the problem spreads across machines or shards. This means that the first occurrence of a novel metastable failure may be a major outage even in a hyper-scale distributed system.

Current approaches for handling metastability often lack the full comprehension of the problem and its causes. For example, engineers often focus on the trigger that causes the failure and fail to realize the complicated positive feedback loops that are responsible for the scale of the failure. Fixing a trigger is a temporary solution that may only push the system higher into the metastable vulnerable zone and make the next crash even more severe.

Unfortunately, replicating the failures and feedback loops is difficult, as many of the issues only manifest themselves at scale. This makes it ever so harder to fully understand the failures and develop efficient techniques for dealing with them. Furthermore, predicting the possibility of failure is difficult too. For instance, one can look for unexpected performance variations, and try to correlate them with other things going on in the system to learn the potential future triggers, but this still does not give the full predictive power of when a failure may happen. Improvements in our ability to predict and avoid metastable failures will also translate directly to efficiency gains because it will let us operate systems closer to their natural performance limits.

I think this is an exciting area for research. As the industry makes bigger and bigger systems and pushes them to work as cheaply as possible, we need to develop a proper understanding of how these critical systems fail at scale so we can continue improving the reliability.

Reading Group. Protocol-Aware Recovery for Consensus-Based Storage

Our last reading group meeting was about storage faults in state machine replications. We looked at the “Protocol-Aware Recovery for Consensus-Based Storage” paper from FAST’18. 

The paper explores an interesting omission in most of the state machine replication (SMR) protocols. These protocols, such as (multi)-Paxos and Raft, are specified with the assumption of having a crash-resistant disk to write the operation log and voting metadata. This disk data allows crashed nodes to restart safely. However, the real-life gets in a way a bit, as infallible storage is as real as unicorns. 

Storage may fail in peculiar ways, when some data may get corrupted, while most other data is correct and the server itself continues working. The problem here is handling such failures. The simplest way is to treat the server as crashed. However, the server must remain crashed, as restarting may get into even more severe state corruption, as the server replays the operations from a faulty log. The paper talks about a variety of other approaches taken to deal with these data issues. The authors state that all the mechanisms they have explored were faulty and led to liveness or safety issues. I personally do not buy such a blanket statement, but a few of the examples in the paper were really interesting. 

The paper then suggests a solution – Protocol-Aware Recovery (PAR). The main point here is to avoid ad-hoc solutions because they are either slow, unsafe, complicated, or all of the above. This makes sense since such a big omission (potential for data-corrupting disk failures) in protocols should be addressed at the protocol level. The paper draws heavily on the Raft state machine protocol and develops the recovery procedure for it.

The log recovery is leader-based and can be broken down into two sub-protocols: follower recovery and leader recovery. The followers are recovered by restoring the data from the leader who always knows of all the committed history. Leader recovery is a bit more tricky and occurs as part of a leader election. Of course, if a non-faulty node can be elected a leader, then recovering faulty nodes is easy with the follower recovery. However, the leader election requires a node to have the most up-to-date log to become a leader, limiting a selection of nodes for the job. That being said, the node can be elected with a corrupted log, but it needs to recover the corrupted entries from the followers. If the entry is not available on any of the followers, the state machine becomes stuck (as it should). The protocol only recovers committed log entries and follows Raft logic to discard non-committed log suffix if it has corrupted entries. 

In addition, to log recovery, the paper also talks about snapshot recovery. The idea behind snapshot recovery is to make sure all nodes take the same snapshots at the same index in the log, break them into “chunks” and recover chunks as needed from other nodes. 

Here is the presentation by Rohan Puri:

Discussion

1) The need for logs? The paper assumes that a state machine takes periodic snapshots to a disk/drive, and such snapshot in combination with a log can be used for node recovery later. This implies that the actual current state of the state machine can be lost due to a server restart. However, some state machines are directly backed by the disk, in essence, representing a rolling snapshot that gets updated every time an operation from the log applies. Recovery of such disk-backed state machine can be quicker and require only log entries happening after the crash/restart. Of course, this does not mean that the disk-backed state machine itself cannot be corrupted. In any case, the log entries are required for recovery and can be garbage collected once all nodes have persisted the state machine to disk (either as part of normal operation or a snapshot), making the time-frame for the log entries to remain useful to be relatively small. 

A more interesting problem may arise in trying to recover the corrupted state machine. If we rely on this “rolling-snapshot” disk-backed state machine, the mechanism the paper uses for snapshot recovery won’t work, since different copies of the state machine may be misaligned ever-so-slightly. Of course, one can always do the costly node restore procedure — restore to some prior snapshot and replay the log, but this is wasteful and requires keeping an extra snapshot and log from the snapshot onwards. In the spirit of the paper, we should rely on distributed copies instead and be able to restore the corruption without relying on storing redundant copies on the same server

2) Persistent memory vs RAM and recovery for in-memory SMR. If we build a state machine replication (SMR) to work purely off RAM, then we do not have the luxury of retaining any state after a restart. As such, in-memory state machines must have different mechanisms to ensure safety. For example, in traditional Multi-Paxos with a disk, a node always remembers the current term/ballot and past votes it has participated in. Without durable memory, a node restart erases the previous voting state, allowing a node to vote on something it has already voted on before, but with a lower term/ballot. This is not safe and may lead to a double-commit on the same log entry when a node promises to some new leader, and then after restart makes a second promise in the same log index to some older leader. 

Allowing for corruption in persistent memory is somewhat similar to not having persistent memory at all, at least when dealing with crashes/restarts. The very piece of data/metadata we need to ensure safety and avoid double voting as in the example above may be corrupted and cannot be used after a restart. However, the same precautions used for in-memory replicated state machines will work with corrupted storage as well and allow for safe recovery. For example, to prevent the double-voting example, a recovering node needs to run a “mock” leader election (or a leader election with a term guaranteed to not succeed). Such leader election will ensure the node gets a proper view of the current ballot/term in the cluster to make sure it no longer accepts votes from prior leaders. After such a mock election, the node can start accepting/voting for log entries while recovering any prior log and/or state machine from any of the replicas. Of course, the full recovery completes when enough data is shipped from other nodes (i.e. snapshots + missing log entries). 

There are a few differences between RAM and persistent storage when it comes to recovery. First of all, while it seems like both can lose data (one due to a reboot, another due to some random corruption), persistent storage still has a hint of data being missing. This is like not remembering what the node has voted for or who was the leader, but still having a 6th sense that something was voted upon. This extra piece of information may be useful in recovery, and indeed the protocol from the paper takes advantage of that to improve fault tolerance and safety. The recovery protocol preserves safety when the majority of nodes fail at the same log index, as the protocol knows something is missing entirely and will halt for safety. In the RAM setting, a mass reboot (i.e. majority of nodes) leads to a collective loss of memory without any hint that something may have been agreed upon, leading to a rewrite of the log. 

The second difference is that persistent memory may not lose all the data, so fewer items must be shipped from the followers. 

3) Leader-bound recovery. The paper suggests recovering followers from the leader node. This can put more load on the leader, who is already a bottleneck in the protocol. It seems like it may be possible to recover committed log entries from followers (the paper already does so for leader recovery) to make the recovery procedure less demanding for the leader.

4) Byzantine. The paper touches a bit on this topic. Data corruption on disk can be viewed through the lens of Byzantine fault tolerance. The corruption causes a node to act outside of the protocol specs, and byzantine-tolerant protocols are designed to handle such “out-of-spec” behaviors. The paper is a good example of how we can often solve some specific types of byzantine behaviors without resorting to the full-blown PBFT-style solutions. This is very practical, as we want the state machine to handle data corruptions, but we do not want to pay the performance penalty associated with BFT protocols. 

5) Luckilyhood of data corruption. Another point of discussion was around the likelihood of such data-faults happening. It does not seem like these are too frequent, but they do happen. We touched on a few anecdotal occurrences. For example, some firmware issues causing the disk to not write some large buffers of data. 

It is also worth noting error correction. Error correction is standard for server-grade memory, and it comes at a relatively small monetary/performance cost. Similar error-correction technologies are used in disks and drives, allowing for small errors (i.e. a bit-flip) to be fixed by the drive. In fact, NAND flash SSDs rely on error correction in normal operation.

6) Infallible disk. Protocols assume disk is always correct. Why? Even on the surface, this does not come as a super tight assumption. And especially on the scale of millions of SMR instances deployed across millions of machines.

Reading Group

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

Reading Group. Toward a Generic Fault Tolerance Technique for Partial Network Partitioning

Short Summary

We have resumed the distributed systems reading group after a short holiday break. Yesterday we discussed the “Toward a Generic Fault Tolerance Technique for Partial Network Partitioning” paper from OSDI 2020. The paper studies a particular type of network partitioning – partial network partitioning. Normally, we expect that every node can reach every other node; under the network partitions, we often assume that a node or a group of nodes are “disconnected” from the rest of the cluster and cannot be reached. Partial partitioning assumes that some communication remains, and some nodes outside the partition can still communicate with the partitioned servers. The paper studies several public bug reports from popular systems and finds quite a few issues attributed to the partial partitioning problem. A simple example of the issue is a system having one leader and multiple follower nodes. If the leader is partitioned away from the followers, the progress may stop. At the same time, if the leader can still communicate with a configuration manager (ZooKeeper) responsible for picking the next leader, that communication manager won’t see a problem and not elect a new leader, causing the entire system to stall. The paper has many insights about the partial partition failures, and it is worth taking a look at the paper itself for these details.  Aside from extensive exploration of the problem, the paper proposes a simple solution to mask such partial partitions, called Nifty. Nifty construct a connectivity graph and uses it to relay/reroute messages between nodes when the direct communication link between nodes fails.

Video presentation

Discussion

We had a pretty lively discussion of this paper.

1) Monitoring. One of the issues identified in the paper is that such failures caused by partial partitions often go silent. However, the proposed solution does not seem to include the monitoring/notification service itself. We speculate that it should be trivial to add it to Nifty.

2) System Design. Another important find from the paper is that many of the bugs were due to design problems. We spent considerable time discussing how better design with model checking can help avoid such issues. One of the issues, of course, is that model checking is only as good as the assumptions you put in. So if the design was checked, but the designer did not take into account, for example, message loss, it may not have caught the problem. The paper raises awareness for the problem, so hopefully, the next generation of systems can be model-checked with partial network partitions in mind.

3) Nifty performance. We had two major questions with regards to performance. The first one is whether Nifty can scale to large clusters, given its all-to-all communication requirement for constructing the connectivity graph. This all-to-all communication gives a quadratic complexity, but we also think that collecting this connectivity information can be done relatively infrequently, given that the application can tolerate some message delay/loss. The second question is whether Nifty can handle large messages. Data-driven systems, like databases, may transmit a lot of data between nodes, and having all these data flow through some “relay” nodes may put more stress on such. Also, both the topology of connected nodes and the state of the application matter.  If two partial partitions are connected by just one server, it will put more stress on that one machine. Similarly, if the system is in a state that requires more data transfer, such as building a new replica, it may cause a lot of traffic/data to flow through Nifty, putting more stress on the “relay” nodes.

4) Other types of network partitions. The paper talks about (mainly) partial symmetrical partitions. Another interesting type of network partition is an asymmetrical or one-way partition, when messages flow in one direction but not the other. However, one can argue that this is still a partial partition, and Nifty can handle such a case with its connectivity graph approach, assuming the graph is directed.

5) Failure Masking. Nifty masks failures, which is great when we need the applications to continue working. However, what happens when/if Nifty fails? Will it create an even bigger/disastrous failure? Another question is whether the partial partition may exist as a transient state towards a full network partition. In such a case, Nifty only gives a little bit of extra time to react to the issue as it emerges and cannot be the only solution to dealing with the problem (hence discussion point #1 and the need for monitoring).

6) We also noted some similarities with the recent Cloudflare issue.

Our reading groups takes place over Zoom every Wednesday at 3:30pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

Reading Group. RMWPaxos: Fault-Tolerant In-Place Consensus Sequences

Quick Summary

In the last reading group discussion, we talked about RMWPaxos. The paper argues that under some circumstances, log-based replication schemes and replicated state machines (RSMs), like Multi-Paxos, are a waste of resources. For example, when the state is small, it may be more efficient to just manage the state directly instead of managing a log of state-mutating operations. To that order, RMWPaxos foregoes the log and instead implements a state machine as a single atomic register. The basic protocol implements a write-once distributed register and resembles Paxos – it has two phases to elect a leader and prepare the value. There are some differences too. For example, the leader’s ballot is given by acceptors in phase-1, essentially meaning that a proposer always gets the majority of replies, as long as the majority of acceptors are up. The protocol then makes use of consistent quorums (see image below), and if the entire majority quorum has the same round number, the leader can proceed to phase-2 and play out regular Paxos (recover value if needed, or write a new one if not). However, if the phase-1 quorum is inconsistent, meaning that some acceptors have returned different round numbers, the protocol default back to running a regular Paxos Phase-1 by picking a round number higher than any seen in the quorum. The basic protocol is augmented to allow a register to be rewritable (after all, if all we care about is a write-once register, we can simply use basic synod Paxos). Anther augmentation deals with preventing the same operation to be double-written in the dueling leader scenarios. Here authors make some strong assumptions about the network and expect ordered point-to-point message delivery.

rmwq

 

Discussion

(1) We have spent quite some time discussing the basic write-once protocol, as it seems more complicated than Paxos (and default to Paxos under inconsistent phase-1 quorum). It appears that when there is a need to recover some partly written value, a new proposer does so in 3 phases instead of two: phase-1, phase-1 classical Paxos, and phase-2

(2) On the usefulness of such protocol. The authors claim it is a big overhead to manage the log, but in our discussion, we did not fully buy that. The compelling point is that when the size of an operation is large, it becomes cheaper to manage the register than a log since the log will occupy a lot more space compared to the state machine. In this argument, an entire state machine fits in a register. The authors claim that there is no need to maintain separate learner nodes as each acceptor always has a copy of a register and that the entire RSM can be “learned” with just one read. The paper gives an example of a KV-store that can use different registers for different keys. One issue we discussed was that the keys are managed by totally separate state machines, meaning that having complex operations that touch multiple keys require a distributed transaction protocol.

(3) With regard to related works, our consensus was that there are some similar protocols proposed earlier that are not discussed or compared against. One example is CASPaxos/gryadka. Another one is CPaxos(Bolt-On Global Consistency for the Cloud), and both of these have earlier precursors, such as Disk Paxos and Active Disk Paxos.

(4) More about assumptions in the basic protocol. We discussed the possibility of duplicate messages causing the repeat of phase-2 in the round since it is possible for a proposer to go into phase-2 with an acceptor-assigned round/ballot. In that case, a bizarre duplication of phase-1 acks, may cause an acceptor to also run phase-2 twice. Which is ok if the value at the proposer does not change. This scenario is a bit of a stretch, but it was fun looking for ways to possibly defeat the protocol.

(5) Evaluation shows good results, with RMWPaxos outperforming Multi-Paxos two-fold in some cases. This is impressive, but not unexpected, given that RMWPaxos used multiple instances for different keys vs one instance of Multi-Paxos for all keys. In the discussion, we believe that this is a better illustration of why sharding/partitioning/multi-leader approach helps with performance and not so much the pure benefit of having a register. The being said, RMWPaxos should be very good for such fine-parallelism in tasks that allow it.

rmwperf

Please join the reading group’s slack channel for more discussion, paper information, and presentation times.

One Page Summary: Flease – Lease Coordination without a Lock Server

This paper talks about a decentralized lease management solution. In the past, many lock/lease services have been centralized, placing a single authority to manage all locks in the system. Google’s Chubby, Apache ZooKeeper, etcd, and others rely on a centralized approach and backed by some flavor of a consensus algorithm for fault-tolerance. According to Flease authors, such centralized approach Flease Groupsmay not be ideal at all times and can create a bottlenecks when coordination is required only within small groups of nodes in the system. Distributed filesystems seem to be candidates for this sort of lock management, as a group of nodes tend to be responsible only for resources sharded to that group. Each such group acts independently and maintains locks for resources assigned to it, making a global lock not necessary.

The implementation of decentralized, sharded, lock management system is rather trivial. Since getting a lock/lease requires nodes to reach the consensus about the lease ownership and duration, a Paxos algorithm will suffice. In fact Flease uses a flavor of Paxos built with a distributed register. Right away we can see why Flease targets systems that are sharded into small non-overlapping groups. Running Paxos over too many nodes will degrade the performance.

Just like other lease system, Flease needs to have synchronized clocks with known max time skew uncertainty ε to control lease expiration. It also places some restrictions on minimum lease duration due to the network characteristics, i.e. lease duration has to be greater than two RTTs and greater than ε. Choosing the max lease duration is important for performance reasons.

Flease ThroughputFlease was evaluated against ZooKeeper, as both are implemented in Java and use the same network IO libraries.The figure shows throughput per (client) machine with Flease (straight line) and ZooKeeper. Flease uses small groups of 3 nodes, each running Paxos, and ZooKeeper runs on 3 nodes as well, with all clients connecting to it for lock management. As expected form this experiment, Flease has greater parallelism. When running 30 clients, Flease runs 10 Paxos machines, while ZooKeeper still operates a single one (3 nodes) with 30 clients connected. It is unclear how quickly the performance of Flease will degrade as the group size increases. In essence, similar if not better results could have been achieved by deploying separate ZooKeepers for each group to allow for the same level of parallelism.

The problem of lock management is important and Flease give a good example of application that can benefit from a less-centralized solution to locks. However, the approach in this paper is as trivial as deploying multiple lock management systems for each of the independent non-overlapping groups of nodes in the system. Flease performance will degrade if group size grows above 3 to 5 nodes. In addition, the algorithm is not suitable when group boundaries must be breached on occasion.