Reading Group. How to fight production incidents?: an empirical study on a large-scale cloud service

In the 125th reading group meeting, we looked at the reliability of cloud services. In particular, we read the “How to fight production incidents?: an empirical study on a large-scale cloud service” SoCC’22 paper by Supriyo Ghosh, Manish Shetty, Chetan Bansal, and Suman Nath. This paper looks at 152 severe production incidents in the Microsoft Teams service. The authors examined these incidents and distilled them into a handful of categories in terms of root cause, mitigation type, detection, etc. From placing the incidents into such categories/buckets, some interesting patterns emerged regarding the timeliness of incident mitigation, mitigation approaches, and potential areas for improvement.

I liked that the paper described its data collection methodology, since the categorizations may be rather subjective. However, I will mention one detail — the authors assigned a single root cause to each incident, even though some incidents are complex and may have more than one contributing factor. I also liked that the paper cautions readers against making hasty generalizations — the study focuses on just one large service, and many of the findings may be specific to that service.

So, with the above disclaimer in mind, what have we learned from the paper? The paper’s findings can be roughly broken down into a handful of topics: root cause, detection, mitigations, and post-incident lessons learned by the team/engineers. 

On the root cause side of things, different bugs in the service (Teams) make up only roughly 40% of the incidents (27% code bugs + 13% configuration bugs). Other categories include infrastructure failures, deployment failures, and authentication problems. On the infrastructure side of things, the paper separates infrastructure failures into three different categories. The first one, referred to as “infrastructure failures,” deals with scalability problems, like the inability to get enough nodes to run the work. The second infrastructure root cause bucket is “dependency failures.” Finally, the failures of databases & network dependencies get their own root cause category. Combined, these three infrastructure categories account for around 40% of incidents.

On the detection side, the paper suggests a significant deficiency in detection and monitoring systems and practices. Almost half of all incidents had some kind of detection malfunction, with 29% of incidents reported by external users and another 10% reported by internal users! If we look at the automated detection issues, many are due to bugs, lack of automated monitors, or lack of telemetry. 

For incident mitigations, the authors discuss the common types of mitigations and the reasons for some of these mitigations requiring more time to address the problem. While around 40% of issues were due to bugs, only 21% of all mitigations required a bug fix (8% for fixing code and 13% for fixing config). Additionally, 11% of issues relied on some “ad-hoc” fixes, which the authors describe as “hot-fixes.” The paper conjectures that it takes substantial time to go through the process of fixing bugs during mitigation. Instead, a faster way to recover is to initiate a rollback (22% of cases) or perform an “infrastructure change” (another 22% of incidents). By “infrastructure change” the authors mean scaling the system to take on more nodes/CPU. 

As for mitigation delay reasons (the paper calls them mitigation failures, although the mitigations themselves did not fail but rather took longer), the authors describe several common causes. Inadequate documentation and procedures are at the top of the list. Another common one is deployment delay, which occurs when it takes a long time to roll out the fix.

With all of the above findings, the lessons learned by the engineers come down to the following: the need for more automation, more testing, and more changes within the organization (behavioral change, better documentation, better coordination). I want to focus a bit on the automation part, as it hits a bit too close to home for me. Most automation suggestions (26%) boil down to better tests, such as performance tests, chaos engineering, and better end-to-end testing. I am a bit skeptical about these numbers, as testing is something we blame all the time when problems occur — it is a reflex response to say that more testing is needed. And of course we need more testing, but mind you, the same people who call for more testing when dealing with incidents also write the features and (inadequate?) tests when they are not on call.

What caught my attention is the need for automated deployment. More specifically, “automated deployment” includes “automated failover.” Just the fact that this is a recurring ask from engineers puzzles me a lot. This means that at least a handful of times per year, running Microsoft Teams requires engineers to manually reconfigure the system to remove failed nodes/machines/services and switch the load to new/backup ones. 

The authors discuss several more insights after doing a multidimensional analysis. I am not going to go in-depth here and will instead just point you to the relevant snippets in the paper.

Discussion

1) General Applicability of results. As mentioned in the paper, all observations are taken from one service, so your mileage may vary. Another point to note is that Microsoft is a large organization with decently mature monitoring, automation, and deployment tools, which may impact the number and severity of observed incidents. It is quite possible that with a less mature set of tools, one may observe a greater variety of serious problems.

Another point to mention with regard to applicability is that there are many other large systems/services that are very different from Microsoft Teams. For example, if we look at systems that maintain a lot of state (i.e., databases), we may see other common root causes and mitigation patterns. For instance, a typical mitigation strategy observed in the paper is throwing more resources at the problem. This strategy works great for systems that run a lot of stateless or small-state components/microservices. Adding more resources allows scaling past performance bottlenecks as long as the underlying databases/storage dependencies can handle the load. Scaling stateful systems, like databases, often requires spending resources upfront to move data/state around — something a system cannot do well when overloaded.

2) Lack of concrete actionable items. The findings are interesting from an educational standpoint, but they lack clear directions for improvement. The authors give general advice, along the lines of improving testing, building better automation, etc., but there are no concrete steps/procedures that may help improve systems’ reliability. One point made in our discussion is the need to focus on the specific type of problem to find more concrete solutions for that problem. 

3) Teams Outage. On the day we discussed this paper, Teams experienced an outage, likely related to some networking configuration issues.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Amazon DynamoDB: A Scalable, Predictably Performant, and Fully Managed NoSQL Database Service

In the 120th DistSys meeting, we talked about “Amazon DynamoDB: A Scalable, Predictably Performant, and Fully Managed NoSQL Database Service” ATC’22 paper by Mostafa Elhemali, Niall Gallagher, Nicholas Gordon, Joseph Idziorek, Richard Krog, Colin Lazier, Erben Mo, Akhilesh Mritunjai, Somu Perianayagam, Tim Rath, Swami Sivasubramanian, James Christopher Sorenson III, Sroaj Sosothikul, Doug Terry, Akshat Vig.

The paper is loaded with content as it presents many different things spanning ten years of development. None of the topics are covered in great detail, but I think it is still a great overview of such a massive project. Obviously, the authors discuss DynamoDB, its architecture, and its design. The paper also provides a brief history of the system and examines several challenges/lessons the team has learned while operating such a massive-scale system.

To start with the architecture, users interact with the system by reaching out to the request router. The router performs authentication and admission control. Most importantly, however, the router has access to partition metadata, allowing it to, well, route the requests to the proper storage nodes and replicas. A node hosts multiple replicas for different partitions.

So, speaking of partitions, each data item in DynamoDB has a unique primary key. These primary keys group items into partitions, which are replicated with Multi-Paxos for redundancy across multiple replicas in different availability zones. The assignment of key ranges to partitions (and partitions to nodes?) constitutes the metadata needed by the request router.
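
To make the routing step concrete, here is a minimal sketch of how a request router might use partition metadata to pick a replica for a key. All names and structures here are illustrative assumptions for this post, not DynamoDB’s actual code; a real router would also use a stable hash of the primary key rather than Python’s built-in hash.

```python
# Hypothetical sketch of request routing via partition metadata; names and
# structures are illustrative, not DynamoDB's actual implementation.
import bisect
from dataclasses import dataclass

@dataclass
class Partition:
    start_key_hash: int          # inclusive lower bound of the key-hash range
    replicas: list               # replica endpoints across availability zones
    leader: str                  # current Multi-Paxos leader for writes

class PartitionMetadata:
    """Maps hashed primary keys to partitions, as a request router might."""
    def __init__(self, partitions):
        # keep partitions sorted by the start of their key-hash range
        self.partitions = sorted(partitions, key=lambda p: p.start_key_hash)
        self.bounds = [p.start_key_hash for p in self.partitions]

    def route(self, primary_key: str, for_write: bool = True) -> str:
        key_hash = hash(primary_key) & 0xFFFFFFFF      # stand-in for a stable key hash
        idx = bisect.bisect_right(self.bounds, key_hash) - 1
        part = self.partitions[idx]
        # writes go to the leader; reads could go to any storage replica
        return part.leader if for_write else part.replicas[0]

meta = PartitionMetadata([
    Partition(0x00000000, ["az1-n1", "az2-n7", "az3-n4"], leader="az1-n1"),
    Partition(0x80000000, ["az1-n2", "az2-n9", "az3-n5"], leader="az2-n9"),
])
print(meta.route("user#1234"))
```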

DynamoDB has two types of replicas — log and storage replicas. Log replicas only contain the replication write-ahead log. Storage replicas, in addition to having the log, also maintain the state derived from applying/executing the logged commands against the B-tree storage. Both replica types can participate in Paxos quorums, but log replicas are more lightweight and cannot serve reads/queries. The system uses log replicas to improve availability and reliability — it is easier to spin up a log replica that only needs to accept new commands than to rebuild a full storage replica with all the partition’s data. This speed becomes important for quickly restoring the proper degree of replication after failures.

From the historical perspective, while DynamoDB started as a pretty basic NoSQL (key-value) store, it has added many features over time, such as secondary indexes, JSON documents, encryption, transactions, and more.

Finally, a decent chunk of the paper focuses on various nuances of running large-scale NoSQL data stores. For example, the paper notes data errors and how DynamoDB verifies data integrity with checksums for every data transfer between nodes. DynamoDB also does background data verification at rest. Another important lesson on the reliability side of things is the need to maintain enough capacity in the metadata system. While the request routers use caches for metadata to improve performance, a metastable failure in the caching system led to a rather big outage. After that incident, the caches are used only to improve latency and no longer offload capacity from the main metadata storage service — all requests for metadata go through to the service even if they are answered by the cache first. This ensures adequate capacity to serve critical metadata operations regardless of cache failures. Naturally, this is a more expensive solution for the sake of reliability.
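
The metadata-cache lesson translates into a simple pattern: answer from the cache for latency, but still forward every lookup to the backing service so it stays provisioned for the full load. Below is a minimal sketch of that pattern, assuming a hypothetical metadata_service object with a lookup() method; it illustrates the idea only and is not DynamoDB’s implementation.

```python
# Illustrative sketch (not DynamoDB's code) of a cache used only for latency:
# every lookup is still sent to the backing metadata service, so the service
# stays provisioned for the full request rate even on cache hits.
import threading

class LatencyOnlyCache:
    def __init__(self, metadata_service):
        self.metadata_service = metadata_service   # assumed to expose .lookup(key)
        self.cache = {}
        self.lock = threading.Lock()

    def _refresh(self, key):
        value = self.metadata_service.lookup(key)  # the full load always hits the service
        with self.lock:
            self.cache[key] = value

    def lookup(self, key):
        with self.lock:
            cached = self.cache.get(key)
        if cached is not None:
            # answer from cache for latency, but still send the request through;
            # a real implementation would use a background queue, not a thread per request
            threading.Thread(target=self._refresh, args=(key,), daemon=True).start()
            return cached
        # cache miss: synchronous path
        value = self.metadata_service.lookup(key)
        with self.lock:
            self.cache[key] = value
        return value
```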

The authors discuss plenty of other lessons and challenges, such as managing load, balancing the capacity of the system, implementing backups, and improving availability.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. The Case for Distributed Shared-Memory Databases with RDMA-Enabled Memory Disaggregation

In the 122nd reading group meeting, we read “The Case for Distributed Shared-Memory Databases with RDMA-Enabled Memory Disaggregation” paper by Ruihong Wang, Jianguo Wang, Stratos Idreos, M. Tamer Özsu, Walid G. Aref. This paper looks at the trend of resource disaggregation in the cloud and asks whether distributed shared memory databases (DSM-DBs) can benefit from memory disaggregation (MD) to become the next “hot” thing in the database world. 

The idea, on the surface, is simple — decoupling compute from memory enables the creation of databases with many separate stateless compute workers for query processing that share a remote memory pool. According to the authors, the driving force for this is RDMA, as it offers latency and bandwidth somewhat comparable to local memory. On top of that, such a system can also use disaggregated storage for durability. The bulk of the paper then focuses on the challenges of such a disaggregated design without going in-depth into the database architecture itself. The paper also does not go deeply into the design of the disaggregated memory system, although it points to a few issues to solve.

The first challenge listed by the authors is the lack of appropriate APIs to access memory. In particular, the paper suggests having APIs that are more in line with how memory is managed locally — memory allocation APIs. In this disaggregated memory pool case, memory allocation must work with the virtual address space of the memory system. The authors also suggest data transmission APIs to facilitate moving data for local caching at compute nodes. Finally, a function offloading API can move some simple computation to the memory system (does this not defeat the purpose of separating memory and compute?).
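
To visualize the three API families the paper calls for, here is a hypothetical interface sketch; every name (RemotePtr, dm_alloc, dm_offload, etc.) is made up for illustration and does not correspond to any real library or to the paper’s own proposal.

```python
# Hypothetical API surface for a disaggregated memory pool, roughly following the
# three families the paper suggests; all names are illustrative, not a real library.
from dataclasses import dataclass

@dataclass
class RemotePtr:
    """An address in the memory pool's virtual address space."""
    addr: int
    size: int

class DisaggregatedMemoryPool:
    # --- memory allocation API: mirrors local malloc/free, but over the pool ---
    def dm_alloc(self, size: int) -> RemotePtr: ...
    def dm_free(self, ptr: RemotePtr) -> None: ...

    # --- data transmission API: move bytes between a compute-node cache and the pool ---
    def dm_read(self, ptr: RemotePtr, local_buf: bytearray) -> None: ...
    def dm_write(self, ptr: RemotePtr, local_buf: bytes) -> None: ...

    # --- function offloading API: run a simple predicate/aggregate near the memory ---
    def dm_offload(self, func_name: str, ptr: RemotePtr, args: dict) -> bytes: ...
```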

The second important set of challenges deals with the disaggregated memory system itself. How does such a system handle node failures and still remain available and durable? Sadly, the paper does not provide any concrete details aside from hinting at high-level solutions, all of which will cost performance — backups on storage, erasure coding, replication, etc.

The third block of challenges has to do with concurrency control. If the database system caches data locally at worker/compute nodes, then we need to worry about cache coherence when multiple workers can access the same memory. Here we see that memory disaggregation is still slow — a local cache can be an order of magnitude faster. This is a smaller difference than, let’s say, going from SSD to memory, but it is still substantial. The authors suggest that the reduced speed differences in this new memory hierarchy will require new caching protocols that prioritize execution time rather than cache hit rate.

Another concurrency challenge has to do with transactions, as now we have the potential to fit all data in one large memory pool with many workers accessing it concurrently. Again, the paper does not have many concrete solutions but suggests “rethinking distributed commit.” Similar is the “solution” for concurrency control. It is costly to implement locks over RDMA, so we need to rethink CC as well, preferably without locks. Lastly, this all needs to work with thousands of compute nodes.

The last set of challenges is indexing-related. This, again, can get into the tricky RDMA performance limitations, so we need to have an RDMA-conscious index design. Also, the index needs to work well under high concurrency. 

Discussion

1) Details. Our group collectively found the paper to be rather shallow on details of how these systems may work. While the paper examines some literature on the shared memory databases of the past, it lacks depth and connections with this new paradigm of disaggregated memory used over RDMA. We are especially curious about more depth for concurrency issues and solutions, as many stated issues may have been solved in prior shared memory databases, albeit at a smaller scale.

One example where the paper is very shallow is the disaggregated memory system itself. Stating that there are challenges with availability and durability in a core component of all DSM-DBs is not going to cut it — the entire premise of the paper depends on such a disaggregated memory system being fast and reliable. Without these basics, the rest of the discussion becomes largely irrelevant.

2) Memory Disaggregation. We discussed the memory disaggregation idea in general and whether it can become a mainstream technology. See, storage disaggregation is kind of ubiquitous — you create a VM in some cloud, be it AWS, Azure, or GCP, and the storage this VM gets is likely to be in a different box (or rather a set of boxes) than your VM’s CPU or memory (think of EBS volumes on AWS EC2). We are ok with this, as this storage is plenty fast and behaves just as if it were located in the same server as the rest of the virtual hardware. This whole memory disaggregation with RDMA does not work this way, creating a lot of challenges. Most importantly, this disaggregated memory cannot be made (yet?) as universal as disaggregated storage. We won’t run code from it or use it for anything that needs to copy/change contents a lot. As a result, this disaggregated memory, at best, will look like another “storage” solution to systems that use it — something that may be faster than durable storage, but still not fast enough for general use.

Personally, I see more utility in the future with memory disaggregation using CXL. There was a paper at the recent USENIX ATC on the topic. Such a solution may act more like additional memory on-demand than a shared pool of memory between processors/nodes, but it will also avoid the cache coherence issues and the difficulties, limitations, and reliability concerns of RDMA. I can envision a top-of-rack memory pool that tenants can tap into if they need more memory, or a cloud VM product that can scale memory and CPU cores independently of each other.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Not that Simple: Email Delivery in the 21st Century

I haven’t been posting new reading group paper summaries lately, but I intend to fix that gap and resume writing these. Our 123rd paper was about email: “Not that Simple: Email Delivery in the 21st Century” by Florian Holzbauer, Johanna Ullrich, Martina Lindorfer, and Tobias Fiebig. This paper studies whether different emerging standards and technologies impact email delivery from one Mail Transfer Agent (MTA) to another.

It turns out that with a slew of new and changing standards and technologies, email delivery is not that easy. If we simplify things a bit, the sender MTA first needs to figure out where to send the email using DNS. Then it communicates with the receiving MTA to actually deliver the message. Things can go wrong at both of these steps due to the usage of different communication standards (IPv4 vs. IPv6) or adherence to stricter security. On the security side, the authors tried to force TLS to ensure an encrypted connection and present an invalid TLS certificate to study how vulnerable the email exchange may be to several types of attacks, such as man-in-the-middle. They also tried to use misconfigured DNSSEC to see if that prevented email delivery.
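
For readers less familiar with the mechanics, here is a simplified sender-side sketch of those two steps: an MX lookup over DNS followed by an SMTP delivery that may or may not insist on verified TLS. It uses the third-party dnspython package and Python’s standard smtplib; it is a toy illustration of the flow, not the paper’s testbed, and it omits retries, IPv6 handling, and error handling.

```python
# Simplified sender-side sketch of the two delivery steps the paper studies:
# (1) find the receiving MTA via DNS MX records, (2) deliver over SMTP, optionally
# insisting on TLS with certificate verification.
import smtplib
import ssl
import dns.resolver   # pip install dnspython

def deliver(sender, recipient, message, require_verified_tls=False):
    domain = recipient.split("@", 1)[1]
    # Step 1: DNS lookup of the mail exchangers, lowest preference value first
    answers = sorted(dns.resolver.resolve(domain, "MX"),
                     key=lambda r: r.preference)
    mx_host = str(answers[0].exchange).rstrip(".")

    # Step 2: SMTP delivery to the receiving MTA
    with smtplib.SMTP(mx_host, 25, timeout=30) as smtp:
        smtp.ehlo()
        if smtp.has_extn("starttls"):
            ctx = ssl.create_default_context()
            if not require_verified_tls:
                # many providers behave like this: encrypt, but accept any certificate
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
            smtp.starttls(context=ctx)
            smtp.ehlo()
        elif require_verified_tls:
            raise RuntimeError("receiver does not offer STARTTLS")
        smtp.sendmail(sender, [recipient], message)
```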

To quantify email delivery, the authors set up their own email testbed, consisting of differently configured authoritative DNS servers and MTAs, giving a total of 11 combinations that try different connectivity and security options. This testbed served as the receiver. For senders, the authors used email providers, categorized into two groups: large and regular. Large providers are companies like Google and Microsoft that provide email services for a variety of other organizations and individuals. Regular providers are smaller and serve one or a few institutions.

Of the big findings, IPv6 is still a problem. Roughly 40% of email providers (regular and large) cannot deliver mail to systems with IPv6-only DNS. About 60% of providers cannot deliver mail when both the MTA and DNS rely only on IPv6.

Another important lesson is that email providers, large and regular, prioritize email delivery over security. While most providers (90% of regular and all large ones) can deliver emails with forced TLS, the providers do not check the validity of the TLS certificate. Misconfigured DNSSEC also does not seem to stop most providers from delivering mail.

Discussion

1) Missing parts. The paper studies email delivery, but it looks at the problem very literally — whether email arrives at the receiving MTA. This does not necessarily correspond to the user experience. For example, the complexity of dealing with spam is not studied here, but it is a large part of users’ experience with email. The receiver may apply spam filters after receiving the email, which can cause the email to never be shown to the user.

2) Graylisting. The paper does some study on spam emails, again in the context of their delivery to the receiving MTAs and the technology used. One spam-specific technology the paper tested is graylisting. This is a trivial method to reduce spam by rejecting emails from unknown sources on the first attempt. If the sender retries the email delivery sometime later, then the receiver will accept it. It appears that this simple technique resulted in fewer spam emails. However, we wonder if using it even makes sense — 60% of spam still made it through, and state-of-the-art spam filters may end up catching more spam regardless of whether it made it through the graylisting procedure.
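
For illustration, a minimal graylisting check could look like the sketch below: remember the (client IP, sender, recipient) triplet, temporarily reject the first attempt, and accept a retry that comes back after a delay. The code and its constants are my own simplified assumptions, not anything from the paper.

```python
# A minimal graylisting sketch: temporarily reject mail from unseen
# (client IP, sender, recipient) triplets and accept a retry that arrives after a
# short delay. Illustrative only; real implementations add expiry and whitelists.
import time

GREYLIST_DELAY = 300          # seconds the sender must wait before retrying
_first_seen = {}              # triplet -> timestamp of the first delivery attempt

def greylist_check(client_ip, mail_from, rcpt_to, now=None):
    """Return an SMTP-style response for this delivery attempt."""
    now = time.time() if now is None else now
    triplet = (client_ip, mail_from, rcpt_to)
    first = _first_seen.get(triplet)
    if first is None:
        _first_seen[triplet] = now
        return "451 4.7.1 Greylisted, please retry later"   # temporary failure
    if now - first < GREYLIST_DELAY:
        return "451 4.7.1 Greylisted, please retry later"
    return "250 OK"            # legitimate MTAs retry; much spamware does not

# Example: a retry 10 minutes after the first attempt is accepted
t0 = 1_000_000.0
print(greylist_check("203.0.113.5", "a@example.org", "b@example.net", now=t0))
print(greylist_check("203.0.113.5", "a@example.org", "b@example.net", now=t0 + 600))
```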

3) Misleading results. The results shown in the paper express the ratio of different providers that may be impacted by a particular issue/configuration. This is a somewhat misleading metric, and it would be much nicer to know how many users may be impacted instead. Of course, in all fairness to the paper, this information is rather hard to obtain, since email providers are not likely to share their user data.

4) IPv6. The results on issues with IPv6 may not be specific to email, as IPv6 adoption, in general, may not be so great just yet. 

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Winter Term Reading Group Papers: ##121-130

Our winter set of papers! The schedule is also in our Google Calendar.

Fall Term Reading Group Papers: ##111-120

Below is a list of papers for the fall term of the distributed systems reading group. The list is also on the reading group’s Google Calendar.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group Special Session: Scalability and Fault Tolerance in YDB

YDB is an open-source Distributed SQL Database. YDB is used as an OLTP database for mission-critical, user-facing applications. It provides strong consistency and serializable transaction isolation for the end user. One of the main characteristics of YDB is scalability to very large clusters together with multitenancy, i.e., the ability to provide an isolated environment for users within a single cluster. In this talk, we will cover two layers of YDB, the Tablet and BlobStorage layers, which together provide fault tolerance, scalability, and user isolation.

Tablet is a very lightweight component that implements distributed consensus. It is a building block for user data storage, the schema manager, the tablet manager, system views, and many other components. A single YDB cluster is known to have more than a million tablets, and a single YDB node can serve up to 10K tablets. A tablet implements a distributed consensus protocol over a shared log provided by the BlobStorage layer.

The BlobStorage layer is a fault-tolerant, append-only key-value database. It implements a specific API for logging, reading and writing blobs, garbage collection, and quorum management. We believe that the protocol between the Tablet and BlobStorage layers is quite simple and understandable to listeners compared to Paxos or Raft. We also illustrate the flexibility of YDB in real-life use cases, such as migrating parts of the cluster from one availability zone to another without a maintenance window, or storing some data blobs on fast media like SSDs and other blobs on HDDs.

Speaker: Andrey is the head of the YDB project (https://ydb.tech). He works at Yandex as a Head of Department. He also has a long history at Yandex as a member of the Web Crawler Team, a member of the Infrastructure team, one of the creators of Yandex Cloud, and now part of the CloudIL team. Andrey holds a Ph.D. from Moscow State University.

When: Wednesday, August 10th at 2:00 pm EST (6 pm UTC)

Where: Zoom

Metastable Failures in the Wild

Metastable failures in distributed systems are failures that “feed” and strengthen their own “failed” condition. The main characteristic of a metastable failure is a positive feedback loop that keeps the system in a degraded/failed state. These failures are hard to spot, as they always start with some other distraction — some trigger event that nudges the system over its operating limit or capacity. However, fixing the trigger is not enough, and engineers realize this too late — so late that sometimes it takes days to stabilize the system and return to the pre-failure level of operation.

I already blogged about this topic last year in the context of our original metastable failures paper from HotOS’21. Now we have the “improved and extended” version out at OSDI’22, and oh boy, we have a lot of juicy stuff there. That failure that took days to recover? I did not make it up. My students spent quite some time going through hundreds, if not thousands, of publicly available post-mortem reports for different systems to identify a handful of impactful metastable failures. We are sure there are more metastable failures out there lurking in these bug and failure reports. In fact, we had significantly more candidate failures, but many post-mortems simply do not have enough information to be absolutely sure metastability played a role.

Anyway, above is a table from the paper that lists examples of metastable failures in the wild. Some last for an hour, and some take days to resolve. Many examples are due to retries and retry-induced load amplification. See, when a system becomes slow (due to, say, a server failure or a transient network issue), latency on some requests starts to grow, and it may reach the point at which the client application gives up on the request and retries. Retries are great for dealing with transient issues, but they create more load for the system by potentially repeating the same request over again. When the system is already overloaded due to the ongoing issue, retries do not solve any real problem and only create more unnecessary load. More load means higher queuing times, higher latency, and more retries. This vicious cycle can continue even after the original trigger issues are fixed, and, in many cases, there is no escape from this except load shedding.
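
A quick back-of-the-envelope sketch (my own, not from the paper) shows why this loop is so dangerous: once latency crosses the client timeout, the same user demand arrives at the server multiplied by the retry count.

```python
# Back-of-the-envelope sketch (not from the paper) of retry-induced load
# amplification: once server latency exceeds the client timeout, every request is
# attempted multiple times, multiplying the offered load on an already slow system.
def offered_load(base_rps, latency_ms, timeout_ms, max_attempts):
    """Requests/sec the server actually sees, counting retries."""
    if latency_ms <= timeout_ms:
        attempts = 1                      # healthy: one attempt per request
    else:
        attempts = max_attempts           # every attempt times out and is retried
    return base_rps * attempts

# Healthy system: 1,000 rps offered, 1,000 rps seen by the server.
print(offered_load(base_rps=1000, latency_ms=50, timeout_ms=200, max_attempts=3))
# Trigger pushes latency past the timeout: the same 1,000 rps of user demand now
# arrives as 3,000 rps of attempts, which keeps latency high even after the
# original trigger is gone -- the feedback loop behind a metastable failure.
print(offered_load(base_rps=1000, latency_ms=500, timeout_ms=200, max_attempts=3))
```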

We managed to reproduce a toy example of retry-induced failure using MongoDB. It is a bit concerning that we were able to do so at a small scale, but our experience also showed that not all hope is lost. See, as it turns out, modern systems are good at load shedding — when a request queues up for too long and is too old by the time the server can handle it, such an old request can be dropped without wasting further resources. Our experiment here used a very aggressive retry policy to overrun the load-shedding mechanisms of the underlying database. That being said, sometimes the difference between a failure and a recovery can come down to a minuscule difference in the trigger. The figure here shows a failure when a trigger took out 80% of CPU capacity, and a successful recovery when the trigger reduced CPU capacity by only 78%.

Now, of course, retries are not the only cause of the self-feeding failure loops, but retries are something most engineers are familiar with, so the post-mortem reports tend to have more details on that topic. Potentially, this is one reason for observing so many retry-induced metastable failures in the wild. However, we saw other feedback loops. For example, one of the failures in the list was caused by excessive logging on the error path, which in turn caused the system to slow down and have more requests enter that same error path.

Seeing the examples in the real world and reproducing them is important. Our HotOS paper claimed that reproducibility at a small scale is often challenging. In this new paper, we have a handful of metastable failure scenarios reproduced at a scale of at most a few VMs. However, a good chunk of the paper is about generalizing what we have learned from the observations. We developed a “reasoning framework” for metastability and metastable failures.

The two important characteristics of metastable failures are triggers and amplification effects. On the trigger side, we can have load-spiking triggers and capacity-decreasing triggers. The former type temporarily increases the load on the system above some pre-trigger level; an example of this is higher-than-expected demand from users. The latter type degrades the system’s serving capacity; an example of this is a server failure that takes some capacity out.

Similarly to triggers, there are two types of amplification effects: workload amplification and capacity degradation amplification. Both feedback loops kick in after a system enters an overloaded state — having insufficient serving capacity for the current offered load. Workload amplification seems to be the more common type in the examples we found so far. It creates even more load in response to the overload. The vicious retry loop from earlier is an example of workload amplification. The other type, capacity degradation amplification, reduces the serving capacity of the system in response to an overload. This can happen, for instance, in caching systems and applications, where losing some of the cache results in a downstream overload that either prevents the cache from being rebuilt or outright causes more cache entries to expire and go stale.

The figure above shows how these two types of triggers and two types of amplification mechanisms can combine. Also, an important point to note — it is possible to have a combination of more than one trigger and more than one amplification mechanism, something that we have seen in the wild. As for whether a metastable failure actually occurs, a lot depends on how the triggers and amplification mechanisms interact and how much load was offered to the system before the failure sequence began.

Small triggers that do not last long or do not have a big enough magnitude (i.e., a trigger that does not overload the system too much) may be less likely to cause an issue. On the other hand, a trigger that lasts an hour and takes out half of the servers is a much bigger problem. Of course, the magnitude of the trigger is not an absolute metric — your system may be fine even when you take out half the servers if it was operating at 25% of its maximum capacity before the trigger event took place.

Triggers are not metastable failures. They are failures in their own right that can cause availability and performance issues for clients or users, but they are only the beginning of the metastable failure sequence. The amplification mechanisms are the feedback loops that start after the system is overloaded due to a trigger. Amplification leads to a metastable failure once the system’s overload is amplified so high that even fixing the trigger (i.e., restoring the lost capacity or resolving the load spike) does not solve the problem. The figure marks those metastable failure points. Naturally, the type of amplification matters a lot. A system with “slow” amplification loops that add little overload over time is better off than one with “fast” amplification loops. In both cases, we can have a metastable failure, but a slow amplification mechanism gives us more time to deal with the trigger.
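
To make the “fixing the trigger does not help” point concrete, here is a toy discrete-time simulation of my own construction (the parameters and the 0.2 amplification factor are arbitrary assumptions): a temporary capacity-decreasing trigger combines with workload amplification, and the backlog keeps growing even after full capacity is restored.

```python
# Toy simulation (my construction, not the paper's model) of a capacity-decreasing
# trigger combined with workload amplification. The trigger is removed at t=50,
# yet the backlog keeps growing: the system is stuck in a metastable failure.
def simulate(steps=80, demand=900, capacity=1000, trigger=(20, 50), trigger_capacity=600):
    backlog = 0.0
    for t in range(steps):
        cap = trigger_capacity if trigger[0] <= t < trigger[1] else capacity
        offered = demand + 0.2 * backlog        # amplification: backlogged work is retried
        backlog = max(0.0, backlog + offered - cap)
        if t in (trigger[0] - 1, trigger[1] - 1, steps - 1):
            # before the trigger, at the end of the trigger, and long after its removal
            print(f"t={t:2d}  capacity={cap}  offered={offered:12.0f}  backlog={backlog:12.0f}")

simulate()
```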

Anyway, what are the lessons here? For one, I cannot stress enough the importance of understanding the mechanisms involved so that we can operate the systems better. Maybe it is not a good idea to squeeze every single bit of capacity from the system/hardware? Having some slack resources allows us to absorb smaller triggers and buys time before the system enters a metastable failure for bigger triggers. The same goes for building systems with predictable performance characteristics. If some code path is significantly slower than another, a sudden switch to the slower path can act as a trigger, since it is essentially a capacity degradation.

Load shedding is the solution to metastable failures. All recovery efforts included some load shedding mechanisms. After all, to bring the system back into operation, its load needs to decrease enough to stop the amplification. But we do not need to save load shedding only for recovery efforts after it is already too late. More aggressive load shedding done ahead of time, or early after the trigger overloads the system, can save us from a bigger failure — it is better to lose some requests and piss off some clients/users than to lose the entire system and impact everyone. Here I can also lump in load prioritization — prioritize the work that raises the system’s goodput if possible.
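
As an illustration of the idea (not a description of any particular system), a simple form of load shedding drops requests whose queueing delay already exceeds the client’s deadline, so capacity is spent only on work that can still produce a useful response. The timeout value below is an arbitrary assumption.

```python
# Minimal sketch of queue-age-based load shedding: requests that have already waited
# longer than the client's timeout are dropped instead of processed, so the server
# spends its remaining capacity only on work that can still be useful to someone.
import collections
import time

CLIENT_TIMEOUT = 0.2                       # assumed client-side deadline, in seconds

def drain(queue, handle):
    """Process queued (enqueue_time, request) items, shedding expired ones."""
    served = shed = 0
    while queue:
        enqueued_at, request = queue.popleft()
        if time.monotonic() - enqueued_at > CLIENT_TIMEOUT:
            shed += 1                      # the client has given up; do not waste work
            continue
        handle(request)
        served += 1
    return served, shed

queue = collections.deque((time.monotonic(), f"req-{i}") for i in range(5))
queue.appendleft((time.monotonic() - 1.0, "stale-req"))        # waited too long already
print(drain(queue, handle=lambda r: None))                      # -> (5, 1)
```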

Another point is to account for the human factor — about half of the observed failures in the wild had some direct human involvement, such as rolling out a bad configuration or buggy code. It is important to test and deploy configuration the same way as software, something that many systems still seem to neglect. Proper testing of new code is paramount, including stress testing. And, of course, do not deploy on Friday… we have seen a failure in which a weekend load/traffic reduction masked the increased resource usage of a new version deployed on Friday. The system failed after the load returned on Monday.

Another interesting observation is the “fix-to-break” behaviors. On a few occasions, engineers increased the severity of the amplification mechanism (feedback loop) after fixing an outage, leading to a more severe failure in the future. I refer to this as a “fix-to-break” pattern; it is a good illustration of why we need to understand metastability in order to avoid causing failures.

As always, the paper has way more details. We talk a lot about definitions and formalism of metastable states and failures. We have a handful of distinct reproductions with triggers and amplifications caused by retries, cache failures, and garbage collections. And finally, we include some real-world first-hand details on metastable failures at a large internet company.

Reading Group. Darwin: Scale-In Stream Processing

In the 99th reading group meeting, we discussed stream processing. The paper we read, “Darwin: Scale-In Stream Processing” by Lawrence Benson and Tilmann Rabl, argues that many stream processing systems are relatively inefficient in utilizing the hardware. These inefficiencies stem from several sources: the need to ingest large volumes of data, the requirement to durably store ingested data for fault tolerance, and the generality of the stream-processing frameworks themselves. Due to these inefficiencies, many stream processing systems scale by adding more machines instead of solving the existing bottlenecks. The authors propose a set of methods to address these scalability concerns. The paper refers to these fixes as a “scale-in” approach, where the system tries to optimize and fully use the resources it already has before requiring more servers.

Overall, the paper identifies several areas of concern and improvement. On the framework generality side of things, the authors discuss query compilation. The idea here is that compiling streaming queries into native, optimized code will be faster and more efficient than having these queries run in a more general but interpreted environment. Out of all the identified problems, Darwin seems to solve only this one. Darwin exposes an SQL-like interface to users, creates a query plan that incorporates various optimizations, translates the query plan into C++ code, and finally compiles the C++ code.
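
To illustrate the general idea of query compilation (this is not Darwin’s actual pipeline, and Darwin emits C++ rather than Python), the sketch below generates specialized code for a tiny filter query once instead of interpreting the expression for every streamed tuple.

```python
# Illustrative sketch (not Darwin's actual pipeline) of the query-compilation idea:
# instead of interpreting a filter expression for every streamed tuple, generate
# specialized source code for the query once and run the compiled version.
# Darwin emits and compiles C++; Python's exec stands in for that step here.

def interpret(predicate_ast, tuple_):
    """Generic, interpreted evaluation: walks the expression per tuple."""
    op, field, const = predicate_ast
    value = tuple_[field]
    return value > const if op == ">" else value == const

def compile_query(predicate_ast):
    """Generate specialized code for one query and compile it once."""
    op, field, const = predicate_ast
    source = f"def _compiled(t):\n    return t[{field!r}] {op} {const!r}\n"
    namespace = {}
    exec(compile(source, "<generated>", "exec"), namespace)   # stand-in for emitting C++
    return namespace["_compiled"]

query = (">", "temperature", 30)
stream = [{"temperature": 25}, {"temperature": 42}]

fast_filter = compile_query(query)
print([t for t in stream if interpret(query, t)])   # interpreted path
print([t for t in stream if fast_filter(t)])        # compiled path, same result
```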

Other bottlenecks/issues the paper identifies are network communication, persistent storage, and recovery. For the network communication bottleneck, the authors propose using… well, faster networks and RDMA. On the persistent storage side of things, the paper claims we need more speed and should consider newer technologies, like non-volatile memory (PMem). Finally, for the recovery aspect, the paper is again not very creative and suggests using “modern storage technology to achieve efficient checkpointing.”

Discussion

1) Limited Implementation & Eval. This is a short paper, so there are only so many details on the discussed topics. The paper identifies 4 bottlenecks and provides experimental justification for them. However, for 3 out of the 4 issues, the solution seems to boil down to “using modern technology.” So, it seems like either 3 of the 4 problems have already been solved by “modern technology,” or the authors have no good solution for them.

The limited evaluation is also interesting. After all, when running in RAM, Darwin shows the same performance as the other systems the authors compare against. This makes us question whether the one remaining problem and its solution (query optimization) is actually a problem in (at least some) other streaming systems.

2) A Few Interesting Numbers. We really liked the motivation the paper uses for scaling streaming systems — Alibaba’s 2020 Singles Day (a big e-commerce sales event) produced 4 billion streamed events per second and required 1.5 million cores of Apache Flink to handle. At such a scale, improving the efficiency of streaming systems is crucial.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Achieving High Throughput and Elasticity in a Larger-than-Memory Store

The “Achieving High Throughput and Elasticity in a Larger-than-Memory Store” paper by Chinmay Kulkarni, Badrish Chandramouli, and Ryan Stutsman discusses elastic, scalable distributed storage. The paper proposes Shadowfax, an extension to the FASTER single-node KV-store. The particular use case targeted by Shadowfax is the ingestion of large volumes of (streaming) data. The system does not appear to have replication and cannot handle server failures without data loss. Instead, the authors focus on reconfiguration performance and its impact on the rest of the system. So, in case of failures, replacement servers can be added with a minimal performance penalty, and under failure-free conditions, nodes can be added or removed without data loss. Finally, Shadowfax stores data in three different “places” based on how hot the keys are. Recently updated keys reside in RAM, colder data gets pushed to the SSD, and finally, the data from SSD is asynchronously copied to shared cloud storage.

The main problem Shadowfax solves is ingesting lots of data from networked clients. A single server can only do so much, so we really need a system that can partition the keyspace across servers. Shadowfax builds on top of FASTER, a single-node store that leverages a lock-free hash table to support key indexing/look-up. Shadowfax adds hash-based key partitioning to ensure each server is responsible for a subset of all keys. This partitioning is stored in ZooKeeper for fault tolerance of the sharding configuration.
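
A simplified sketch of hash-range partitioning in this spirit is below; the partition map values are made up, and in Shadowfax the mapping would live in ZooKeeper rather than in a hard-coded list.

```python
# Simplified sketch of hash-based key partitioning in the spirit of Shadowfax:
# each server owns a contiguous slice of the hash space, and the mapping (a plain
# list here) would live in ZooKeeper so that clients and servers agree on ownership.
import hashlib

# (hash_range_end_exclusive, server) pairs, sorted by range end; illustrative values
PARTITION_MAP = [
    (2**64 // 3,     "server-a"),
    (2 * 2**64 // 3, "server-b"),
    (2**64,          "server-c"),
]

def owner(key: bytes) -> str:
    h = int.from_bytes(hashlib.sha1(key).digest()[:8], "big")   # 64-bit key hash
    for range_end, server in PARTITION_MAP:
        if h < range_end:
            return server
    raise AssertionError("unreachable: the last range covers the full hash space")

print(owner(b"user:42"), owner(b"user:43"))
```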

A lot of the Shadowfax design stems from two ideas: (1) avoiding coordination and context switching and (2) avoiding unnecessarily costly operations.

For the first point, Shadowfax servers pin processing threads to cores, allowing each thread to pick up some requests and dispatch them to the underlying FASTER store shared among all processing threads. As such, the processing threads never coordinate with each other by any means other than the FASTER store. The shared FASTER store uses lock-free hashing to minimize the impact of concurrency. Whenever some coordination is needed, FASTER avoids explicit cross-thread coordination with the help of asynchronous cuts. As different threads may access the same shared data, it is often essential to know when all threads have passed or seen some particular version of the data. For instance, when some record moves from RAM to SSD, all threads must have completed their access to that record in RAM (or agreed not to access it) before the memory can be cleaned up. FASTER achieves this lazily via view or epoch changes. When a thread sees a new epoch and transitions to it, it agrees to have finished accessing any data in the old epoch. When all threads have moved (at their own pace) to a new epoch, the system can reclaim the memory supporting the older epoch’s data.
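
Here is a minimal sketch of the epoch idea (my simplified take, not FASTER’s actual implementation): each thread publishes the epoch it last entered, and memory retired in epoch E can be reclaimed only after every thread has advanced past E.

```python
# Minimal sketch of the epoch-based "asynchronous cut" idea (not FASTER's actual
# implementation): each thread publishes the epoch it currently operates in, and
# memory retired in epoch E is reclaimed only once every thread has moved past E.
import threading

class EpochManager:
    def __init__(self, num_threads):
        self.current_epoch = 1
        self.thread_epochs = [1] * num_threads     # epoch each thread last entered
        self.retired = []                          # (epoch, cleanup_callback)
        self.lock = threading.Lock()

    def bump_epoch(self):
        with self.lock:
            self.current_epoch += 1

    def enter(self, thread_id):
        """Called by a worker at its own pace; entering the new epoch means the
        thread promises it is done touching data from older epochs."""
        with self.lock:
            self.thread_epochs[thread_id] = self.current_epoch
            self._try_reclaim()

    def retire(self, cleanup_callback):
        """Schedule cleanup (e.g., freeing an in-memory record moved to SSD)."""
        with self.lock:
            self.retired.append((self.current_epoch, cleanup_callback))

    def _try_reclaim(self):
        safe_epoch = min(self.thread_epochs)       # every thread has reached this epoch
        still_retired = []
        for epoch, callback in self.retired:
            if epoch < safe_epoch:
                callback()                         # no thread can still see this data
            else:
                still_retired.append((epoch, callback))
        self.retired = still_retired
```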

On the client side, threads are pinned to cores as well. Clients maintain sessions with the servers. Each client thread opens a session with each server, allowing independent client threads to talk to the same server without any internal coordination. When accessing a key, a client thread uses its session for the partition/server hosting that key and batches the request with other requests from the same session before sending it over to the server.

The scheme with sticky sessions works well when the partitions are static. However, in a dynamic situation, it may be possible that a client sends a request to a server that no longer serves a key. This is where the second aspect of the design comes in. The naive solution would be for a server to check ownership for each request, but this would be a waste of CPU cycles for the vast majority of requests. Instead, Shadowfax checks request ownership at the batch level using view numbers and drops outdated batches. Each server has a view number representing its configuration version. Whenever there is a partitioning change impacting a server, its view increases. The client attaches the view obtained (and cached) from ZooKeeper to a batch, so if the client’s information is stale, its batches will not succeed. Naturally, when this happens, the client refreshes the keyspace mapping and server views from ZooKeeper.
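
An illustrative sketch of the batch-level ownership check is below; the classes and the single-server setup are my own simplifications, and in Shadowfax the client would refresh its view from ZooKeeper rather than from the rejecting server’s reply.

```python
# Illustrative sketch (not Shadowfax's actual code) of batch-level ownership checks
# with view numbers: the client stamps each batch with the view it believes the
# server is in; a server whose partitioning has changed rejects the whole batch,
# and the client refreshes its mapping (from ZooKeeper in Shadowfax) and retries.
class Server:
    def __init__(self):
        self.view = 1                       # bumped on every partitioning change
        self.store = {}

    def apply_batch(self, client_view, batch):
        if client_view != self.view:        # one cheap check per batch, not per request
            return ("REJECTED", self.view)  # tells the client its mapping is stale
        for key, value in batch:
            self.store[key] = value
        return ("OK", self.view)

class Client:
    def __init__(self, server):
        self.server = server
        self.cached_view = 1                # in Shadowfax this comes from ZooKeeper

    def send(self, batch):
        status, server_view = self.server.apply_batch(self.cached_view, batch)
        if status == "REJECTED":
            self.cached_view = server_view  # stand-in for re-reading ZooKeeper
            status, _ = self.server.apply_batch(self.cached_view, batch)
        return status

server = Server()
client = Client(server)
server.view = 2                             # a migration happened; the client is stale
print(client.send([("k1", "v1"), ("k2", "v2")]))   # first attempt rejected, retry succeeds
```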

The migration tries to avoid as much coordination as possible. It leverages the idea of the asynchronous cut to transition between phases of the migration process — once all threads finish a migration phase, they can transition to the next phase. I will skip the detailed explanation of data migration; however, I will mention that it requires a few state transitions on both the source and target nodes, and, during the process, some requests may get delayed, as the target machine may not yet have received the data for remapped/repartitioned keys. In short, the source must find the keys to send, tell the target to expect the data migration, and move the keys, while the target needs to accept the keys and eventually start serving them.

Another extension of Shadowfax over the FASTER store is the ability to move data from SSD to shared cloud storage. The main benefit of using cloud storage is the speed of reconfiguration and migration. Shadowfax eventually writes all records from SSD to shared cloud storage. This cloud copy allows the migration procedure to avoid copying the records from SSD and instead migrate the index with pointers to the records in the shared cloud storage.

Regarding performance, Shadowfax seems to provide around 130 million ops/sec on one 64-core server. On 12 servers (and 3 clients) as shown in the figure here, the throughput can go as high as 930M ops/sec. Scale-up procedures also seem to be quick with little impact on performance. We can see a dip in source throughput, but it is relatively short and lasts way under a minute in all tested scenarios.

Discussion

1) Fault tolerance? While it is impressive to get up to 130M ops/sec of throughput over TCP (albeit with FPGA network acceleration), I cannot stop wondering what kind of applications need such per-node throughput yet do not need data redundancy. If we operate at such speeds, a server reboot can mean millions of lost operations that existed only in RAM.

2) Client Scalability. Shadowfax clients communicate with servers over TCP, but the system assumes a small number of clients/sessions. In fact, the authors perform most evaluations with just one massive client. What happens is that a client needs a session to each server from each of its threads. So a 64-core client will have 64 sessions per server. I am not entirely sure if having this many sessions will impact performance. The paper achieves 930M ops/sec in a 12-server cluster with 3 clients, but each client is massive, and all three clients combined have 192 sessions on each server. The authors claim this result as proof that the servers can handle many client sessions.

One question I have is whether having more, smaller clients would impact performance even with the same total number of sessions. Referring back to the theme of the first discussion point, what kind of application has a single client producing 310 (930/3) million events/records/updates each second? Of course, an argument can be made that these clients are “proxies” for many real clients, like sensors, IoT devices, etc., but that also means these proxy clients would not be able to provide 310M ops/sec, as they need to manage a lot of connections from the actual data producers. So this, in turn, would require more proxy clients, and hence more sessions.

3) Benefits of cloud storage? Shadowfax uses shared cloud storage to copy data from SSD. However, this cloud storage does not seem to bring many benefits to the system, except for the data migration and recovery use case. Cloud storage also provides some limited protection from data loss when a server crashes and never reboots. But this protection only covers some data since “hot” keys in RAM are not written to SSD and consequently not copied to the cloud, leading to the cloud alone having incomplete or stale data. 

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!