How to Read Computer Science (Systems) Papers using Shampoo Algorithm

I think most academics had to answer a question on how to approach papers. It is the beginning of the semester and a new academic year, and I have heard this question quite a lot in the past two weeks. Interestingly enough, I believe that almost every academic active on the Internet has written about paper reading. So one may think there should be plenty of guidance out there, all within Google’s reach. And I see students reading these tips and suggestions and still coming back for more advice.

One problem with many paper reading tips is that they are too algorithmic and do not help where people really struggle. “Read the title, then abstract, then go to the conclusion, look at the figures…” is a common advice pattern. Since when do these steps actually help understand the paper? Who can look at protocol drawings or results figures after reading an abstract and get it? No doubt, this approach works for some, but it seems to require a decent paper reading skill and huge baggage of consumed literature in related topics. Other suggestions I see online may be a lot more helpful but too come with some bias towards more experienced readers.

My approach to reading papers and advice I give my students is based on the infamous shampoo algorithm — lather, rinse and repeat. Scientific (computer science) papers are not like most other types of reading we do. Unlike most fiction, for example, scientific papers are non-linear in their comprehension. Let me explain what I mean here. When we read fictional stories, we follow a linear path through the plot. On the first pages of a detective story, we will never see who was the murderer and how the victim was killed, only to see the motive in the next chapter and a detailed description of a weapon somewhere in the middle of the book. Yet this is exactly how computer science papers unfold their story.

This non-linearity means that we cannot apply linear reading strategies to papers. We cannot just go with the text to build a complete understanding of the story. Most academic papers have circular “plot” structures with multiple rings of comprehension built around each other. Each of these “rings” acts as a foundation for what is to come next. We can think of them as the same concepts explained in increasing levels of difficulty. This rings abstraction is where the shampoo algorithm comes in — we must “rinse and repeat” until we reach a good understanding of something at one level of difficulty before moving to the next one.

Naturally, we can start with a title and an abstract. These build our initial comprehension ring of the paper, as we establish the broad topic and very high-level information about the solution and the outcome. Hopefully, things are clear at this point in the reading process, but if they are not, it may be a good idea to pick up a textbook and resolve the confusion.

The introduction usually brings us to the next comprehension ring. We learn the problem and its importance in greater detail, get motivated. The intro will likely provide some insight into the solution as well. At this time, we may also look at the conclusion to see if anything is interesting there. Some conclusions can be very good at providing a high-level overview, some are not so much. We are at a crucial point in understanding the paper now, and things tend to get way harder very quickly after this. If there is some fundamental gap in comprehension by this time, it would be a good idea to go back and reread. Maybe follow the references on aspects that are still hard to grasp — they will only get more difficult later on.

I should take a pause here for a bit. See, it is ok to not know something at each comprehension ring before moving forward. It is ok to ask questions about how things are done and seek answers in the following outer rings. Moreover, you must ask how and why questions and seek answers later. But you probably should not have fundamental questions about what the authors are doing or questions about the background discussed. I think there is a rather fine line between the questions you need to be asking going forward and the questions that require the “rinse and repeat” treatment. Seeing this line may come with a bit of experience. The good things about the shampoo algorithm is that when you are confused, you can always go back and repeat.

Now we are ready to move forward. If the paper has a background section and you had questions about background earlier, then read it. In my areas of expertise, I may often skim through the background instead of reading carefully.

We are now diving into the “meat of the paper” or “the solution” sections. This is the most challenging and time-consuming part that may actually have multiple comprehension rings packed together. It is important to never be afraid to practice the “shampoo algorithm” when things start to become gibberish. Go back even if this requires rewinding to the previous comprehension ring.

At first, we may approach the solution sections by reading the English description accompanied by any graphical explanations in the figures. At this stage, it may be a good idea to skip any algorithms and proofs. Instead, we must focus on absorbing the details of the solution and try to answer the how/why questions we had previously. The detailed English and graphical descriptions form an entire comprehension ring after the introduction/background one. Working on this third comprehension ring often requires going back and rereading paragraphs and sections many times over to build proper understanding. Now is also a good time to start looking at the evaluation and see if the results match any expectations we may have had based on our understanding so far.

Finally, we can move into the algorithms and proofs and try to work them out. Working through these things requires a good understanding of what is going on in the solution. We have worked hard to have this understanding by reading and rereading, asking and answering questions. Building the required understanding/intuition of the problem and solution is the reason for approaching these so late in the process. However, going through algorithms on paper line by line is super useful and helps transform intuition into a more codified working model. This codified model forms yet another comprehension rings. An additional ring will form after going through the proofs.

While I talked a lot about rinse and repeat, not being afraid to go back into the paper, and gradually build the comprehension rings, I did not mention one super useful tool for reading papers. This tool is writing. See, our brains are kind of too fast when we read, and this speed is not a good one. The brain races between questions, answers, remembering past information, and acquiring the new one. The “go back and reread” approach allows the brain to slow down a bit from getting carried away too far and too fast. However, the ultimate brain speed bump is writing — we type much slower and have to focus on what we write, forcing the brain to slow down even more. And this is where the magic happens. It turns out that when the brain is working slower, the thought process is also more structured. For this reason, pilots are trained to slow their brains down in critical conditions to think better. We are not in any life-or-death critical situation, so simple writing may be a good enough tool to slow your brain down, so you can have time to think and explain what you just have learned.

I often use writing as my ultimate reading tool. For example, all of my reading group summaries are a result of the “ring-3” comprehension discovery process. I do not write for all papers I read, but when I do, it significantly improves my understanding of the paper.

Reading Group. DistAI: Data-Driven Automated Invariant Learning for Distributed Protocols

In the 71st DistSys reading group meeting, we have discussed “DistAI: Data-Driven Automated Invariant Learning for Distributed Protocols” OSDI’21 paper. Despite the misleading title, this paper has nothing to do with AI or Machine Learning. Instead, it focuses on the automated search for invariants in distributed algorithms. I will be brief and a bit hand-wavy in this summary, mainly because I am too busy with other things to take care of (thanks, the beginning of the semester!), and this paper requires rather careful attention and a deep read to fully grasp it.

Before going into the paper itself, I want to talk about invariants. We use invariants a lot when thinking about the safety of algorithms and protocols. So an invariant is a property that must hold throughout the execution of an algorithm. It holds initially and after every discrete step of the protocols. For people like me, finding a good safety property is usually enough when checking some algorithm written in a spec language like TLA+. A TLC model checker runs an exhaustive search of the state space and ensures the safety property after every action. If the invariant condition is violated, TLC will produce a counterexample. There is a bit of a problem with this approach — for big algorithms, checking the model can take hours, days, or even weeks. TLA+/TLC is a bruteforce solution.

IVy, the system used as a component in today’s paper, takes a different approach. Instead of brute-forcing and checking all possible executions (and of course, the execution may be infinite too!), it tries to be clever and prove an algorithm correct by induction. The idea is simple on the surface — take an inductive invariant I prescribing the safety of an algorithm, and show that I holds initially and after all possible transitions starting from any state that satisfied the invariant I. In other words, we need to check that every possible transition from state S satisfying invariant results in a state S’ that still satisfies I. Unfortunately, in practice, this is where things get complicated. The problem is that our safety property I from the model-checking world may not be inductive. More specifically, if we are in some non-initial state S that satisfies I, there is still a possibility that after at action S->S’S’ will not satisfy I. This situation may arise not because the safety property is incorrect but because the state is an impossible or unreachable state that still somehow satisfies I. The solution is then to strengthen the invariant to some new invariant I’ that prevents the unreachable starting state for our possible transitions. IVy system provides tools to do this strengthening semi-automatically with some user help.

The DistAI paper builds on IVy and improves and optimizes the invariant search process to make it faster, more automatic, and more accurate. The process starts similarly to IVy’s, as a user provides the IVy spec with all possible actions, safety properties that must hold, and relations between variables. These relations between variables are important since they specify the properties of the algorithm. For example, a system may have a relation specifying whether a node holds a lock or whether the two nodes have communicated with each other. The system will use these relations to find the inductive invariants. The inductive invariant will be used in conjunction with safety property to finally check the protocol.

The rest of the DistAI’s invariant search process, however, is largely automatic. The process is iterative, with each iteration starting with some input to produce invariant candidates that involve up to some number of variables and no longer than some number of disjunctive terms. For instance, if all variable relations involve up to 2 variables, it may make sense to, initially, restrict the invariant search to just two variables.

Interestingly, at this step, DistAI brings back some of the model-checking magic into the process. It produces a few sample trace executions of the protocol and uses these traces to create a list of all observed variable relations after taking each action. In practice, this list of relations can be large if the simulation trace has many variables, so the system will randomly narrow down its focus to only look at a subsample of the execution trace with just a few variables at a time. For example, if all relations have no more than two variables, we may look at how some combination of two variables changed during the execution trace. This two-variable sample will follow a template with two quantified variables: $$\{\forall V_1, V_2\}$$. The DistAI system produces a handful of these subsamples for each quantified template.

After having these subsamples/sub-traces ready, DistAI enumerates the relations from subsamples. It can easily see which relations were holding throughout the entire execution trace. These relations can be generalized from having specific variable values in the samples to quantified variable placeholders from the sample’s template. For example, if a relation some_property holds in all subsamples, then we can generalize based on the template to $$\forall V_1, V_2: some\_property(V_1, V_2)$$. More than one relation may hold true in the subsamples, and so DistAI will produce multiple of these generalized relations. For example, some other relation $$some\_other\_property(V_1)$$ may have been true throughout all the subsamples. We will use all these relations to produce all different disjunctive invariant candidates, such as
(1): $$\forall V_1, V_2: some\_property(V_1, V_2)$$,
(2): $$\forall V_1, V_2: some\_property(V_1, V_2) \lor some\_other\_property(V_1)$$
and other permutations. Also, note that candidate (1) implies candidate (2), i.e., whenever (1) is true, (2) is also always true. In reality, the DistAI candidate enumeration process is a bit more complicated, as different templates may result in duplicate/overlapped invariant candidates.

At this point, DistAI will use IVy to check the candidate invariants. So since we have obtained the candidates from an execution trace, we already know that the candidates hold in at least some executions, and so we hope that we have had filtered most of the incorrect invariants. DistAI will start the check with the shortest candidate first. In the hypothetical example, candidate (1) will be checked first, since if it is inductive, then there is no need to check candidate (2) that is always true when (1) is satisfied. If the shorter invariant candidate fails IVy check, then DistAI will try to refine it by iterating to a longer version or a version with more variables. In this case, candidate (2) is a refined candidate that has one extra disjunctive term. If DistAI cannot find a successful candidate in the list created during the enumeration process, then it can start from the beginning by running new sample traces with more variables and increased the maximum length of the disjunctive invariant formula.

Phew, this is all very complicated. In short, the idea is to get some invariant candidates from the simulation trace and then check these candidates in IVy, starting with the shortest/simplest candidate and gradually working the way up to more complex invariants. Each of the more complicated candidates you check adds a bit of disjunctive stuff to exclude the issues the previous less-refined candidate stumbled upon.

As always, we had a video:

Discussion.

Quite frankly, I no longer remember the full extent of the discussion we had. I remember it was quite a lot of talking.

1) Inductive invariants. We spent a bit of time on the difference between inductive invariants and just an invariant. See, we have TLA people in the group, but not that many experts in formal methods. As I mentioned in the beginning, TLA is happy with a short non-inductive safety invariant because it is a brute-force tool that can only check finite protocols.

2) Candidate refinement. It seems like most of the refinement is based on going through the list of candidates from the enumeration step and trying them from simple to more complicated once. Most of the refinement is done by adding more disjunctive predicates until the maximum number of disjunctive terms have been reached (at which point we can try with more terms?).

3) Limitations. The templates for sampling require universal quantifiers. The authors claim that they could not check Paxos protocol because it requires existential quantifiers. However, it was pointed out that another related work (SWISS) managed to find invariants for Paxos, so it seems their approach does not have this limitation.

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

One Page Summary. Photons: Lambdas on a diet

Recently, to prepare for a class I teach this semester, I went through the “Photons: Lambdas on a diet” SoCC’20 paper by Vojislav Dukic, Rodrigo Bruno, Ankit Singla, Gustavo Alonso. This is a very well-written paper with a ton of educational value for people like me who are only vaguely familiar with serverless space!

The authors try to solve some deficiencies in serverless runtime. See, each function invocation is isolated from all other functions by running in separate containers. Such an isolated approach has lots of benefits, ranging from security to resource isolation. But there are downsides as well — each invocation of the same function needs its own container to run, so if there are no available idle containers from previous runs, a new one needs to be created, leading to cold starts. A “cold” function takes more time to run, as it needs to be deployed in a new container and complied/loaded up to the execution environment. In the case of many popular JIT-compiled languages (i.e., Java), this also means that it initially runs from byte-code in an interpreted mode without a bunch of clever compile-time optimizations. The paper states another problem to having all functions invocations requiring their separate containers — the resource wastage. In particular, if we run many invocations of the same function concurrently, we are likely to waste memory on loading up all the dependencies/libraries separately in each container. The authors also mention that some function invocations, for instance, functions for some ML or data processing workloads, may operate from the same initial dataset. Obviously, this dataset must be loaded to each function container separately.

The paper proposes a solution, called Photons, to address the cold-start issues and resource wastage in workloads with many concurrent invocations of the same functions. The concept of a Photon describes a function executed in a container shared with other function invocations of the same type. The premise here is to forgo some isolation and allow multiple invocations of the same function to share the container. As a result, the number of cold starts can be reduced, and resources, like RAM, can be better utilized by having libraries and common data loaded only once for many concurrent function calls.

The lack of isolation between the function invocations, however, creates a few problems. The paper solves some of them, but it also passes quite a few of these problems off to the function developers. One big problem passed to the developers is ensuring that different invocations of the same function consume a predictable amount of resources. This is needed for a couple of reasons. First, there are scheduling needs, as the system needs to predict machine and container resource usage to determine which containers can be expanded to take on more function calls or which machines can handle new containers. Second, the lack of container-level isolation means that a misbehaved function instance can grab too many resources and starve other concurrent functions in the same container.

Another big problem is memory isolation and memory sharing between concurrent functions invocations. A significant part of the Photons platform deals with these problems. On memory isolation, things are easy when we only work with class variables. Each object created from the class will have a separate copy. However, some class variables may be static, and what is even worse, they can be mutable, allowing concurrent executions of the code to modify these static fields. Photons address this problem by transforming static variables into a map, where a key is a photonID (i.e., a unique invocation id of a function). Naturally, all reads and writes to a static variable change to map puts and gets. There are a bit more nuances with statics, and the paper covers them in greater detail. For sharing state, Photons runtime maintains a KV-store exposed to all Photons/function invocation in the container. One major downside of having a KV-store is managing concurrent access to it, and the systems leave it up to the developers to code coordination to this shared store. Aside from shared KV-store, functions also share the file system for temporary storage.

Reading Group. In Reference to RPC: It’s Time to Add Distributed Memory

Our 70th meeting covered the “In Reference to RPC: It’s Time to Add Distributed Memory” paper by Stephanie Wang, Benjamin Hindman, and Ion Stoica. This paper proposes some improvements to remote procedure call (RPC) frameworks. In current RPC implementations, the frameworks pass parameters to function by value. The same happens to the function return values. Passing data by value through copying works terrific for small parameters and returns. Furthermore, since the caller and callee have independent copies of the data, there are no side effects when one copy is modified or destroyed. Things start to become less than ideal when the data is large and gets reused in multiple RPC calls.

For example, if a return value of one function needs to propagate to a handful of consecutive calls, we can end up performing many wasteful data copies. The wastefulness occurs as data transfers back to the caller and later gets copied multiple times to new RPCs. The obvious solution is to avoid unnecessary copying and pass the data within the RPC framework by reference using shared memory. In fact, according to the paper, many applications resort to doing just this with distributed key-value stores. Before invoking a remote function, a caller can save the parameters to the shared KV-store, and pass the set of keys as parameters to the RPC. Managing memory at the application level, however, has some problems. For example, the application must manage memory reclamation and cleanup objects no longer needed. Another concern is breaking the semantics of RPCs, as shared mutable memory can have side effects between different components of the system.

Instead of an ad-hoc application-level solution, the paper proposes a managed approach to shared memory in RPCs. The improved framework preserves the original RPC semantics while enabling shared memory space and facilitating automatic memory management and reclamation. The paper suggests having immutable shared memory where values remain read-only after an initial write. This shared memory system will have a centralized scheduler for RPCs and a memory manager to keep track of memory usage and active memory references. With a centralized memory management approach, the RPC framework can do a few cool things. For instance, the memory manager knows when a reference is passed between nodes and when it goes out of scope on all functions/clients. This global memory knowledge allows the memory manager to perform garbage collection on values no longer needed. The paper describes this in a bit more detail, including the APIs to facilitate global memory tracking. Another benefit of the RPC framework with a centralized scheduler and memory manager is locality awareness. With information about data location in the cluster, the system can collocate function invocation on the machines that already have the values needed.

The paper mentions that the proposed vision is already somewhat used in many specialized systems, such as Ray (Ray is from the same team as the paper) or Distributed TensorFlow. However, these systems are far from general and often used as parts of larger software stacks. The challenge with this new improved RPC framework is to make it general enough to span across the systems to allow better integration and more efficient resource use. I think of this as an RPC-as-a-service kind of vision that facilitates efficiency, interoperability and provides some blanket guarantees on communication reliability.

As always, the paper has a lot more details than I can cover in the summary. We also have the presentation video from the reading group:

Discussion

1) Other Distributed Systems Frameworks. Frameworks for building distributed systems seem to gain more and more popularity these days. We were wondering how these frameworks address some of the same issues, such as copying data between components and managing locality. This is especially relevant for actor-style frameworks. We had some feedback on the Orleans framework. Since locality is important for performance, frameworks tend to move actors to the data. It often seems to be cheaper to move a small executable component than large chunks of data. More specialized data processing systems have been using these tricks for some time and can move both data and compute tasks around for optimal performance.

2) Fault Tolerance. Fault tolerance is always a big question when it comes to backbone distributed systems like this one. With stateful components and shared memory subsystems, the improved RPC framework may be more susceptible to failures. While well-known and traditional techniques can be used to protect the state, such as replicating the shared memory, these will make the system more complicated and may have performance implications. Some components, like scheduler and memory manager, are on the critical path of all function invocations. Having strongly consistent replication there may not be the best choice for scalability and performance. The other option is to expose errors to the application and let them deal with the issues like in more traditional RPC scenarios or lower-level communication approaches. However, due to the size and complexity of the improved system, it would have been nice to have some failure masking. We were lucky to ask Stephanie Wang, the author of the paper, about fault tolerance, and she suggests RPC clients handle errors by reties, however, she also mentioned that it is still an open question about how much overall fault tolerance and fault masking is needed in a system like this one.

3) Programming Model vs. Implementation. Another important discussion question was on the vision itself. The paper mentions a few specialized systems that already have many of the features described. This made us wonder what is the paper proposes then? and what are the challenges? It appeared to us that the paper’s vision is for a general and interoperable RPC system to act as a glue for larger software stacks. Stephanie provided some of her first-hand feedback here that I will quote directly:

“In my opinion, Ray already has (most of) the right features for the programming model but not the implementation. One of the main implementation areas that the paper discusses is distributed memory management. Actually, I don’t think that any of the systems mentioned, including Ray, do this super well today. For example, I would say that Distributed TensorFlow is quite sophisticated in optimizing memory usage, but it’s not as flexible as pure RPC because it requires a static graph. The second thing that I think is still missing is interoperability, as you said. I believe that this will require further innovations in both programming model and implementation.”

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

Reading Group. Fault-Tolerant Replication with Pull-Based Consensus in MongoDB

In the last reading group meeting, we discussed MongoDB‘s replication protocol, as described in the “Fault-Tolerant Replication with Pull-Based Consensus in MongoDB” NSDI’21 paper. Our reading group has a few regular members from MongoDB, and this time around, Siyuan Zhou, one of the paper authors, attended the discussion, so we had a perfect opportunity to ask questions and get answers from people directly involved.

Historically, MongoDB had a primary backup replication scheme, which worked fine most of the time, but had a few unresolved corner-cases, such as having more than one active primary at a time due to the failures and primary churn. Having a split-brain in systems is never a good idea, so engineers looked for a better solution. Luckily, there are many solutions for this problem — an entire family of consensus-based replication protocols, like Multi-Paxos and Raft. MongoDB adopted the latter protocol to drive MongoDB’s update replication scheme.

At this time, I might have finished the summary, congratulated MongoDB on using Raft, and moved on. After all, Raft is a very well-known protocol, and using it poses very few surprises. However, Mongo DB had a couple of twists in their requirements that prevent using vanilla Raft. See, the original MongoDB primary backup scheme had a pull-based replication approach. Usually, the primary node (i.e., the leader) is active and directly prescribes operations/commands to the followers. This active-leader replication happens in traditional primary backup, in Multi-Paxos, Raft, and pretty much any other replication protocol aside from pub-sub systems that rely on subscribers pulling the data. In MongoDB, the primary is passive, and followers must request the data at some repeated intervals. In the figure below, I illustrate the difference between the push-based (active leader) and pull-based (active follower) approaches.

In both cases, the followers can learn new data from the leader. The pull-based approach, however, requires a bit more work. The leader has to compute the data batch for each follower individually, because followers may all have slightly different progress due to the differences in polling schedule. Also, there is another problem with the pull solution: the primary has no way of knowing whether the followers have received the operations during the pull. And this is important since the primary needs to count replicas accepting each operation to figure out the quorum and ultimately commit the operations. In traditional Raft, the result of AppendEntry RPC carries an ack for a successful operation, allowing the primary to count successful nodes and figure out when the quorum is satisfied. With flipped pull-style communication, there is no reply to the primary, and so we need to add some additional mechanism to propagate this information, as shown in the figure here. Adding more communication is costly, but the extra message can be piggybacked into the consecutive pull request at the expense of latency.

Now let’s diverge a bit and discuss why MongoDB picked a pull-based approach despite added complexity/cost. The traditional Pull-based solutions are centered around the primary or leader node, creating a star topology. In fact, unless you use something like Pig primitive from our PigPaxos paper, star topology is pretty much all you can have. This is not very good in some scenarios where links may be congested or cost a lot of money. Consider a WAN scenario where two nodes may exist in another region for geo-redundancy to minimize data loss in the event of regional failure. With star topology, the same payload will travel across WAN twice to each of the two replicas, consequently costing twice as much to replicate. With a pull-based approach, it is easier to orchestrate other replication topologies. All we have to do is to instruct one node to pull data from its regional peer instead of the leader across WAN. Naturally, this creates a longer replication chain but saves the WAN link bandwidth, cost, and potentially some resources at the primary.

MongoDB’s pull-based solution enables such chaining by allowing some non-primary nodes to act as sync sources. Sync source is a node that provides data to syncing servers in response to a pull request. Of course, the primary is always a sync source. The high-level overview of node interactions is a follow:

• Syncing server asks for new log entries from its sync source.
• Sync source replies with a sequence of log entries.
• Syncing server appends the log items if items follow the same history.
• If source and server logs diverge, the syncing server may have uncommitted data and must clean the divergent suffix. The detection of log divergence is based on matching the last log entry of syncing server with the first entry of the log segment sent by the source. If they match, then the source has sent a continuation of the same history.
• Syncing server notifies its sync source of how up-to-date it is, including syncing server’s term.

Now speaking of performance, the paper does not provide any comparison between push- and pull-based solutions, so we are left wondering about the impact of such drastic change. However, some experiments illustrate the comparison of star topology and chained replication in a 2-regions WAN setup. While chaining does not seem to increase the performance (unless the cross-region bandwidth is severely restricted), it predictably lowers the WAN bandwidth requirements. As far as maximum performance, the paper mentions that the limiting factor is handling client requests and not replication, and this is why one fewer server pulling from the leader does not impact throughput. I am not sure I am very comfortable with this explanation, to be honest.

The paper talks about a handful of other optimizations. I will mention just one that seemed the most interesting — speculative execution. With speculative execution, the nodes do not wait for a commitment notification to apply an operation, and speculatively apply it to the store right away. Since the underlying storage engine is multi-version, the system can still return strongly consistent reads by keeping track of the latest committed version. The multi-version store also allows rollbacks in case some operation fails to commit.

You can see my presentation of the paper on YouTube:

Discussion

1) Cost of pull-based replication. The performance cost of the pull-based solutions was the first big discussion topic that I raised in my presentation. As I mentioned, there is an extra communication step needed to allow the primary to learn of quorum acceptance. This step either adds additional message round/RPC call or adds latency if piggybacked to the next pull request. Another concern is pulling data when there is no new data, as this will be wasted communication cycles.

Luckily, we had MongoDB people, including Siyuan Zhou, one of the paper authors, to shed some light on this. To make things a little better and not waste the communication cycles, the pulls have a rather long “shelf life” — if the source has no data to ship, it will hold on to the pull request for up to 5 seconds before replying with a blank batch.

Another big optimization in the MongoDB system is a gradual shift to the push-based style! This somewhat makes the entire premise of the paper obsolete, however, this new “push-style” replication is still initiated by the syncing server with a pull, but after the initial pull, the source can push the data to the syncing server as it becomes available. So this allows building these complicated replication topologies while reducing the latency impact of a purely pull-based solution.

Another aspect of cost is the monetary cost, and this is where chained replication topologies can help a lot. Apparently, this was a big ask from clients initially and shaped a lot of the system architecture.

2) Evolution vs Revolution. So, since the original unproven replication approach was pull-based to allow chained topologies, the new improved and model-checked solution had to evolve from the original replication. One might think that it would have been easier to slap a regular push-based Raft, but that would have been a drastic change for all other components (not to mention the features). This would have required a lot more engineering effort than trying to reuse as much of the existing code as possible. This brings an interesting point on how production software gradually evolves and almost never drastically revolves.

3) Evaluation. The evaluation is the biggest problem of the paper. It lacks any comparison with other replication approaches except old MongoDB’s primary backup scheme. This makes it hard to judge the impacts of the changes. Of course, as we have seen from the discussion with the authors, the actual implementation is even more complicated and evolved. It tries to bridge the gap between pull and push replication styles, so a clear comparison based on MongoDB’s implementation may not have been possible at all.

That being said, I had some questions even about the provided self-comparisons. See, I would have expected to see a bit of throughput increase from chaining, similar to what I observe in PigPaxos, but there is none. The paper explains it by saying that replication at the sync source takes only 5% of a single core per syncing server, which would amount to just 20% of a core in star topology leader. Roughly, given the VMs used, this is around 5% of the total CPU used on replication, with the paper claiming that all remaining CPU is used to handle client requests. Assuming there is sync every 2 ms, we have about 2000 syncs per second at the leader for 12000 client requests. Doing some napkin math, we can see that there are 6 times more requests than replications per second, yet requests use 19 times the CPU, making each request roughly 3 times more expensive than each replication. Given that replication messages are not super cheap and require querying the log, and serializing the data, the performance cost difference sounds excessive to me, despite considering that client requests have a few writes in them (i.e., write to the log, and operation execution).

Siyuan explained that there is a bit more work on the request path as well. Not only the log is written (and persisted for durability), but there are also some additional writes for session maintenance and indexes. Moreover, the writes in the underlying WiredTiger engine are transactional, costing some performance as well.

4) PigPaxos? Throwing this out here, but there are push-based solutions that can have more interesting replication topologies than a star. Our PigPaxos, in fact, solves a similar issue of cross-regional replication at a low network cost.

5) Other pull-based systems. Finally, we try to look at other solutions that may use pull-based replication, but there are not many. Pub-Sub systems fit the pull model naturally as subscribers consume the data by pulling it from the queue/log/topic. Pull-based replication can be handy in disaster recovery when nodes try to catch up to the current state and must ask for updates/changes starting some cursor or point in history.

6) Reliability/Safety. As the protocol makes one important change of not rejecting the data from the old-termed source, it is important to look at the safety of this change. The paper claims to model the protocol with TLA+ and model checking it. Intuitively, however, we know that even though the node takes the data from the old source, it actively rejects it by sending its higher term. This, intuitively, should be enough to ensure that the old leader does not reach a quorum of accepts (even though there can be a majority of nodes that copied the data) and does not reply to a client of commitment. The rest is taken care of by Raft’s commit procedure upon the term change.

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

Reading Group. Move Fast and Meet Deadlines: Fine-grained Real-time Stream Processing with Cameo

In the 68th reading group session, we discussed scheduling in dataflow-like systems with Cameo. The paper, titled “Move Fast and Meet Deadlines: Fine-grained Real-time Stream Processing with Cameo,” appeared at NSDI’21.

This paper discusses some scheduling issues in data processing pipelines. When a system answers a query, it breaks the query into several steps or operators, such as groupBys, aggregations, and various other processing steps. Each of these operators executes on some underlying serverless actor (in this work, the underlying platform is Orleans), so naturally, the operators must be scheduled to run on some actors when the actors become available.

Scheduling is easy when we always have idle actors/workers that can pick up news tasks. But this can lead to overprovisioning of resources and becomes expensive for all but the most latency-critical applications. As such, resource-shared environments that allow processing multiple queries from different tenants are more cost-efficient and practical. Of course, the problem with shared environments is keeping all tenants happy and isolate them from each other. This means that scheduling needs to be aware of SLAs a system must provide to tenants and try not to blow past the deadline. Cameo is a dataflow processing system that aims for fine-grained scheduling of tasks in the system to make sure all queries, consisting of many tasks, finish within some deadline or SLA guarantee.

Things can get more complicated from here. If one step takes longer to compute than anticipated, then the start deadlines of downstream tasks/steps may be violated. The system, however, tries to adjust/reprioritize the consecutive steps to accelerate their execution and hopefully still meet the overall query deadline/SLA. But there is more. See, if the scheduler made a mistake once and misjudged the execution time of a task, this can happen for the same query again. To counter this problem, Cameo propagates the task execution information to the scheduler so that it can take better actions next time. This feedback mechanism allows the scheduler to learn from past executions of a query/task and improve in the future.

Now about the scheduler. It has two major components – the scheduling mechanism and the scheduling policy. The scheduling mechanism executes the scheduling policies, and the two parts are rather separate. Such separation allows the scheduling mechanism to take tasks with high priority (sooner start deadline) regardless of how this priority was computed by the policy. The scheduling policy component is pluggable and can be changed for different applications, as long as it creates the priorities that the execution component understands. This part must follow Cameo API to allow rescheduling of downstream/consecutive tasks and learning the stats of step execution to adjust in the future.

The authors claim significant latency improvements of up to 4.6 times when using Cameo.

As always, we had a presentation in our reading group. The video presentation by Ajit Yagaty is available on YouTube:

Discussion

1) Spark Streaming. The scenarios presented in the paper operate on dynamic data sets that continuously ingest new data. This is similar to other streaming dataflow solutions, such as Spark Streaming, for example. The paper does not provide a direct comparison in evaluation but claims that Cameo has a much more fine-grained scheduling at the per-task level.

2) Fault Tolerance. One natural question in the reading group was around fault tolerance. The paper does not touch on this topic at all. Since the system is built using the Orleans framework, we suspect that most fault tolerance is taken care of by the framework.

3) Spiky Workloads. The feedback mechanism used to learn how long tasks may take and adjust the scheduling accordingly works great in a streaming system when we have a steady inflow of data and the same queries that process this new data. Adding a new query will likely create some problems (at least for that query) until the system learns the stats of all the tasks this query uses. Similarly, having a big spike in data ingestion can cause some mayhem. If a query operates on the premise that some task needs to process events/messages, but receives 10n, its execution time may become too large and cause the downstream deadlines to violate. And such momentary spikes in a steady flow of new data may happen rather often — all that is needed is some transient network glitch that delayed some messages causing a spikey delivery moments later.

4) Query Optimizers? We are not query optimization experts here in the reding group, but we wonder if using some query optimization techniques can help here. So instead of learning on historical stats to predict how long different steps of a query may take, can we be smarter and optimize based on the actual data at hand? A simple and naive thing to do for a previous example in (3) is to account for data cardinalities when computing the start deadlines so that a spike in data ingestion can be noted and dealt with immediately.

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

Reading Group Paper List. Papers ##71-80

We will start the fall semester with a new set of reading group papers. As before, we have ten papers in total. Nine of them are new papers from top venues, and one is a foundational paper on Viestamped Replication. Instead of the original VR paper, we will look at its more modern counterpart/rewrite/update — “Viewstamped Replication Revisited.” Anyway, here is the full list:

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

Reading Group. When Cloud Storage Meets RDMA

I am very behind on the reading group summaries, so this summary will be short and less detailed. In the 67th reading group meeting, we discussed the “When Cloud Storage Meets RDMA” paper from Alibaba. This paper is largely an experience report on using RDMA in practical storage systems.

Large-scale RDMA deployments are rather difficult to manage and maintain. This is the result of RDMA over Converged Ethernet (RoCE), which requires a Priority Flow Control (PFC) mechanism to prioritize the RDMA traffic and prevent frame loss on RDMA traffic due to congestion. PFC includes the mechanisms to pause network nodes (i.e switches, NICs, etc) from sending data to an overwhelmed nodes. This can also create backpressures in the network and can cause degraded operation, dropped frames, and even deadlocks. So anyway, while RDMA needs PFC, it can cause major reliability issues even for large companies.

Alibaba paper presents a handful of engineering solutions and workarounds to deliver RDMA performance while maintaining high reliability. I will mention two general approaches the paper takes.

First, the authors do not try to make every machine accessible via RDMA from anywhere in a data center. Instead, the network/datacenter is divided into podsets, and machines within a podset can take advantage of RDMA when talking to each other, while communication between podsets relies on good-old TCP. I suspect this provides a lot of isolation and greatly limits the extent to which PFC problems can go.

The second approach is to fail fast. To support tight SLA, Alibaba engineers prefer to failover the problematic connections quickly. For that, upon detecting problems with RDMA, the podset falls back to using TCP. As a result, the Pongu system operates in this kind of hybrid mode — whenever possible, RDMA is used for low latency and high throughput, but TCP connection is always ready as a backup.

The paper talks about a few other engineering solutions and optimizations aimed at improving reliability and performance. For example, the network is built with redundant switches, including Top-of-Rack switches. One important performance optimization is offloading as much work away from the CPU and onto special-purpose hardware. The paper specifically talks about CRC computations offloaded to NIC to save memory bandwidth and CPU cycles. Kernel bypass (DPDK) is used as well. Many of these and other (i.e. user-space SSD optimized file system, RPC thread management) optimizations reside in the User Space Storage Operating System (USSOS) part of Pangu.

Now I realized that I actually have not talked about the Pangu storage system itself. The image taken from the paper is a decent summary. In short, the storage cluster runs in a podset, so BlockServers can access storage with RDMA. There can be many storage clusters, each cluster in its own podsets. The storage clusters, however, do not talk to the clients using RDMA, and the clients communicate with storage using TCP. So, in the best-case scenario, a client’s request will have one TCP hop and one RDMA hop inside the storage cluster instead of two TCP hops. The experimental data from the paper largely supports this, as latency is essentially halved for clients when RDMA is used. Similarly, the Pangu Master Cluster services communicate with storage and clients using TCP.

As always, we had a paper presentation in the group. Akash Mishra presented this time:

Discussion

1) Bi-model operation. The fail fast and switch to a backup mode of operation allow for a more reliable operation of the system. However, having these two distinct modes and the spectrum of performance between them may present additional problems. Metastability may come to play here as the system transitions from fast mode to degraded mode due to some external trigger. If the system becomes overwhelmed in this degraded mode, it may develop a positive feedback loop that feeds more work and keeps it overwhelmed. In general, having distinct performance modes is a flag for metastability, and extra care must be used when the system transitions from fast to slow operation.

2) RDMA at other companies. The paper cites Microsoft’s experience report with RDMAquite extensively when talking about reliability issues of using RDMA at scale. So it is obvious that the experience is shared, and it may be the reason for RDMA being largely not offered in the public cloud. The last time I checked, Azure had a few RDMA-capable VM instances, while AWS EC2 had none (instead, EC2 offers its own tech, called EFA).

3) Offloading CPU. This was an interesting point as well. The paper notes that RDMA is so fast that the memory bandwidth becomes a bottleneck! To limit memory usage, Pangu offloads data verification through CRCs away from the CPU and onto smart NICs. This saves quite a bit of memory roundtrips on accessing messages/payload to compute/verify CRCs. Using various accelerators is a common theme in many large systems.

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

Reading Group. Cores that don’t count

Our 66th paper was a recent HotOS piece about faulty CPUs: “Cores that don’t count.” This paper from Google describes a decently common (at Google datacenter scale) issue with CPUs that may miscompute or silently fail under some conditions. This is a big deal, as we expect CPUs to be deterministic and always provide correct results for all computations. In the rare instances of failures, we expect these failures to be “loud” — CPU completely halting or rebooting or at the very least software crashing.

However, Google engineers observed some dangerous behaviors that do not align with the expectations. Unfortunately, as CPUs fail, they may do so silently and produce a wrong result to a computation. Since the failures are silent, they are way more difficult to detect. According to the paper, such failures often seem non-deterministic and often require specific conditions to manifest. Moreover, most of the time, the faults do not impact an entire CPU but one or a few cores, leading to computation errors appearing sporadically even on one machine.

Speaking of the factors and conditions that play a role in these failures, the paper talks about CPU age — a CPU may start to miscompute some instruction after some time in operation. This makes quality control both at the CPU vendor and customer more difficult — even when there is a known tendency for some errors, they may not appear on new CPU and pass the QA tests. Other factors are operating temperature, frequency, and voltages. The paper states that their impacts vary. However, it is not unreasonable to assume that some errors may start to appear under heavier load and higher temperature and then subside when the load reduces and the CPU cools down.

All these make detection hard, as reproducing a fault often requires running the same software on the same hardware under the same conditions. That being said, the authors claim that they are getting better at detecting and reproducing many of these errors. For example, the paper mentions a set of benchmarks that exercise more common failure scenarios. Such a benchmark tool can be handy for confirming the faults or offline testing. Unfortunately, it is not very helpful in detecting problems in live production workloads. Currently, engineers at Google detect the CPU issues through crash and bug reports, which are obviously very noisy. However, some patterns, such as identifying cores involved and checking if the same cores appear in multiple similar reports, can provide the first clues.

So what are the repercussions of faulty computation if left undetected and unmitigated? The short answer is all kinds of stuff! The major problem here is that a computation may cause a fault in a system before any of the traditional redundancy is involved. For example, a failure at the leader node of a replicated system can produce a bad outcome. If this outcome is then checksummed and replicated, it will propagate through the cluster undetected. Our replicated/redundant systems are geared for detecting data issues due to transfer or storage and may lack the tools/mechanisms needed to detect bad computations.

I think I will leave the summary here. Murat Demirbas goes into more detail in our group’s presentation. He also touches on a similar arXiv paper from Facebook.

Discussion.

1) Vendors. An obvious question to ask is what vendors/models have these issues. For obvious reasons, the paper does not provide any concrete information here, and it makes sense. It would not be a good idea to point fingers at a major company with which you do a lot of business. So the fact that Google (and Facebook) decided to bring this issue to attention instead of trying to resolve the problem with vendors silently is already a big deal.

2) In-house CPU development. Another discussion point over CPU manufacturers was whether taking an in-house development approach can help improve the quality. Think of AWS Graviton CPUs. From one side, this gives the large-scale company (Google, AWS, etc) more control over the design and quality assurance. On the other side, these defects are likely due to the manufacturing (i.e., shrinking transistor size) and not the design. The design issues would have likely been more deterministic. Manufacturing is hard, and only a handful of companies can pull it off. So the end product would have been made by a third party anyway. So the remaining benefit is a potential for better QA, but this is also hard. For instance, I mentioned in the summary that some issues appear after some aging. Also, big CPU companies may still have an edge in QA due to the sheer volume of CPU made and shipped. If more consumers report the problems, it may be easier for CPU vendors to identify patterns and improve in the next iteration.

3) Non-deterministic issues. The paper mentions that many of the issues are non-deterministic. At the same time, the paper discusses the techniques they use to find the problems. So it appears that there are actually a few classes of failures: (1) purely non-deterministic faults, (2) obfuscated deterministic faults, and (3) openly deterministic faults. The third category is easy, with problems reproducible all the time, such as design bugs. The second category is reproducible faults that hide. It seems like the issues discussed the most in the paper are of this type. It may be difficult to find them in all the noise as the problems manifest only under certain conditions. However, once these conditions are known, it becomes more or less easy to reproduce and identify. The first category is the tricky one. Does it even exist? Or do these failures require even more “stars to align” for the problems to appear? Anyway, it is this category that is the most challenging to look for and identify. If these failures do exist, is there even a systematic way to detect them?

4) Hardware mitigations? One way to try to approach the problem is with hardware redundancy. For instance, a system may perform some critical computations redundantly on different cores or CPUs and cross-check the results. If different cores produce a different result for the same computation, then one of the cores is at fault, and we may need a third one to arbitrate. This redundancy is not new, and mission-critical systems use it. However, this is too expensive for most public services and cloud systems.

5) Are these byzantine failures? Murat raised this question at the end of his talk, and this is an interesting one. Byzantine faults are faults that occur due to a system behaving out of spec. But with faulty computation, the out-of-spec behavior may be very subtle. Again, consider a replicated system with a leader. In a computation fault, the leader will try to replicate the same corrupted data to all the followers, leaving no possibility for a cross-check.

6) Software mitigations? So, if the faults are kind of byzantine, but also subtle enough to avoid detection by some protocols, then what can we do? One idea that was floating around is defensive programming — using assertions and verifying the computations (this works for problems that can be checked relatively cheaply). Good error reporting is also crucial to help find the correlations that may point to faulty cores.

A more fundamental question, however, remains. Can we design cloud-scale systems that tolerate such failures without costing too much?

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

Reading Group. FoundationDB: A Distributed Unbundled Transactional Key Value Store

Last week we discussed the “FoundationDB: A Distributed Unbundled Transactional Key Value Store” SIGMOD’21 paper. We had a rather detailed presentation by Moustafa Maher.

FoundationDB is a transactional distributed key-value store meant to serve as the “foundation” or lower layer for more comprehensive solutions. FoundationDB supports point and ranged access to keys. This is a common and decently flexible API to allow building more sophisticated data interfaces on top of it.

FoundationDB is distributed and sharded, so the bigger part of the system is transaction management. The system has a clear separation between a Paxos-based control plane and the data plane. The control plane is essentially a configuration box to manage the data plane. On the data plane, we have a transaction system, log system, and storage system. The storage system is the simplest component, representing sharded storage. Each node is backed by a persistent storage engine and an in-memory buffer to keep 5 seconds of past data for MVCC purposes. The storage layer is supported by sharded log servers that maintain the sequence of updates storage servers must apply.

The interesting part is the transaction system (TS) and how clients interact with all the components on the data plane. The client may run transactions that read and/or update the state of the system. It does so with some help from the transaction system, which also orchestrates the transaction commit. When a client reads some data in a transaction, it will go to the transaction system and request a read timestamp or version. On the TS side, one of the proxies will pick up the client’s request, contact the sequencer to obtain the version and return it to the client. This version timestamp is the latest committed version known to the sequencer to guarantee recency. Thanks to MVCC, the client can then reach out directly to the storage servers and retrieve the data at the corresponding version. Of course, the client may need to consult the system to learn which nodes are responsible for storing particular keys/shards, but the sharding info does not change often and can be cached.

Writes/updates and transaction commit procedure are driven by the TS. The client submits the write operations and the read-set to the proxy, and the proxy will attempt to commit and either return an ack or an abort message. To commit, the proxy again uses the sequencer to obtain a commit version higher than any of the previous read and commit versions. The proxy will then send the read and write set along with the versions to the conflict resolver component. The resolver detects the conflicts; if no conflict is detected, the transaction can proceed, otherwise aborted. Successful transactions proceed to persist to the log servers and will commit once all responsible log servers commit. At this point, the sequencer is updated with the latest committed version so it can continue issuing correct timestamps. Each transaction must complete well within the 5 seconds of the MVCC in-memory window. Needless to say, read-only transactions do no go through the write portion of the transaction path since they do not update any data, making reads low-weight.

The failure handling and recovery is an important point in any distributed system. FoundationDB takes a fail-fast approach that may at times sound a bit drastic. The main premise of failure handling on the transaction system is to rebuild the entire transaction system quickly instead of trying to mask failure or recover individual components. The committed but not executed transactions can be recovered from the log servers and persisted to storage, in-progress transactions that have not made it to the log servers are effectively timed out and aborted. Transactions that partly made it to the log servers are also aborted, and new log servers are built from a safe point to not include the partial transactions. Here I just scratched the surface on the recovery, and the paper (and our group’s presentation) is way more accurate and detailed.

Another important point in the paper is the testing and development of FoundationDB. The paper talks about simulator testing. In a sense, the simulator is an isolated environment for development and testing the full stack on just one machine. It comes with a handful of mock components, such as networking and a clock. All sources of non-determinism must be mocked and made deterministic for reproducibility. The paper claims that the simulator is very useful for catching all kinds of bugs with a few exceptions, such as performance bugs.

Discussion.

1) Flexibility of FoundationDB. Our previous paper was on RocksDB, a key-value single server store. It is meant as the building block for more complex systems and applications. This is very similar in spirit to FoundationDB that is meant as a “foundation” for many more complex systems. However, FoundationDB is way more complex, as it implements the data distribution/replication and transactions. This can potentially limit the use cases for FoundationDB, but obviously, this is done by design. With replication and transactions are taken care of, it may be easier to build higher-up levels of the software stack.

2) Use cases. So what are the use cases of FoundationDB then? It is used extensively at Apple. Snowflake drives its metadata management through FoundationDB. In general, it seems like use cases are shaped by the design and limitations. For example, a 5-seconds MVCC buffer precludes very long-running transactions. The limit on key and value size constrains the system from storing large blobs of data. Arguably, these are rather rare use cases for a database. One limitation is of particular interest for me, and this is geo-replication.

3) Geo-replication. The paper touches on geo-replication a bit, but it seems like FoundationDB uses geo-replication mainly for disaster tolerance. The culprit here is the sequencer. It is a single machine and this means that geo-transactions have to cross the WAN boundary at least a few times to get the timestamps for transactions. This increases the latency. In addition to simply slower transactions, numerous WAN RTT to sequencers can push the transaction time closer to the 5-second limit. So it is reasonable to assume that the system is not designed for planetary-scale deployment.

4) Simulator. We discussed the simulator quite extensively since it is a cool tool to have. One point raised was how a simulator is different from just setting up some testing/development local environment. The big plus for a simulator is its ability to control determinism and control fault injections in various components. There are systems like Jepsen to do fault injection and test certain aspects of operation, but these tend to have more specific use cases. Another simulator question was regarding the development of the simulator. It appears that the simulator was developed first, and the database was essentially build using the simulator environment.

We were also curious about the possibility of a simulator to capture error traces or do checking similar to systems like Stateright. It appears, however, that this is outside of the simulator capabilities, and it cannot capture specific execution traces or replay them. It is capable of controlling non-deterministic choices done in mock components, making a failure easier to reproduce. One somewhat related point mentioned was eidetic systems that remember all non-deterministic choices made in the OS along with all inputs to be able to replay past execution, but this seems like an overkill to try to build into a simulator.