Reading Group. Distributed Snapshots: Determining Global States of Distributed Systems

On Wednesday we kicked off a new set of papers in the reading group. We started with one of the classical foundational papers in distributed systems and looked at the Chandy-Lamport marker-based distributed snapshot algorithm. The basic idea is to capture the state of distributed processes and channels by “flushing” the messages out of the channels with markers. The markers ensure that causality is not broken, despite the processes taking their local snapshots at different times (and with no affinity to physical time). I am not going to summarize the paper, as there is plenty of material on the internet on the subject; however, here is our group’s short presentation by Maher Gamal:
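As a quick refresher on the marker mechanics, below is a minimal simulation-style sketch of the two marker rules. The class layout, names, and the send_markers callback are my own illustration, not the paper’s pseudocode.

```python
class Process:
    """Minimal sketch of the Chandy-Lamport marker rules (illustrative only)."""
    def __init__(self, pid, in_channels, send_markers):
        self.pid = pid
        self.state = 0                      # local application state (e.g., a counter)
        self.in_channels = in_channels      # ids of processes with a FIFO channel into us
        self.send_markers = send_markers    # callback: broadcast a marker on all outgoing channels
        self.recorded_state = None          # local state captured for the snapshot
        self.recorded_channels = {}         # src -> messages recorded as "in flight" on that channel
        self.recording = set()              # incoming channels we are still recording

    def initiate_snapshot(self):
        # The initiator records its state and floods markers on its outgoing channels.
        self.recorded_state = self.state
        self.recording = set(self.in_channels)
        self.recorded_channels = {src: [] for src in self.in_channels}
        self.send_markers(self.pid)

    def on_message(self, src, msg):
        if msg == "MARKER":
            if self.recorded_state is None:
                # First marker seen: record local state; the channel the marker arrived on
                # is recorded as empty, all other incoming channels start recording.
                self.recorded_state = self.state
                self.recording = set(self.in_channels) - {src}
                self.recorded_channels = {s: [] for s in self.recording}
                self.send_markers(self.pid)
            else:
                # Marker on an already-snapshotted process: stop recording that channel.
                self.recording.discard(src)
        else:
            # Application message: apply it, and log it if its channel is still being recorded.
            self.state += msg
            if src in self.recording:
                self.recorded_channels[src].append(msg)
```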

Discussion

1) Use of snapshots. Much of our discussion focused on the uses of snapshots. Aside from the trivial use for disaster recovery, snapshots are useful for debugging and runtime verification. The paper suggests some debugging/monitoring usage, like detecting stable properties of algorithms. However, we also think that detecting violations of certain properties may be more useful in the real world, for instance, detecting violations of invariant properties at runtime.

Just last week we talked about Aragog, a system for runtime verification of network functions. And while it does not directly use snapshots, it relies on time synchronization to make sure that the messages fall into consistent cuts of the state and that the cause-and-effect relationships play out correctly in the constructed state machine.

2) Snapshots of states that did not happen. An interesting thing about Chandy-Lamport snapshots is that they may capture a system state that never actually occurred in the execution. This is because the snapshots are taken progressively as the markers propagate through the channels, so the snapshot essentially gets rolled out at communication speed.

3) Timely snapshots. We also brought up snapshots that are close to the wall clock. These may be more useful for debugging purposes than Chandy-Lamport snapshots, as they provide some notion of when things happened. Additionally, tighter snapshots that are taken at about the same time globally are better at recording the true state (or should we say they have fewer timing artifacts?).

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Aragog: Scalable Runtime Verification of Shardable Networked Systems

We have covered 50 papers in the reading group so far! This week we looked at “Aragog: Scalable Runtime Verification of Shardable Networked Systems” from OSDI’20. This paper discusses the problem of verifying network functions (NFs), such as NAT gateways or firewalls, at runtime. The problem is quite challenging due to its scale, as large cloud providers handle enormous amounts of traffic. The paper uses a NAT Gateway (NATGW) as the motivating example. NATGW balances external traffic to the servers in a way that ensures the entire packet flow goes to the same destination. It is implemented entirely in software. This means that the NF needs to be fault-tolerant and not “forget” the destination server for each flow, but it also needs to be super quick, so strong consistency is not an option. In this NF, some primary node keeps track of routing for a flow and asynchronously replicates the information to backups.

Aragog is set to examine the operation of this NF and look for invariant violations. Obviously, if we look at the entire NF that handles thousands and thousands of flows, the problem is rather large. However, Aragog does not need to take a global approach; since each flow is independent, it can do verification at the flow granularity. For example, the system can check whether at any time a flow is directed by at most one primary node. This check still requires a global view of all system nodes to make sure that there are no two primaries, but it does not require the entirety of the state and needs only the state associated with a particular flow. In a nutshell, Aragog constructs a state machine from the description of an invariant violation for each flow, allowing for embarrassingly parallel scalability due to the sharded nature of the problem. The state machine transitions across states as it receives events (i.e. messages), and if it reaches the termination state, a notification can be issued about the violation. Naturally, since the events can happen on different nodes, they are all shipped to a centralized verification agent that runs the invariant violation state machine. However, it would still be inefficient to ship all the events for each flow to construct these state machines, so Aragog does some filtering — it does not send messages irrelevant to the invariant being checked, and it avoids shipping messages that can be confirmed locally to not transition the state machine to a new state.
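To make the per-flow verification idea more concrete, here is a small hypothetical checker for the “at most one primary per flow” invariant. It is only an illustration of the state-machine approach; it is not Aragog’s specification language or API, and the event names are made up.

```python
# Hypothetical per-flow invariant checker: flags the case where two different
# nodes act as the primary for the same flow at the same time.
def check_single_primary(events):
    """events: iterable of (timestamp, node, kind) tuples for ONE flow, where kind
    is 'become_primary' or 'release_primary'. Ordering by timestamp relies on the
    time-synchronization assumption discussed below."""
    current_primary = None
    for ts, node, kind in sorted(events):
        if kind == "become_primary":
            if current_primary is not None and current_primary != node:
                return (ts, node, current_primary)     # violation: two concurrent primaries
            current_primary = node
        elif kind == "release_primary" and node == current_primary:
            current_primary = None
    return None                                        # no violation observed for this flow

# Local suppression sketch: a node only ships events that could advance the global
# state machine, e.g., it can drop a repeated 'become_primary' event for a node it
# already knows to be the current primary.
```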

The evaluation shows that Aragog can detect violations due to bugs, and it has quite substantial checking throughput.

As always, a lot more details can be found in the paper. A. Jesse Jiryu Davis did an excellent presentation of the paper. He also has quite a few interesting questions/ideas at the end of the presentation.

Discussion

1) Best-Effort. Aragog makes certain assumptions and choices that make it a “best-effort” system, as it may miss some violations (false negatives) and maybe even detect some that did not happen (false positives). One such assumption we noted in the discussion is time synchronization, since the events/messages are ordered and applied to the state machine according to their timestamps. While good time sync is possible, it is not clear how well this was implemented in Aragog (it uses PTP) and how big of an impact it may have. For example, a reordering of the messages may potentially hide a violation from being detected, or, even worse, make a non-violating message ordering appear as one that has a problem. Another avenue for missing violations is the use of sampling.

2) Need for Runtime and Near-real-time Verification. One of the major requirements in Aragog is runtime or near-real-time verification. However, we were not super clear on why this is needed. For example, even if you detect a problem quickly, there may be limited options to react to it quickly. Unless it causes an outage (which would be detectable by other means), the time to resolve may be very long, as a bug needs to be reproduced, tested, fixed, and deployed, which at best can take many hours or even days. Another reason the near-real-time requirement is questionable is the best-effort nature of the system described above. However, we did talk about one scenario where a quick response is needed. Consider a rollout of a new version of the network function. Rollouts typically occur in stages, so it is important to detect any issues early and stop the rollout before it hits the majority of servers.

3) Embarrassingly Parallel. The cool thing about Aragog is its embarrassingly parallel nature. While the system is applied in a specific domain (testing network functions), we feel like other applications/domains can parallelize in a similar manner.

4) “Productizing” Aragog. Again, currently, Aragog is a niche, domain-specific system. However, it has nice features, like scalability and a nice language to describe invariant violations and filtering, so we wonder if it can be productized beyond this specific application.

5) PL & Distributed Systems. I largely skipped this in my summary, but quite a lot of the nice things about Aragog come from its language and how it creates a state machine out of the invariant violation description, not to mention all the local suppression to avoid sending events/messages that do not change the state of the machine to the global verifier. This is cool stuff, at least from our distributed systems point of view, which is a bit further from programming languages work.

6) Retroscope. Finally, I could not resist making the comparison with some early work of mine — Retroscope. Retroscope uses HLC (to avoid time-sync causality-breaking issues) to construct a progression of globally distributed states and search for… you guessed it, invariant violations. Retroscope uses an SQL-like language to express the predicates that describe a violation. Unlike Aragog, I tried to make Retroscope general. It also does not shard like Aragog, but once the events are ingested by a streaming service, the predicate search is embarrassingly parallel. Retroscope was one of my first works in the distributed systems space, so it is not quite as optimized or fast, but it is also a more general prototype.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Protean: VM Allocation Service at Scale

The last paper in our reading group was “Protean: VM Allocation Service at Scale.” This paper from Microsoft is full of technical insights into how they operate their datacenters/regions at scale. In particular, the paper discusses one of the fundamental components of any cloud provider — the VM service. The system, called Protean, is an allocation service that handles VM allocation requests at the availability zone granularity in each Azure region. It tries to figure out which server of many thousands of candidates is the best fit for the VM described in the request. Its goal is to pack VMs tightly to avoid fragmentation of resources — having too many small and unusable server chunks. There are several challenges in doing so. First, each VM has a set of requirements, such as the VM type, the number of vCPUs, memory allocation, on-server SSD size, networking, location preferences for fault tolerance, and many more. This alone makes the problem very hard to solve optimally, NP-hard as a matter of fact. The second major challenge is doing these allocations at scale. There are surprisingly many VM allocations going on in each availability zone all the time. In the steady state, the system deals with hundreds of allocation requests per second, with occasional spikes to thousands of new VM requests per second!

Protean is made up of the placement store, a database that keeps a record of VM assignments in the zone’s server inventory, and many concurrent Allocation Agents (AAs) that compute the actual VM-to-machine assignments. Each AA is like a server filter — it takes a request for a new VM and filters out all the servers not capable of hosting the VM. After the filtering, the AA computes a general preference score to find a set of the most suitable servers and picks one random server from that set of candidates.

Protean implements this whole filtering and scoring using a rule-based approach and divides the process into multiple phases. First, it uses cluster validator rules to filter out any homogeneous clusters that cannot support the VM. These validator rules specify “hard” requirements needed to support a VM. For example, a VM with a GPU cannot be supported by a cluster of GPU-less servers, so the entire cluster is automatically not a candidate for the allocation. Then the system scores the clusters that can handle the VM based on preference rules, which describe “nice-to-have” features, as opposed to hard requirements. A similar validator-rule pass is repeated to filter out incompatible machines in the selected cluster (for example, servers that are already at capacity and have no available resources for the VM type). Finally, all remaining good servers are scored based on the machine preference rules.
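The tiered filter-then-score flow could look roughly like the sketch below. All names and data structures here (the rule lists, the `machines` attribute, the top-1% cutoff) are my own illustration of the described phases, not Protean’s actual interfaces.

```python
import random

def allocate(vm_request, clusters,
             cluster_validators, cluster_preferences,
             machine_validators, machine_preferences):
    # Phase 1: drop clusters that cannot possibly host the VM (hard requirements).
    feasible_clusters = [c for c in clusters
                         if all(rule(vm_request, c) for rule in cluster_validators)]
    # Phase 2: score remaining clusters on "nice-to-have" preference rules; pick the best.
    cluster = max(feasible_clusters,
                  key=lambda c: sum(w * rule(vm_request, c) for w, rule in cluster_preferences))
    # Phase 3: drop machines in the chosen cluster that fail hard requirements.
    feasible_machines = [m for m in cluster.machines
                         if all(rule(vm_request, m) for rule in machine_validators)]
    # Phase 4: score machines, then pick a random server among the top candidates
    # (error handling for "no feasible cluster/machine" is omitted in this sketch).
    scored = sorted(feasible_machines,
                    key=lambda m: sum(w * rule(vm_request, m) for w, rule in machine_preferences),
                    reverse=True)
    top_candidates = scored[:max(1, len(scored) // 100)]   # e.g., roughly the top 1% of servers
    return random.choice(top_candidates)
```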

This tiered approach greatly reduces the possible allocation choices, since many thousands of servers can be removed from consideration by excluding entire clusters. However, filtering the remaining machines is still a resource-intensive task. Protean has many rules that validate or score machines, and doing these computations can add up to a significant amount of time. Each AA, therefore, caches the rules and the scoring results. This works well for two major reasons: (1) most requested VMs are very similar, so the same rules are used repeatedly; (2) inventory changes are relatively small, and between two invocations of the same rule, there will not be a lot of change in terms of server allocations. Moreover, AAs largely address the inventory changes by updating the cached rules before each use. A cache update recomputes the scores/results for the handful of servers that may have been updated by other AAs, and it is a lot faster than doing the full computation for all servers every time. To make the system more efficient, the AAs learn of changes from the placement store via a pub/sub system, so updating the cache only involves local operations and local storage and does not query the placement store. This lowers the latency of cache updates and reduces the load on the placement store by avoiding repeated queries for every cache update.

The whole interaction between an AA and the placement store is not strongly consistent/transactional, to avoid locking the store while computing the VM placement. This allows multiple AAs to work concurrently, but it also introduces the possibility of conflicts due to a race — a couple of AAs working concurrently may pick the same server for two different VM allocation requests. These conflicts are resolved by the placement store in one of two general ways. If the target server can accommodate both VMs (i.e. the validator rules pass for the server for two allocations instead of one), then the placement store will merge the conflicting allocations. If the server cannot handle both VMs, then one of the conflicting allocations is retried. Protean allows up to 10 retries, although retries rarely happen in practice. Also, since the system already has a mechanism to tolerate conflicts, it is fine for AAs to work off slightly stale caches, allowing the aforementioned pub/sub way of updating them. However, there is probably some balance between the staleness of the cache, the number of conflicts/retries, and the overall quality of placement, so I’d suspect that the cache updates still need to be relatively recent.
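A rough sketch of this optimistic commit-and-retry loop is below; the store, the Allocation Agent object, and all method names are hypothetical and only illustrate the merge-or-retry behavior described above.

```python
MAX_RETRIES = 10   # per the paper, Protean allows up to 10 retries

def commit_with_conflict_resolution(store, aa, vm_request):
    """Illustrative only: `store`, `aa`, and their methods are made-up names."""
    allocation = aa.compute_placement(vm_request)         # optimistic, no locks held
    for _ in range(MAX_RETRIES + 1):
        current = store.server_state(allocation.server_id)
        if current.can_also_fit(vm_request):              # validator rules still pass
            store.apply(allocation)                       # conflicting allocations are merged
            return allocation
        # The server cannot absorb the conflict: recompute against fresher state and retry.
        allocation = aa.compute_placement(vm_request)
    raise RuntimeError("allocation failed after retries (rare in practice)")
```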

Microsoft has released the VM allocation dataset to the public! 

As always, the paper has many more details and the rationale for all the design decisions. My rambling presentation of the paper is on YouTube:

Discussion

1) Preference Rule Evaluation. Preference rules implement a Compare function that orders two objects (two servers or clusters) for a given VM request. Each rule also has a weight that determines its overall contribution to the scoring of servers/clusters. The servers are scored/ordered based on all preference rules, and the order is computed with a global compare function that combines all the individual compare functions in a weighted manner. However, the weights are constructed in such a way that a higher-weight rule always outweighs all lower-weight rules combined. This is done to aid the explainability of VM placement. The question we had is why we need to compute the global compare function with all preference rules (and waste all the time doing these computations) if we can evaluate the rules sequentially, starting with the most important rules first. This way, if the most important rule produces enough desired servers, we do not need to evaluate other lower-priority rules.

Of course, caching makes the computation fast, since most rules have already been evaluated before, and this may be the reason for just sticking with a general score. At the same time, the need for the cache comes from the slow speed of rule evaluations, and it seems like evaluating all the rules (at least with the strict priority of preference rules) is not necessary.
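Since a higher-weight rule always dominates all lower-weight rules combined, the global ordering is equivalent to a lexicographic comparison from the most important rule down. The hypothetical sketch below illustrates the short-circuit evaluation we were wondering about (rule and function names are made up):

```python
# Compare two candidate servers rule by rule, most important rule first.
# If a rule already separates the candidates, lower-priority rules are never evaluated.
def compare_lexicographic(server_a, server_b, rules_by_priority, vm_request):
    for rule in rules_by_priority:                  # ordered from highest to lowest weight
        a, b = rule(vm_request, server_a), rule(vm_request, server_b)
        if a != b:
            return -1 if a > b else 1               # higher rule score wins; stop early
    return 0                                        # tie on every rule
```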

2) On the Importance of Explaining the Allocations. Part of the design is the result of having “explainable” decisions — engineers want to know which rule has impacted each decision. But how important is this? What benefits does it give the engineers/operators aside from some peace of mind in understanding the system’s choices? Could a more efficient system be designed if the “explainability” requirement were omitted? After all, we have many ML systems (including safety-critical ones, like self-driving vehicles) that are based on models lacking any “explainability”.

3) Caching System. This is an interesting caching system that caches the results of computations. It is highly tailored to the task at hand, and the paper goes into great detail on many nuances of the system. The interesting part is the cache updates that must be done before each cache use to bring the cache up to date (and recompute some parts). However, the update does not guarantee that the cache is the most recent! It simply ensures that the cache is more recent, but it still may not have the newest changes that are still in the pub/sub pipeline.

4) Evaluating the Quality of Placement. The paper talks about the quality of placement quite a lot; however, the evaluation is limited to one simulation on packing density. It would be nice to see how production variations impact quality, especially since the paper suggests these impacts are small. Another interesting point is that the paper claims the CPU to be the most contended resource. So how much impact do other resources and constraints have on the quality of packing?

5) Many Interesting Tidbits. Most VMs are small (1-2 cores). We think this is due to lots of small automated tasks, such as build and testing pipelines. Many VMs have a short lifespan, probably for the same reason, as these build-pipeline VMs get destroyed when they are no longer needed. Protean also needs to keep some servers empty. Having idle capacity looks weird on the surface, but the paper mentions fault-tolerance reasons: the system needs spare resources to move VMs that occupy an entire machine. There are many more interesting tidbits in the paper.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Sundial: Fault-tolerant Clock Synchronization for Datacenters

In our 48th reading group meeting, we talked about time synchronization in distributed systems. More specifically, we discussed the poor state of time sync, the reasons for it, and most importantly, the solutions, as outlined in the “Sundial: Fault-tolerant Clock Synchronization for Datacenters” OSDI’20 paper. We had a comprehensive presentation by Murat Demirbas. Murat’s talk was largely based on his extensive time synchronization experience in wireless sensor networks.

First, let’s talk about the need for time synchronization. Many problems of distributed computing could have been avoided if we had a perfect global clock available everywhere, as we often rely on the ordering of events for correctness. For instance, such a perfect clock would make causality/dependency tracking easy. This alone would have simplified and improved many different systems and processes, ranging from efficient consistent snapshots, to more consistent storage systems, to improved debuggability of all distributed applications. In the absence of a perfect global clock, we have been relying on other clever tricks and techniques, such as logical clocks, vector clocks, and loosely synchronized causality-tracking hybrid logical clocks, to name a few.

Fundamentally, if we have unsynchronized clocks on two servers, we cannot use these clocks to order events. The paper provides the following example of the issue: a shared variable X is read on some server at time T, but this same variable is updated on a different server at time T-1; however, due to clock asynchrony, the update actually happens after the read in real time. This essentially makes the clocks useless for ordering, unless we know how badly unsynchronized they are. Knowing this time uncertainty ε allows us to delay the read at T until we know that all servers have moved past T. Some systems may resort to rescheduling the operations/transactions that fall within the uncertainty window instead of waiting, but this is a similar enough use case. Naturally, having smaller uncertainty is better for performance, since a system will incur shorter waits or fewer rescheduled operations.
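The “wait out the uncertainty” idea can be sketched in a few lines. This is a generic illustration of the technique (in the style of commit wait), with an assumed uncertainty bound, not code from the paper:

```python
import time

EPSILON = 100e-6   # assumed clock-uncertainty bound, e.g., 100 microseconds

def wait_out_uncertainty(read_timestamp, clock=time.time):
    """Block until every correctly synchronized clock has passed `read_timestamp`.
    After this wait, no server can still accept a write with a timestamp at or
    below the read's timestamp, so the read cannot miss a lower-timestamped update.
    The smaller EPSILON is, the shorter the wait."""
    while clock() < read_timestamp + EPSILON:
        time.sleep(EPSILON / 10)
```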

So what prevents us from driving this uncertainty ε down to 0 for perfect synchronization? There is no easy answer, and there is a myriad of factors. The clocks themselves are a problem — servers tend to have cheap quartz oscillators that “tick” at different speeds depending on temperature and voltage variations. These variations make individual machines drift apart ever so slightly over time. Trying to synchronize these flimsy clocks is a problem as well — the servers communicate over the network for time sync, and the network is unpredictable, starting from how messages may be routed, to different queue and buffer delays at NICs and switches. All of these add variability to message propagation time and make the network asymmetric, as message flow in one direction may be faster than in the opposite direction.

The paper proposes Sundial, a set of techniques to tame the network-induced uncertainties. Sundial focuses on reducing the message propagation variability in the network.

Firstly, Sundial avoids indirect communication and only exchanges messages between adjacent neighbor nodes in the network topology. This eliminates routing uncertainty between nodes, and also buffer/queue uncertainty at the intermediate switches. 

Secondly, Sundial records the timestamps into messages at the lower level in the network stack. This ensures that the timestamp we are transmitting for synchronization has not been sitting in the local queue for too long, again reducing the variability. 

Thirdly, Sundial ensures that a single node is used as a source of truth for the current time. Since the nodes in the system cannot talk directly to the “source of true time”, the system constructs a tree communication topology starting with the TrueTime root and covering all nodes in the system. 

Fourthly, Sundial tames the unreliable clocks on the individual servers by doing very frequent synchronizations — once every 100 microseconds. 

A big portion of the paper is devoted to handling failures, since a link or node failure prevents the updated time from reaching the nodes in the subtree below the fault, and that subtree may start to deviate from the time at the root node. The gist of the solution is to allow all nodes in the impacted branch to detect the synchronization failure and switch to an alternate tree structure that was precomputed ahead of time. Since all impacted nodes perform the switch to the new tree locally, coordination is avoided, and the process is very quick. An important point in having such a back-up plan is to make sure it is smart enough to avoid correlated failures that can render both the main and back-up trees broken. The paper has a lot more details on the fault tolerance aspect, including handling failures of root nodes.
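The local, coordination-free failover can be pictured roughly as follows. The timeout value, class layout, and method names are assumptions for illustration; the real Sundial logic lives in hardware and firmware and is tuned much more carefully.

```python
import time

SYNC_INTERVAL = 100e-6                   # ~100 microseconds between sync messages
FAILURE_TIMEOUT = 5 * SYNC_INTERVAL      # made-up timeout; the real value is tuned tightly

class NodeSketch:
    """Illustrative only: local switch to a precomputed backup parent on missing syncs."""
    def __init__(self, primary_parent, backup_parent):
        self.parent = primary_parent
        self.backup_parent = backup_parent       # precomputed before any failure happens
        self.last_sync = time.monotonic()

    def on_sync_message(self, sender, correction):
        if sender != self.parent:
            return                               # only accept time from the current parent
        self.last_sync = time.monotonic()
        # ... apply `correction` to the local clock estimate here ...

    def maybe_failover(self):
        # Each impacted node notices the missing sync messages on its own and switches
        # to the backup tree locally, so no coordination with other nodes is needed.
        if time.monotonic() - self.last_sync > FAILURE_TIMEOUT:
            self.parent = self.backup_parent
```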

Combining all of Sundial’s techniques provides good time synchronization with fairly tight bounds. It achieves ~100 ns synchronization even under some failures, which is significantly better than PTP time synchronization (and even better than its precursor NTP?).

Discussion

We had a nice discussion with plenty of questions; below I summarize the most important points.

1) Set of techniques. As outlined, Sundial is a set of techniques to improve time sync, and there are some important lessons there. For example, doing things in hardware (or as close to hardware as possible) is good. We are seeing (network) hardware optimizations for distributed systems more and more often. Just a few weeks ago we talked about smart switches and using them to drive replication and routing for “hot keys” in a storage system. Obviously, time synchronization is a different problem, but it is also one that benefits from hardware a lot. Another lesson is to have a single source of time, even though it makes the communication pattern more structured and prone to failures.

2) Better clocks/oscillators. Sundial synchronizes a lot: one message every ~100 microseconds, which is 10,000 messages per second. We are not sure what impact this may have on the network (the messages are small) and on performance, but there is a practical reason for synchronizing this often. As Sundial aims to keep the uncertainty small (ε of roughly 100 ns), it cannot afford the cheap clocks to drift too much upon failures and needs to fail over to a back-up tree quickly. This means that the system needs a super-tight timeout and very frequent message exchange. Better clocks/oscillators (or maybe using multiple clocks in a server?) could improve the situation here and either allow for even better synchronization or reduce the message frequency. Oven-controlled oscillators, for example, use a heated chamber to keep the crystal at the same temperature and reduce its drift due to temperature variations.

3) Comparison with PTP. The paper extensively compares Sundial with the PTP protocol. The authors mention that PTP does not report ε and that they had to augment the design to provide the uncertainty in these protocols. The paper puts PTP’s uncertainty at ε=800μs. This contrasts with other literature, where PTP is often reported as having sub-nanosecond accuracy (is accuracy the same as uncertainty? Regardless, to have accurate time, we need to have low uncertainty, otherwise how do we know it is accurate?) or nanosecond-level accuracy. It is worth noting that PTP in these papers either required a dedicated low-load network for time synchronization, or hardware with support for some advanced features needed for PTP to work well, or both.

4) Time sync in wireless sensor networks. Murat spent quite some time describing how the same set of techniques was used 15-20 years ago to achieve microsecond-level synchronization in wireless sensor networks. The presentation has many fascinating details, but it appears that these techniques have been known and used for quite some time, just not in the datacenter setting. What was the blocker for doing this earlier?

5) New applications of synchronized time. Finally, we discussed the possible new applications of such precise time synchronization. The paper mentions the Spanner latency improvement as one benefit, but this is “old stuff.” Actually, for many years we, the distributed systems community, were (self-)taught not to rely on time for anything critical. Sure, we use time for things like leases and timeouts, but these are all “negative” communication paths that are exercised rarely, allowing us to be very conservative with the timeouts — there is little harm in adding a few more seconds to a lease timeout that kicks in upon a leader failure and is needed only in rare reconfiguration cases. With super-tight synchronization bounds, we can try to use time for positive communication and convey progress instead of the lack of it. This, of course, is challenging, since time is not the only uncertain variable in the system. All of our other “friends,” such as network uncertainty/variability and crashes, still exist, and we also need to tame them in some way to use time for positive, active communication.

For example, one may use a “silent agreement” that requires no acks from the followers if everything is going well. But this quickly falls apart under faults. Alternatively, one may treat a synchronized clock as an agreement itself and use it to drive the ordering in a multi-leader system. This may also fall apart if the network is too asynchronous, and a message from one server that has already applied an operation may reach another follower too late – after it has irreversibly applied some other higher-timestamped operation. Taming the network asynchrony may be the next big thing to allow new usages of time in distributed systems!

The relationship between network latency and time uncertainty is very important for constructing consistent snapshots. If the time uncertainty is guaranteed to be smaller than the network latency, we can use time to construct consistent snapshots, since we can be sure that no message that breaks causality can reach the other side within the uncertainty period. This, for example, can be useful for debugging. In my Retroscope consistent monitoring system, I use HLC to preserve causality when the uncertainty is too large, but having software clocks like HLC unnecessarily complicates systems.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group Special Session: Building Distributed Systems With Stateright

This talk is part of the Distributed Systems Reading Group.

Stateright is a software framework for analyzing and systematically verifying distributed systems. Its name refers to its goal of verifying that a system’s collective state always satisfies a correctness specification, such as “operations invoked against the system are always linearizable.” Cloud service providers like AWS and Azure leverage verification software such as the TLA+ model checker to achieve the same goal, but whereas that solution can verify a high level system design, Stateright is able to systematically verify the underlying system implementation in addition to the design.

In this talk, Stateright’s author Jonathan Nadal will introduce the project, demo its usage, and discuss upcoming improvements. Jonathan has been in the cloud space since 2012 when he joined Amazon Web Services to work on S3. There he met Chris Newcombe, who introduced TLA+ to the company, and was immediately hooked by that technology. Jonathan later joined Oracle to help build Oracle Cloud Infrastructure, where he is currently a director of engineering and a proponent of model checking techniques within and outside his own teams. Stateright is a personal project not affiliated with Oracle or Amazon.

Video Recording

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Heterogeneity-Aware Cluster Scheduling Policies for Deep Learning Workloads

On Wednesday we discussed scheduling in large distributed ML/AI systems. Our main paper was “Heterogeneity-Aware Cluster Scheduling Policies for Deep Learning Workloads” from OSDI’20. However, it was a bit outside of our group’s comfort zone (outside of my comfort zone for sure). Luckily, we had an extensive presentation with a complete background overview of prior ML/AI scheduling works.

Scheduling problems are not new by any means. We have schedulers at the OS level to allocate CPU resources, and we have more sophisticated schedulers for large clusters, HPC systems, etc. So scheduling for machine learning should not pose a big problem, given all the prior scheduling work in the literature and practice. However, this is not necessarily the case, since scheduling for ML/AI systems introduces a few additional challenges. One issue is unpredictable completion time. For instance, many similar jobs may be started with different hyperparameters and get killed off as they progress, making it harder to have a fair scheduler. Another issue is the non-uniformity of access to resources — information exchange between GPUs is faster within the confines of a single machine. When a job is scheduled on resources (GPUs/FPGAs, etc.) spread across many servers, the network latency/bandwidth may get in the way, and the difference in how fast or slow data can be exchanged between the devices grows with the degree of network separation. I find this problem similar to NUMA.

In the context of today’s paper, the additional challenge is the heterogeneity of resources in the cluster. This is something more traditional OS schedulers have started to consider only relatively recently on ARM with big.LITTLE CPU architectures (and will consider soon on x86). Here, however, we have more than just a few resource variations. The paper considers different types of GPUs, but it also mentions other special-purpose hardware, like FPGAs. To make things even more complicated, faster hardware provides a non-uniform speedup for different types of ML tasks — one application may have a 3x speedup from a switch to a faster GPU, while some other task may have a ten-fold speedup from the same switch. Finally, different users of the same system may even have different scheduling goals or objectives, as for some (researchers?) the completion time may be more important, while for others (production users in the cloud?) fairness may play a bigger role.

Gavel, the system presented in the paper, separates the scheduling into a set of distinct components or steps. The scheduling policy takes into account the scheduling objective, i.e. fairness or completion time. Gavel implements various scheduling policies, such as LAS, FIFO, Shortest Job First, and hierarchical compositions of these policies. The scheduling mechanism executes the policy on the cluster. The throughput estimator provides an estimate of the speed of different tasks to be fed into the policy engine if the performance estimate is not provided by the user. This is an important piece, since the performance impact of different hardware varies for different training tasks!

The scheduling itself is iterative. The policy engine provides the resource allocation for a task. For example, it may say that a task needs to run 60% of the time on GPU A and 40% of the time on GPU B. This, of course, is computed given the user objectives and the other competing tasks. The scheduling mechanism then takes the allocation and tries to enforce it in an iterative manner. I will leave the detailed discussion of scheduling to the paper. The intuition, however, is rather simple: Gavel keeps track of how many iterations have been done on each resource (i.e. GPU) type, compares it with the scheduling plan, and computes a priority score for the next iteration, potentially reshuffling the tasks. So, for example, if a scheduling plan calls for 60% on GPU A, but so far the task has used GPU A in only 20% of the scheduling iterations, then GPU A will have a higher priority for being scheduled in the next iteration/round.
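A toy version of this priority computation might look like the following. The function and dictionary shapes are assumptions for illustration and are not Gavel’s actual code:

```python
def next_round_priorities(plan, rounds_done):
    """plan: {(job, gpu_type): target fraction of rounds for that job on that GPU type};
    rounds_done: {(job, gpu_type): rounds actually executed so far}.
    A job that is behind its planned share on some GPU type gets a higher priority there."""
    rounds_per_job = {}
    for (job, gpu_type), rounds in rounds_done.items():
        rounds_per_job[job] = rounds_per_job.get(job, 0) + rounds
    priorities = {}
    for (job, gpu_type), target in plan.items():
        total = rounds_per_job.get(job, 0)
        achieved = rounds_done.get((job, gpu_type), 0) / total if total else 0.0
        # e.g., target 0.6 on GPU A with only 0.2 achieved so far -> priority 3.0
        priorities[(job, gpu_type)] = target / max(achieved, 1e-9)
    return priorities

plan = {("job1", "A"): 0.6, ("job1", "B"): 0.4}
rounds_done = {("job1", "A"): 2, ("job1", "B"): 8}
# job1 is far behind its plan on GPU A (20% vs. 60%), so it gets the highest priority there.
print(next_round_priorities(plan, rounds_done))
```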

The paper evaluated the scheduling of training tasks both on a real cluster and with extensive simulations.

Discussion

1) Types of jobs. The paper describes the scheduling of training jobs, but there are other types of jobs. For example, there are interactive jobs running in things like Jupyter notebooks. This type of job has to be scheduled all the time and cannot be fully preempted or paused due to its interactive nature. Also, trained models are only good when they are used for something. Inference jobs may have different requirements than training jobs. For example, for a production inference task, it may be more important to schedule it on multiple different machines for fault tolerance, an approach opposite to training jobs, where scheduling on as few machines as possible improves performance by minimizing network bottlenecks. Taking into account these other types of tasks running in the same cluster may complicate scheduling further.

2) Task movement. We talked about the performance penalty of scheduling across many different machines due to network latency and bandwidth. However, as tasks get scheduled against different resources, they have to migrate over the network, so a very frequent change of resources may not be very good for performance.
The task movement itself is a tricky problem, at least to an uninitiated person like me. Do we move a task at some snapshot point? How do we insert that snapshot if the task itself does not make one? Can we move a task just by dumping the process and (GPU) memory on one server and restoring it on another? Does this even work across different GPU designs/generations?

3) Fault tolerance. What is the granularity of fault tolerance in large cloud ML/AI systems? Obviously, if the process crashes for some reason, we want to restart from some previous checkpoint/snapshot. But whose responsibility is it to make these snapshots? The snapshots may be too costly to make very frequently, so users may be reluctant to snapshot their progress too often. But since the snapshots may be needed for task movement, can we take advantage of that for fault tolerance? Again, the uninitiated minds are speaking here, so these are likely all solved questions.

4) Planetary-scale scheduling. We want to avoid (frequent) communication between GPUs over the network because of limited bandwidth. On a geo-scale, the bandwidth may become even more limited, and the “first-byte” latency even larger, further impacting performance. So it is likely not a good idea to schedule a task across the WAN (or maybe even across availability zones in the same region). However, moving tasks on a planetary scale between regions may be something to consider if this is done infrequently and remote regions have more of the desired resource available. This requires longer-term planning, though, as we would like to make sure that an expensive move can be offset by the task using as much of that resource as possible before being moved again.

5) User performance estimates. Gavel has a performance estimation component, but it can also take performance estimates from the users. It would be interesting to see whether these user-provided estimates are correct in the real world. Also, can Gavel adjust these user estimates based on its own measurements?

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. FlightTracker: Consistency across Read-Optimized Online Stores at Facebook

In the last DistSys Reading Group we discussed “FlightTracker: Consistency across Read-Optimized Online Stores at Facebook.” This paper is about consistency in Facebook’s TAO caching stack. TAO is a large social graph storage system composed of many caches, indexes, and persistent storage backends. The sheer size of Facebook and TAO makes it difficult to enforce meaningful consistency guarantees, and TAO essentially operates as an eventually consistent system, despite having some stronger-consistency components in it. However, a purely eventual system is very unpredictable and hard to program for, so TAO initially settled on providing a read-your-writes (RYW) consistency property. The current way of enforcing RYW is FlightTracker. FlightTracker is a recency token (Facebook calls these tokens Tickets) system running in every Facebook datacenter. Tickets keep track of recent writes performed by a datacenter-sticky user session. In a sense, a ticket is a set of tuples <Key, Key-Progress>, where the Key-Progress is some value designating the recency of the write on the Key, like a version, a timestamp, or a partition sequence number. The reads then include the ticket and propagate it across the stack to the nodes that serve the requests. With the recency information in the ticket, a server can make a local decision on whether it is sufficiently up-to-date. If the node is stale, it can forward the request to a higher-level cache or the durable store to retrieve the data.

Many other systems use recency tokens, but they usually do not explicitly specify all the writes done by the user. For example, a token may be a single number representing the last transaction id seen by the client. This is good for keeping the recency tokens small, but it has a smaller resolution — such a token will enforce per-partition recency instead of per-key recency, and cause too many cache misses when per-key RYW guarantees are needed.

However, keeping and transferring the explicit set of all of a client’s writes is expensive, so FlightTracker uses a few compaction strategies. For one, it is sufficient to keep track of only the most recent write on each key. Secondly, in workloads with a larger number of writes, FlightTracker may reduce the resolution, stop tracking individual writes, and, for example, switch to partition-level tracking by transaction id or sequence number. Finally, the TAO stack enforces bounded staleness of about 60 seconds, so writes older than 60 seconds can be purged from the ticket.

FlightTracker stores the tickets in a simple replicated system based on distributed hashing. Upon a read, the ticket is first fetched from FlightTracker and then included with all the read operations. Since one request typically makes many reads, the cost of ticket fetching is amortized over many read operations. Nevertheless, FlightTracker is fast to fetch tickets from — it takes just ~0.3 ms. Whenever writes happen, the ticket for a particular user session is updated to include the new writes and exclude the compacted ones.
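Putting the pieces together, a ticket can be thought of as a small per-session map from keys to recency markers, with merge-on-write and compaction. The sketch below is only an illustration of that idea, with made-up names and wall-clock-based compaction; it is not FlightTracker’s actual data structure.

```python
import time

STALENESS_BOUND = 60.0   # seconds; TAO's bounded staleness window

class TicketSketch:
    """Per-session recency token: key -> (progress marker, wall-clock time of the write)."""
    def __init__(self):
        self.entries = {}

    def record_write(self, key, progress):
        # Only the most recent write per key matters.
        self.entries[key] = (progress, time.time())

    def compact(self):
        # Writes older than the staleness bound are assumed visible everywhere
        # and can be dropped from the ticket.
        cutoff = time.time() - STALENESS_BOUND
        self.entries = {k: v for k, v in self.entries.items() if v[1] >= cutoff}

    def is_fresh_enough(self, key, server_progress):
        # A replica can serve the read locally if it has caught up to the session's
        # last write on this key (or if the key is not in the ticket at all).
        required = self.entries.get(key)
        return required is None or server_progress >= required[0]
```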

The paper has many details that I have left out of this summary and the presentation:

Discussion

1) What can go wrong if RYW is broken? The paper discusses the RYW topic quite substantially. One important point here is that RYW enforcement is “relatively” cheap — it provides a useful consistency guarantee without making cache misses due to consistency (i.e. consistency misses) too frequent. So it appears to be a balance between the usefulness and the cost of consistency properties at Facebook scale. However, the paper does not talk much about what can go wrong in Facebook (or, more specifically, in applications that rely on the social graph) if RYW does not hold. The paper mentions that it is a reasonable default consistency for developers and users. In our discussion, we thought it is more useful for the end users. If a person posted something on the site, then refreshed the page and the post does not appear because of an RYW violation, the user may get confused about whether the site is broken or whether they pressed the right button. We do not think that in most cases there will be serious consequences, but since Facebook is a user-centric application, providing intuitive behavior is very important. Actually, the paper suggests that RYW violations may still happen, by saying that the vast majority of servers (>99.99%) are within the 60-second staleness window. This means that in some rare cases it is possible to have a “clean” ticket after compaction, hit one of these <0.01% of servers, and get stale data.

2) Architectural style. So… this is a Facebook paper, and you can feel it. Just like many other Facebook papers, it describes a system that appears very ad-hoc with respect to the rest of the stack. The original TAO is also a combination of ad-hoc components bolted together (does it still use MySQL in 2021?). FlightTracker was added to TAO later as an afterthought and improvement. Not a bad improvement by any means. And having all the components appear separate and independent serves its purpose – Facebook can build a very modular software stack. So anyway, this appears to be a very “engineering” solution, bolted onto another set of bolt-on components. And it serves the purpose. Adding 0.3 ms (1-2 eye blinks) of latency to retrieve a ticket and provide something useful to developers and users is not bad at all.

Another interesting point from this discussion is that Facebook is actually very conservative in its systems. Still using PHP/Hack? MySQL? They create systems to last and then bolt on more and more components to improve them. I guess at some critical mass all the bolted-on parts may start to fall off, but that only means it is time to rethink the system with something groundbreaking and new. So what about “Move fast and break things?” Does it contradict some of that conservatism? Or does it augment it? I think the latter: if something needs to change/improve, then just add another part somewhere to make it happen.

3) Stronger consistency than RYW? The paper says that FlightTracker can be used to improve the consistency beyond RYW. The authors provide a few examples of systems manipulating the tokens to get more benefits — indexes and pub-sub systems. With indexes, they actually bolt on yet another component to make them work.

4) Cost of tickets. Since each ticket represents a set of recent writes, its cost is not static and depends on the number of writes done by the user session in the past 60 seconds. The main reason the cost of storing and transferring tickets does not explode is the 60-second global compaction, which keeps the average ticket size at 250 bytes. The median ticket size is 0 bytes, meaning that a lot of requests happen more than 60 seconds after the user’s last write. However, we do not think that a system like this will scale to a more write-heavy workload. TAO’s workload (at least in 2013, when the original paper came out) is 99.8% reads, so writes are rare. With more writes, a constant-size ticket may start to make more sense, and we have a feeling that the cross-scope compaction, where the individual writes in a ticket are replaced with a more comprehensive/encompassing progress marker, would kick in much more often.

5) Cross-datacenter issues. One of the reasons for implementing FlightTracker was fault tolerance, as the prior RYW approach that relied on write-through caches could not handle some failures that require changes in a write’s route through the caches to the storage. With FlightTracker, any TAO datacenter/cluster can serve reads while enforcing RYW. This enables, in some rare cases, even doing reads from another datacenter if the local cluster is not available. However, it appears that the users are still sticky to their datacenter, as the FlightTracker service lives independently in each datacenter. This means that a user’s request must come to the datacenter, retrieve the ticket, and only then can it cross datacenter boundaries if there is a big outage locally. If the outage is so severe that the user cannot even reach their datacenter, then the request won’t get the ticket and may actually experience an RYW violation. Another nice Facebook paper talks in more detail about what happens to user requests before they reach datacenters.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group Paper List. Papers ##51-60.

With just four more papers to go in the DistSys Reading Group’s current batch, it is time to get the next set going. This round, we will have 10 papers that should last till the end of the spring semester. Our last batch was all about OSDI’20 papers, and this time around we will mix things up both in terms of the venues and paper recency. We will also start the batch with one foundational paper taken from Murat’s recent list of must-read classical papers in distributed systems. Without further ado, here is the list:

  1. Distributed Snapshots: Determining Global States of a Distributed System – April 7th
  2. Facebook’s Tectonic Filesystem: Efficiency from Exascale – April 14th
  3. New Directions in Cloud Programming – April 21st
  4. Paxos vs Raft: Have we reached consensus on distributed consensus? – April 28th
  5. Protocol-Aware Recovery for Consensus-Based Storage – May 5th
  6. chainifyDB: How to get rid of your Blockchain and use your DBMS instead – May 12th
  7. XFT: practical fault tolerance beyond crashes – May 19
  8. Cerebro: A Layered Data Platform for Scalable Deep Learning – May 26th
  9. Multitenancy for Fast and Programmable Networks in the Cloud – June 2nd
  10. Exploiting Symbolic Execution to Accelerate Deterministic Databases – June 9th

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Pegasus: Tolerating Skewed Workloads in Distributed Storage with In-Network Coherence Directories

Hard to imagine, but the reading group just completed its 45th session. We discussed “Pegasus: Tolerating Skewed Workloads in Distributed Storage with In-Network Coherence Directories,” again from OSDI’20. Pegasus is one of those systems that seem very obvious in hindsight. However, this “obviousness” is deceptive — Dan Ports, one of the authors behind the paper who joined the discussion, mentioned that the project started in 2017, so it took quite a bit of time from start to publication, with a lot of things considered and tried before zeroing in on what is in the paper.

Pegasus is a replication system for load balancing and throughput scalability in heavily skewed workloads. Consider a workload with a handful of “hot” objects. These hot objects may receive so much usage that they overwhelm their respective servers. Naturally, this limits the overall throughput because the system is now capped by servers at their maximum capacity/utilization. The solution is to replicate these hot objects to many servers and allow clients to access them from multiple replicas. However, as soon as we have a replicated scenario, we start running into consistency issues. Strongly consistent systems often degrade in performance with the addition of more replicas due to the synchronization overheads. This is what makes Pegasus rather unique — it scales for load balancing through replication while remaining strongly consistent. The key enabler of this is a smart Top of Rack (ToR) switch that handles all the traffic in the server rack. This switch acts as the “source of synchrony” in the rack, and it does so at the packet’s line speed.

In Pegasus, the data is assigned to servers in the rack using a consistent hashing mechanism, allowing clients to send requests directly to the servers that own the data. However, all these requests go through the ToR switch, which can inspect the packets and make additional routing decisions for “hot” objects. Let’s consider a write request for such a high-demand object. The ToR switch inspects the packet, and if it is for a “hot” key, it sends the write message to some larger and potentially different set of servers in the rack, essentially increasing the replication factor and rotating the responsible servers. Once the servers ack the write completion, the ToR switch sees the acks and records these servers in its coherency directory as servers with the latest copy of the data. The read requests have a similar rerouting fate — if a read is for a hot object, instead of going to the default server, the ToR switch sends it to one of the replicas from its coherency directory. The paper has more details on implementing this coherency directory and keeping track of the recent progress using a simple versioning mechanism.
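As a mental model of the switch’s routing decision, here is a highly simplified software sketch of a coherency directory. Everything here (the replication factor, data structures, tie-breaking) is an assumption for illustration; the real logic runs in the switch data plane at line speed.

```python
import random

class CoherencyDirectorySketch:
    """Per-hot-key directory kept by the ToR switch (illustrative only)."""
    def __init__(self, hot_keys):
        self.hot_keys = set(hot_keys)
        self.replicas = {}   # key -> (version, set of servers holding that version)

    def route_write(self, key, default_server, rack_servers, replication=4):
        if key not in self.hot_keys:
            return [default_server]
        # Spread the write over a rotating set of servers in the rack.
        return random.sample(rack_servers, min(replication, len(rack_servers)))

    def on_write_ack(self, key, version, server):
        cur_version, cur_set = self.replicas.get(key, (-1, set()))
        if version > cur_version:
            self.replicas[key] = (version, {server})    # a newer version supersedes the old set
        elif version == cur_version:
            cur_set.add(server)                         # another up-to-date copy of this version

    def route_read(self, key, default_server):
        if key not in self.hot_keys or key not in self.replicas:
            return default_server
        _, up_to_date = self.replicas[key]
        return random.choice(sorted(up_to_date))        # any replica with the latest version
```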

The end result is awesome! Just by replicating a handful of objects in skewed workloads (~16 objects out of a million in the paper), Pegasus achieves load balancing and high throughput, beating in-network caching in almost all scenarios. There are a few other advantages of Pegasus that are missing in other SOTA solutions: the ability to store larger objects (not evaluated) and tolerance of workloads with different read-write ratios (evaluated extensively).

Finally, I have not touched on a few other important pieces of the system: figuring out which keys are hot, and fault tolerance. For measuring the key temperature, the Pegasus statistics engine samples some packets and determines the frequency of keys in the samples to gauge how hot each key is. For fault tolerance, the system uses chain replication across racks for durability.

As always, we have our presentation of the paper by A. Jesse Jiryu Davis:

Discussion

This time around we had Dan Ports join us to answer the questions about the paper, so this turned out to be a nice discussion despite a slightly lower than expected attendance. 

1) Simple API. Currently, Pegasus supports a simple API with reads and simple destructive writes (i.e. each write is an unconditional overwrite of the previous copy of the data). The main reason for this is how the writes are structured in Pegasus. The system is very nimble and quickly adjustable: it picks the write servers on the fly as the write request “goes through” the switch. This means that a write should be able to just complete on the new server. However, if the write operation is, for example, a conditional write or an update, then such an update also needs the previous version of the object, which may be missing on the new server. We spent some time discussing workarounds for this, and they surely seem possible. But such a solution also introduces additional communication, which adds both more load to the servers and more latency to the operations. And since we are dealing with the subset of objects that already generate the most load in the system, adding anything more to it must be avoided as much as possible. The cost of supporting this more complex API will also differ for various read-write ratios.

2) Comparison with caching. Another big discussion was around using caching to load-balance the system. As Dan pointed out, caches are good when they are faster than the underlying storage, but for super-fast in-memory storage, it is hard to make a cache faster. NetCache (one of the systems used for comparison in the paper) does provide a faster cache by placing it in the network switch. It has several downsides: it handles only small objects, consumes significant switch resources, and does not work well for write workloads (it is a read-through cache, I think). Of course, it is possible to make it a write-through cache as well to optimize for write workloads, but that still does not solve the other deficiencies and adds even more complexity to the system. We also touched on the more complicated fault tolerance of cached systems. The disparity between the load that the cache and the underlying system can take can create situations where the underlying system gets overrun upon a cache failure or excessive cache misses.

3) Chain replication. Since Pegasus replicates for scalability, it needs a separate mechanism to handle fault tolerance. The paper suggests a chain replication approach, where racks are the chain nodes. Only the tail rack serves reads, while the writes must be applied in all racks as the write operation propagates through the chain. One question we had is why not use something like CRAQ to allow other racks to serve reads, but the reality is that this is simply not needed. The chances that an object can become so skewed and hot that it needs more than a rack’s worth of servers are very slim, so there is no need to complicate the protocol. Another question I have now, but forgot to ask during the discussion, is what happens to hot writes as they go through the chain. If non-tail racks only use the default server for writes on “hot” keys (instead of, let’s say, a round-robin or random node), then this server may get overwhelmed. But I think it is trivial to pick a random server for the hot object on each write at the non-tail racks.

4) Zipfian distribution and workload skewness. Pegasus needs to load-balance fewer keys for a less skewed Zipfian distribution. This was an interesting and somewhat counter-intuitive observation at first glance, since one might intuitively expect a more skewed distribution to require more load balancing. However, a higher-alpha Zipf distribution has more skew, but not necessarily more skewed objects than a lower-alpha Zipfian distribution. Fewer highly skewed objects mean less load balancing.
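To build some intuition for how the Zipf parameter shapes the skew, here is a toy calculation of the fraction of total load that falls on the hottest objects. The numbers are purely illustrative and are not taken from the paper’s evaluation:

```python
# Fraction of total load received by the top-k hottest objects under a
# Zipf(alpha) popularity distribution over n objects.
def top_k_load_share(alpha, n=1_000_000, k=16):
    weights = [1.0 / (rank ** alpha) for rank in range(1, n + 1)]
    return sum(weights[:k]) / sum(weights)

for alpha in (0.9, 1.0, 1.2):
    print(f"alpha={alpha}: top-16 objects carry {top_k_load_share(alpha):.1%} of the load")
```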

5) Virtualization of top-of-rack switches. One important question about the technology that enables Pegasus is the virtualization of these smart ToR switches. Currently, such virtualization is not available, so one needs bare-metal access to a switch to deploy the code. Virtualization of such a switch could make the technology available to cloud users. I think this would be a huge boost to the overall state of distributed computing at the datacenter level. I would be speculating here, but I think a lot depends on the willingness of manufacturers to provide such support, especially given the relatively limited capabilities of the hardware right now. Of course, the virtualization should not add a significant latency penalty to the users, and most importantly, should not add any penalty to non-users (applications/systems that reside in the same rack but do not use the extended capabilities of the switches). Couple all this with the risks of running user code on the hardware that handles all the traffic in the rack, and we also need to worry about user isolation/security more than ever. However, as wishful as it is, it is quite probable that these smart switches will not make their way to the public cloud any time soon. This gives large cloud vendors an edge, since they can benefit from the technology in their internal systems. Smaller service providers that rely on the cloud, however, will have to find a way to compete without access to this state-of-the-art technology. Aside from my extremely high-level speculations, some smart people actually go deeper into the topic.

6) There were a few other minor topics discussed, and a few jokes were thrown here and there. For example, Dan explained Pegasus with cat pictures.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the meetings. Please join the Slack group to get involved!

Reading Group. Performance-Optimal Read-Only Transactions

Last meeting we looked at “Performance-Optimal Read-Only Transactions” from OSDI’20. This paper covers the important topic of transactional reads in database/data-management systems. In particular, it discusses “one-shot” read-only transactions that complete in one network round-trip time (RTT) without blocking and without bloated, expensive messages. If this sounds too good to be true, it is. Before presenting these types of transactions, the authors discuss why it is impossible to have Non-blocking, One round-trip, Constant-size messaging, Strictly Serializable (NOCS) read-only transactions. This becomes a “pick 3 out of 4” kind of deal.

The Performance-Optimal Read-Only Transaction (PORT) system shows how to achieve NOC and get as close as possible to S. For the One round-trip constraint, the paper makes the clients coordinate their own read transactions and control the ordering of reads, all in one round of message exchange. This requires the clients to send some recency/progress metadata to the servers. In the case of PORT, the metadata is a Version Clock, a type of logical clock. It is just a number, so it is Constant-size. Finally, the servers can use the metadata to return the latest value that satisfies the recency constraint imposed by the Version Clock in a Non-blocking manner. The servers also avoid coordination, again to satisfy the One round-trip requirement. To make sure the reads do not block, PORT never considers in-progress operations. PORT separates in-progress operations from completed ones with a stable frontier time/version. Clients request reads at what they know to be the latest stable, immutable state of the system and never try to read state from in-progress operations. Since different clients may have different and stale knowledge of the stable frontier, the system needs to support reading different versions of data, hence PORT relies on a multi-version store.
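To make the read path a bit more concrete, here is a toy sketch of a client-coordinated, one-round read over a multi-version store. All class and method names are mine, and this is only a rough approximation of the protocol, not PORT's actual implementation:

```python
# Toy sketch of the client-coordinated, one-round read-only transaction
# described above: the client attaches its Version Clock (a single number)
# to each read, and every multi-version server returns the latest committed
# value at or below that snapshot, without blocking on in-progress writes.
# All class and method names here are my own, not PORT's actual interfaces.

class MultiVersionServer:
    def __init__(self):
        self.history = {}         # key -> list of (version, value), sorted
        self.stable_frontier = 0  # everything at or below this is committed

    def commit_write(self, key, version, value):
        self.history.setdefault(key, []).append((version, value))
        self.history[key].sort(key=lambda vv: vv[0])
        self.stable_frontier = max(self.stable_frontier, version)

    def read_at(self, key, snapshot):
        """Non-blocking read: only committed versions <= snapshot matter."""
        latest = None
        for version, value in self.history.get(key, []):
            if version <= snapshot:
                latest = value
        # In the real protocol the reply would also carry recency metadata
        # so the client can advance its Version Clock.
        return latest


class Client:
    def __init__(self, servers):
        self.servers = servers
        self.version_clock = 0    # constant-size recency metadata

    def read_only_txn(self, keys):
        # One round trip: all keys are read at the same snapshot version,
        # which the client believes to be at or below the stable frontier.
        snapshot = self.version_clock
        return {key: self.servers[hash(key) % len(self.servers)].read_at(key, snapshot)
                for key in keys}


servers = [MultiVersionServer() for _ in range(3)]
servers[hash("x") % 3].commit_write("x", version=1, value="a")
client = Client(servers)
client.version_clock = 1
print(client.read_only_txn(["x"]))   # {'x': 'a'}
```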

PORT also does some clever trickery to improve consistency. For example, a promotion mechanism is used to block out a range of versions for writing in some cases. If some data was written at version v=10 and then a read transaction requests the value at version v=15, the v=10 value gets promoted to occupy the entire range of versions [10, 15], and servers are disallowed from writing anything in that range. This, however, does not cause a write in that version range to abort; instead, it will be written at version v=16.
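Here is a toy illustration of this promotion trick using the numbers from the example above; the data structures are my own simplification, not the paper's code:

```python
# Toy illustration of the promotion trick described above: reading key "x"
# at version 15 when its latest committed version is 10 promotes the v=10
# value to cover the whole range [10, 15]; a later write that would have
# landed inside that range is pushed to v=16 instead of aborting.
# This is my own simplification, not the paper's actual data structures.

class PromotingStore:
    def __init__(self):
        self.history = {}          # key -> list of (version, value), sorted
        self.promoted_until = {}   # key -> highest version blocked for writes

    def write(self, key, value, version):
        # Writes may not land inside a promoted range; bump them past it.
        floor = self.promoted_until.get(key, 0)
        version = max(version, floor + 1)
        self.history.setdefault(key, []).append((version, value))
        self.history[key].sort(key=lambda vv: vv[0])
        return version             # the version actually used

    def read_at(self, key, snapshot):
        latest = None
        for version, value in self.history.get(key, []):
            if version <= snapshot:
                latest = (version, value)
        if latest is not None and latest[0] < snapshot:
            # Promote the returned value to occupy [latest_version, snapshot].
            self.promoted_until[key] = max(
                self.promoted_until.get(key, 0), snapshot)
        return latest[1] if latest else None


store = PromotingStore()
store.write("x", "a", version=10)
store.read_at("x", snapshot=15)            # promotes "a" to cover [10, 15]
print(store.write("x", "b", version=12))   # 16: the write is redirected, not aborted
```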

The paper implements PORT in ScyllaDB and Eiger and shows nearly identical throughput on read-heavy workloads to that of non-transactional reads, while also beating Eiger's own transactions. There are quite a few important details and nuances in implementing PORT. The implementation on top of Eiger is full of surprises, as the promotion mechanism described above no longer works for transactional writes, and PORT uses another clever trick there.

Here is the presentation by Alex Miller, which goes into a bit more detail than my summary:

Discussion

1) SNOW theorem. The NOCS theorem the authors discuss sounds similar to SNOW. Well, it is by the same first author, so this makes some sense. Both are about read-only transactions, and both concern trade-offs between consistency and performance/latency. NOCS focuses on performance-optimal transactions, while SNOW talks about latency-optimal ones. Both talk about the impossibility of having the highest consistency while being “x-optimal” (fill in the “x”). Moreover, NOC (non-blocking, one round trip, constant metadata) implies that performance here largely means latency as well; it just happens that if we stop doing all the extra work, the throughput improves too. In a sense, NOCS appears to be a rebranding of SNOW to some extent. We can even map the letters in both abbreviations to similar concepts: S = (strict) serializability in both cases; N = non-blocking; O = one round trip (in SNOW it is coordinate/retry, which is pretty much whether we add more messages or not). So three letters are the same, and W and C are the difference, but even there we can find some similarities: W in SNOW stands for write-conflict avoidance, and one way to achieve it may require violating C, the constant metadata. The paper itself mentions that NOCS is similar to SNOW.

2) Other causal systems. Occult and PaRiS were brought up briefly in the discussion, though we did not spend too much time on them. Occult is a causal system that avoids “slowdown cascades” caused by dependencies and the need to enforce causality. PORT, with its one-RTT non-blocking mechanism, seems similar in this regard, so a comparison would be interesting.

3) HLC for the logical clock? HLCs are used in transactions in MongoDB and CockroachDB. HLCs are logical clocks, constant in size, and they do help identify consistent cuts/snapshots for transactions. MongoDB uses HLCs for cross-partition causal transactions, which seems to fit well within NOC. CockroachDB is more involved, but it also uses HLCs. Another important property of HLCs is that they can provide a single serial order, but this is something PORT actually avoids in Eiger-PORT, since it needs to provide a different serial order to different clients to enforce the read-your-writes property without blocking.
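For reference, here is a compact sketch of the HLC update rules from the Kulkarni et al. HLC paper; this is my own illustration, not MongoDB's or CockroachDB's actual code:

```python
import time

# Compact sketch of the Hybrid Logical Clock (HLC) update rules from the
# Kulkarni et al. paper. An HLC timestamp is a pair (l, c): l tracks the
# largest physical time seen, and c is a bounded logical counter used to
# break ties, so the whole timestamp stays constant size.

class HLC:
    def __init__(self, physical_clock=lambda: int(time.time() * 1000)):
        self.pt = physical_clock
        self.l = 0   # largest physical time observed
        self.c = 0   # logical counter

    def now(self):
        """Timestamp a local or send event."""
        l_old = self.l
        self.l = max(l_old, self.pt())
        self.c = self.c + 1 if self.l == l_old else 0
        return (self.l, self.c)

    def update(self, m_l, m_c):
        """Merge a timestamp (m_l, m_c) received on a message."""
        l_old = self.l
        self.l = max(l_old, m_l, self.pt())
        if self.l == l_old and self.l == m_l:
            self.c = max(self.c, m_c) + 1
        elif self.l == l_old:
            self.c += 1
        elif self.l == m_l:
            self.c = m_c + 1
        else:
            self.c = 0
        return (self.l, self.c)
```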

4) On the importance of a stable frontier. A stable frontier is the point in the system's execution that separates what is safe to read from what is not. Everything before the stable frontier is committed/executed and safe to read; any operation after the frontier may not have been fully written/committed yet and is therefore not safe. This separation is clear in Scylla-PORT, but gets blurred in Eiger-PORT with its read-your-writes reordering.

5) Replication. The paper does not address replication issues at all, so one has to wonder how it handles replication and the associated failures. For example, in Cassandra/Scylla, a read succeeds after being completed by some read quorum that may be smaller than the full replica set for the object. This means that a value can be promoted on a subset of replicas, and then a write on some quorum containing the un-promoted replicas can end up with the same write recorded under different versions on different replicas. This may or may not be a huge problem, but a conversation on replication and failures would be very useful. The code (which is open source) may help shed light on this, but we did not have a chance to look at it during the discussion.

6) Eiger-PORT. This one is very different from the ScyllaDB version. It also differs from how the paper describes PORT up to that point, since Eiger-PORT cannot use promotion: writes are now part of transactions, and all writes from one transaction would have to be promoted to a higher version atomically. Doing that requires coordination between servers and extra messages, losing the O part of NOC. The authors go into more detail describing the Eiger-PORT protocol, which is not the easiest thing to grasp on a first read. It is also mind-twisting when you start reordering operations for different clients. Actually, as of the time of this writing, we were still discussing some aspects of Eiger-PORT in our group's Slack channel.

7) Evaluation. We liked the choice of and rationale for the baseline systems used in the evaluation. PORT indeed showed low overhead in ScyllaDB while improving the database's consistency semantics.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the meetings. Please join the Slack group to get involved!