Tag Archives: scheduling

Reading Group. Move Fast and Meet Deadlines: Fine-grained Real-time Stream Processing with Cameo

In the 68th reading group session, we discussed scheduling in dataflow-like systems with Cameo. The paper, titled “Move Fast and Meet Deadlines: Fine-grained Real-time Stream Processing with Cameo,” appeared at NSDI’21.

This paper discusses some scheduling issues in data processing pipelines. When a system answers a query, it breaks the query into several operators, such as groupBys, aggregations, and various other processing steps. Each of these operators executes on some underlying serverless actor (in this work, the underlying platform is Orleans), so naturally, the operators must be scheduled to run on actors as those actors become available.

Scheduling is easy when we always have idle actors/workers that can pick up new tasks. But this leads to overprovisioning of resources and becomes expensive for all but the most latency-critical applications. As such, resource-shared environments that allow processing multiple queries from different tenants are more cost-efficient and practical. Of course, the problem with shared environments is keeping all tenants happy and isolating them from each other. This means that scheduling needs to be aware of the SLAs a system must provide to tenants and try not to blow past the deadlines. Cameo is a dataflow processing system that aims for fine-grained scheduling of tasks to make sure all queries, each consisting of many tasks, finish within some deadline or SLA guarantee.

Keeping a deadline is tricky, though. One complication arises from queries that require more than one operation to compute. Some of these query operations may be dependent on each other and need to happen in sequence. As a result, the query deadline must be broken down into sub-deadlines for all the sequential execution steps. Instead of enforcing execution deadlines, Cameo enforces a start deadline for each task. This deadline is the latest time a task can begin computing without impacting the downstream tasks and the overall query deadline.
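To make this a bit more concrete, here is a minimal sketch of how such start deadlines could be derived for a chain of dependent steps, assuming the scheduler has an execution-time estimate for each step. The function and the data layout are my own illustration, not Cameo’s actual API.

# A minimal sketch of deriving start deadlines for a chain of dependent steps,
# given a query deadline and per-step execution estimates. Illustrative only.

def start_deadlines(query_deadline, est_costs):
    """est_costs[i] is the estimated execution time of step i;
    steps run sequentially: 0 -> 1 -> ... -> n-1."""
    deadlines = []
    remaining = query_deadline
    # Walk the chain backwards: a step must start early enough to leave room
    # for itself and for everything downstream of it.
    for cost in reversed(est_costs):
        remaining -= cost
        deadlines.append(remaining)
    return list(reversed(deadlines))

# Example: a 100 ms query deadline and three steps estimated at 20, 30, and 10 ms
# give start deadlines of 40, 60, and 90 ms into the deadline window.
print(start_deadlines(100, [20, 30, 10]))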

Things can get more complicated from here. If one step takes longer to compute than anticipated, the start deadlines of downstream tasks/steps may be violated. The system, however, tries to adjust/reprioritize the subsequent steps to accelerate their execution and hopefully still meet the overall query deadline/SLA. But there is more: if the scheduler misjudged the execution time of a task once, it can make the same mistake for the same query again. To counter this problem, Cameo propagates task execution information back to the scheduler so that it can take better actions next time. This feedback mechanism allows the scheduler to learn from past executions of a query/task and improve in the future.
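As a toy illustration of such a feedback loop (not Cameo’s actual estimator), one could fold the observed execution times back into per-task cost estimates with an exponentially weighted moving average; the class, the alpha value, and the default cost below are all my assumptions.

# Illustration only: one simple way a scheduler could fold observed execution
# times back into its per-task cost estimates (an exponentially weighted
# moving average). Cameo's real estimator may differ.

class CostEstimator:
    def __init__(self, alpha=0.3, default_cost=10.0):
        self.alpha = alpha                # weight of the newest observation
        self.default_cost = default_cost  # used before a task is seen for the first time
        self.estimates = {}               # task/operator name -> estimated execution time

    def estimate(self, task):
        return self.estimates.get(task, self.default_cost)

    def observe(self, task, measured_time):
        old = self.estimate(task)
        # Blend the new measurement with the old estimate so one slow run nudges,
        # but does not dominate, future start-deadline computations.
        self.estimates[task] = self.alpha * measured_time + (1 - self.alpha) * old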

Now about the scheduler. It has two major components: the scheduling mechanism and the scheduling policy. The scheduling mechanism executes the scheduling policies, and the two parts are rather separate. Such separation allows the scheduling mechanism to take tasks with high priority (earlier start deadline) regardless of how this priority was computed by the policy. The scheduling policy component is pluggable and can be changed for different applications, as long as it creates priorities that the execution component understands. This part must follow the Cameo API to allow rescheduling of downstream/subsequent tasks and learning the stats of step execution to adjust in the future.
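Below is a rough sketch of what this mechanism/policy split could look like: the mechanism only pops whatever task currently has the highest priority, while the pluggable policy decides how that priority is computed. The interface is my own approximation, not the actual Cameo API.

import heapq

# Sketch of the mechanism/policy separation: the mechanism is a priority queue
# that does not care how priorities are produced; the policy plugs in the
# priority computation (here, simply the start deadline).

class Policy:
    def priority(self, task, context=None):
        raise NotImplementedError

class StartDeadlinePolicy(Policy):
    def priority(self, task, context=None):
        # Smaller value = more urgent; here the priority is the start deadline itself.
        return task["start_deadline"]

class SchedulingMechanism:
    def __init__(self, policy):
        self.policy = policy
        self.queue = []

    def submit(self, task, context=None):
        # id(task) breaks ties so tasks themselves never get compared.
        heapq.heappush(self.queue, (self.policy.priority(task, context), id(task), task))

    def next_task(self):
        return heapq.heappop(self.queue)[2] if self.queue else None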

The authors claim significant latency improvements of up to 4.6 times when using Cameo. 

As always, we had a presentation in our reading group. The video presentation by Ajit Yagaty is available on YouTube:

Discussion

1) Spark Streaming. The scenarios presented in the paper operate on dynamic data sets that continuously ingest new data. This is similar to other streaming dataflow solutions, such as Spark Streaming. The paper does not provide a direct comparison in the evaluation but claims that Cameo schedules at a much finer, per-task granularity.

2) Fault Tolerance. One natural question in the reading group was around fault tolerance. The paper does not touch on this topic at all. Since the system is built using the Orleans framework, we suspect that most fault tolerance is taken care of by the framework. 

3) Spiky Workloads. The feedback mechanism used to learn how long tasks may take and adjust the scheduling accordingly works great in a streaming system with a steady inflow of data and the same queries processing this new data. Adding a new query will likely create some problems (at least for that query) until the system learns the stats of all the tasks this query uses. Similarly, a big spike in data ingestion can cause some mayhem. If a query operates on the premise that some task needs to process n events/messages but instead receives 10n, its execution time may become too large and cause the downstream deadlines to be violated. And such momentary spikes in a steady flow of new data may happen rather often: all it takes is some transient network glitch that delays some messages, causing a spiky delivery moments later.

4) Query Optimizers? We are not query optimization experts here in the reading group, but we wonder if some query optimization techniques can help here. Instead of relying on historical stats to predict how long different steps of a query may take, can we be smarter and optimize based on the actual data at hand? A simple and naive thing to do for the previous example in (3) is to account for data cardinalities when computing the start deadlines, so that a spike in data ingestion can be noticed and dealt with immediately (see the sketch below).
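To illustrate the idea (this is purely our speculation, not anything from the paper), a cardinality-aware estimate could scale the learned per-item cost by the amount of data actually waiting at the operator, so a spike shows up in the start-deadline math right away; the function below is hypothetical.

# Speculative sketch: scale the learned per-item cost by the actual number of
# pending items, so an ingestion spike immediately inflates the execution
# estimate used for computing start deadlines.

def cardinality_aware_estimate(per_item_cost, pending_items):
    # The estimate grows linearly with the amount of buffered input instead of
    # assuming the historical batch size.
    return per_item_cost * pending_items

# A 10x spike in pending data (1000 items instead of the usual 100) directly
# yields a 10x larger estimate: 500.0 instead of the usual 50.0.
print(cardinality_aware_estimate(0.5, 1000))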

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the presentations. Please join the Slack group to get involved!

Reading Group. Protean: VM Allocation Service at Scale

The last paper in our reading group was “Protean: VM Allocation Service at Scale.” This paper from Microsoft is full of technical insights into how they operate their datacenters/regions at scale. In particular, the paper discusses one of the fundamental components of any cloud provider — the VM service. The system, called Protean, is an allocation service that handles VM allocation requests at the availability zone granularity in each Azure region. It tries to figure out which server of many thousands of candidates is the best fit for the VM described in the request. Its goal is to pack VMs tightly to avoid fragmentation of resources — having too many small and unusable server chunks. There are several challenges in doing so. First, each VM has a set of requirements, such as the VM type, the number of vCPUs, memory allocation, on-server SSD size, networking, location preferences for fault tolerance, and many more. This alone makes the problem very hard to solve optimally, NP-hard as a matter of fact. The second major challenge is doing these allocations at scale. There are surprisingly many VM allocations going on in each availability zone all the time. In the steady state, the system deals with hundreds of allocation requests per second, with occasional spikes to thousands of new VM requests per second!

Protean consists of a placement store, a database that keeps a record of VM assignments across the zone’s server inventory, and many concurrent Allocation Agents (AAs) that compute the actual VM-to-machine assignments. Each AA acts like a server filter: it takes a request for a new VM and filters out all the servers not capable of hosting that VM. After the filtering, an AA computes a general preference score to find a set of most suitable servers and picks one random server from that set of candidates.

Protean implements this filtering and scoring using a rule-based approach and divides the process into multiple phases. First, it uses cluster validator rules to filter out any homogeneous clusters that cannot support a VM. These validator rules specify a “hard” requirement needed to support a VM. For example, a VM with a GPU cannot be supported by a cluster of GPU-less servers, so the entire cluster is automatically not a candidate for the allocation. Then the system scores the clusters that can handle the VM based on preference rules, which describe “nice-to-have” features, as opposed to hard requirements. A similar validator process is repeated to filter out non-compatible machines in the selected cluster (for example, servers that are already at capacity and have no available resources for the VM type). Finally, all remaining good servers are scored based on the machine preference rules.
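Putting this tiered flow into a condensed sketch (the rule names, data shapes, and the helper below are invented for illustration and are far simpler than the real system):

import random

# Condensed sketch of the tiered filter-and-score flow: validator rules act as
# hard filters, preference rules produce scores, and the final server is picked
# at random among the top-scoring candidates.

def pick_server(vm, clusters, cluster_validators, cluster_prefs,
                machine_validators, machine_prefs):
    # 1) Hard-filter clusters that cannot host the VM at all (e.g., no GPUs).
    eligible = [c for c in clusters if all(v(vm, c) for v in cluster_validators)]
    # 2) Score the remaining clusters with "nice-to-have" preference rules.
    best_cluster = max(eligible, key=lambda c: sum(p(vm, c) for p in cluster_prefs))
    # 3) Hard-filter machines inside the chosen cluster (e.g., already at capacity).
    machines = [m for m in best_cluster["servers"]
                if all(v(vm, m) for v in machine_validators)]
    # 4) Score the machines and pick randomly among the equally best ones, which
    #    helps spread concurrent allocation agents across good servers.
    scores = {m["id"]: sum(p(vm, m) for p in machine_prefs) for m in machines}
    best_score = max(scores.values())
    return random.choice([m for m in machines if scores[m["id"]] == best_score])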

This tiered approach greatly reduces the number of allocation choices, since many thousands of servers can be removed from consideration by excluding entire clusters. However, filtering and scoring the remaining machines is still a resource-intensive task. Protean has many rules that validate or score machines, and doing these computations can add up to a significant amount of time. Each AA, therefore, caches the rules and the scoring results. This works well for two major reasons: (1) most requested VMs are very similar, so the same rules are used repeatedly; (2) inventory changes are relatively small, so between two invocations of the same rule there will not be a lot of change in terms of server allocations. Moreover, AAs largely address the inventory changes by updating the cached rules before each use. A cache update recomputes the scores/results for the handful of servers that may have been updated by other AAs, which is a lot faster than doing the full computation for all servers every time. To make the system more efficient, the AAs learn of changes from the placement store via a pub/sub system, so updating the cache only involves local operations and local storage and does not query the placement store. This lowers the latency of cache updates and reduces the load on the placement store by avoiding repeated queries for every cache update.
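A tiny sketch of the “update the cache before each use” idea: only servers flagged as changed (say, through the pub/sub notifications) get their rule results recomputed, while everything else is served straight from the cache. The shape of the code is my guess, not Protean’s actual design.

# Illustrative sketch: recompute rule results only for servers that changed
# since the last use; everything else comes from the cache.

class RuleCache:
    def __init__(self, rule):
        self.rule = rule        # rule(vm_type, server) -> score or pass/fail result
        self.results = {}       # (vm_type, server_id) -> cached result
        self.dirty = set()      # server ids touched by other agents (learned via pub/sub)

    def mark_dirty(self, server_id):
        self.dirty.add(server_id)

    def evaluate(self, vm_type, servers):
        for s in servers:
            key = (vm_type, s["id"])
            if key not in self.results or s["id"] in self.dirty:
                # Recompute only what might have changed or was never computed.
                self.results[key] = self.rule(vm_type, s)
        self.dirty.clear()
        return {s["id"]: self.results[(vm_type, s["id"])] for s in servers}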

The whole interaction between the AAs and the placement store is not strongly consistent/transactional, to avoid locking the store while computing the VM placement. This allows multiple AAs to work concurrently, but it also introduces the possibility of conflicts due to a race: a couple of AAs working concurrently may pick the same server for two different VM allocation requests. These conflicts are resolved by the placement store in one of two general ways. If the target server can accommodate both VMs (i.e., the validator rules pass for the server with two allocations instead of one), then the placement store will merge the conflicting requests. If the server cannot handle both VMs, then one of the conflicting allocations is retried. Protean allows up to 10 retries, although retries rarely happen in practice. Also, since the system already has a mechanism to tolerate conflicts, it is fine for AAs to work off slightly stale caches, allowing the aforementioned pub/sub way of updating them. However, there is probably some balance between the staleness of the cache, the number of conflicts/retries, and the overall quality of placement, so I suspect the cache updates still need to be relatively recent.
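The optimistic commit loop could look roughly like the sketch below: the placement store accepts an allocation if the server still fits the VM, and the agent recomputes and retries otherwise. The method names are hypothetical; only the bound of 10 retries comes from the paper.

# Rough sketch of optimistic conflict handling between an allocation agent and
# the placement store; the store's try_commit is assumed to atomically check
# that the chosen server can still fit the VM.

MAX_RETRIES = 10  # the paper mentions up to 10 retries

def commit_allocation(placement_store, allocation_agent, vm):
    for _ in range(MAX_RETRIES):
        server = allocation_agent.pick_server(vm)    # may run off a slightly stale cache
        if placement_store.try_commit(vm, server):   # still fits -> allocation accepted (or merged)
            return server
        allocation_agent.refresh_cache(server)       # lost the race; refresh and retry
    raise RuntimeError("could not place VM after %d attempts" % MAX_RETRIES)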

Microsoft has released the VM allocation dataset to the public! 

As always, the paper has many more details and rationale for all the decision choices. My rambling presentation of the paper is on YouTube:

Discussion

1) Preference Rule Evaluation. Preference rules implement a Compare function that orders two objects (two servers or clusters) for a given VM request. Each rule also has a weight that determines how much that preference rule contributes to the scoring of servers/clusters. The servers are scored/ordered based on all preference rules, and the order is computed with a global compare function that combines all the individual compare functions in a weighted manner. However, the weights are constructed in such a way that a higher-weight rule always outweighs all lower-weight rules combined. This is done to aid in the explainability of VM placement. The question we had is why the system needs to compute the global compare function with all preference rules (and spend the time doing these computations) if it could evaluate the rules sequentially, starting with the most important rule first. This way, if the most important rule produces enough desired servers, we do not need to evaluate other lower-priority rules.

Of course, caching makes the computation fast, since most rules have already been evaluated before, so this may be the reason for just sticking with a general score. At the same time, the need for the cache is due to the slow speed of rule evaluations, and it seems like evaluating all the rules (at least with the strict priority of preference rules) is not necessary. The sketch below shows the kind of sequential, most-important-rule-first evaluation we had in mind.
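In other words, the weighting construction behaves like a lexicographic comparison: compare by the most important rule first and fall through to lower-weight rules only on ties. The tiny example below, with made-up rules and data, is only an illustration of that ordering, not Protean’s code.

from functools import cmp_to_key

# Lexicographic ("higher weight beats all lower weights combined") comparison:
# rules are consulted from most to least important, stopping at the first one
# that breaks the tie. The rules and data below are purely illustrative.

def combined_compare(rules):
    def cmp(a, b):
        for rule in rules:      # rules listed from highest to lowest weight
            r = rule(a, b)      # negative: a preferred, positive: b preferred, 0: tie
            if r != 0:
                return r
        return 0
    return cmp

# Hypothetical rules: prefer servers that leave the least free space behind
# (tighter packing), then break ties by preferring servers in emptier racks.
prefer_packing = lambda a, b: a["free_after"] - b["free_after"]
prefer_empty_rack = lambda a, b: b["rack_free"] - a["rack_free"]

servers = [{"free_after": 2, "rack_free": 5},
           {"free_after": 2, "rack_free": 9},
           {"free_after": 1, "rack_free": 1}]
servers.sort(key=cmp_to_key(combined_compare([prefer_packing, prefer_empty_rack])))
# Result: the tightest-packing server comes first; the two equally packed
# servers are then ordered by the lower-priority rack rule.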

2) On the Importance of Explaining the Allocations. Part of the design is the result of having “explainable” decisions: engineers want to know which rule has impacted each decision. But how important is this? What benefits does it give the engineers/operators, aside from some peace of mind from understanding the system’s choices? Can a more efficient system be designed if the “explainability” requirement is omitted? After all, we have many ML systems (including safety-critical ones, like self-driving vehicles) that are based on models that lack any “explainability”.

3) Caching System. This is an interesting caching system that caches the results of computations. It is highly tailored to the task at hand, and the paper goes into great detail on many nuances of the system. The interesting part is the cache update that must be done before each cache use to bring the cache up to date (and recompute some parts). However, the update does not guarantee that the cache is the most recent! It simply ensures that the cache is more recent, but it still may miss the newest changes that are still in the pub/sub pipeline.

4) Evaluating the Quality of Placement. The paper talks about the quality of placement quite a lot; however, the evaluation is limited to one simulation on packing density. It would be nice to see how production variations impact quality, especially since the paper suggests these impacts are small. Another interesting point is that the paper claims CPU to be the most contended resource. So how much impact do other resources and constraints have on the quality of packing?

5) Many Interesting Tidbits. Most VMs are small (1-2 cores). We think this is due to lots of small automated tasks, such as build and testing pipelines. Many VMs have a short lifespan, probably for the same reason, as these build-pipeline VMs get destroyed when no longer needed. There is also a need to keep some servers empty. Having idle capacity looks weird on the surface, but the paper mentions fault-tolerance reasons: the system needs spare resources to move VMs that occupy an entire machine. There are many more interesting tidbits in the paper.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the presentations. Please join the Slack group to get involved!

Reading Group. Heterogeneity-Aware Cluster Scheduling Policies for Deep Learning Workloads

On Wednesday we were discussing scheduling in large distributed ML/AI systems. Our main paper was “Heterogeneity-Aware Cluster Scheduling Policies for Deep Learning Workloads” from OSDI’21. However, it was a bit outside of our group’s comfort zone (outside of my comfort zone for sure). Luckily, we had an extensive presentation with a complete background overview of prior ML/AI scheduling work.

Scheduling problems are not new by any means. We have schedulers at the OS level to allocate CPU resources, and we have more sophisticated schedulers for large clusters, HPC systems, etc. So scheduling for machine learning should not pose a big problem, given all the prior scheduling work in the literature and in practice. However, this is not necessarily the case, since scheduling for ML/AI systems introduces a few additional challenges. One issue is unpredictable completion time. For instance, many similar jobs may be started with different hyperparameters and get killed off as they progress, making it harder to have a fair scheduler. Another issue is the non-uniformity of access to resources: information exchange between GPUs is faster within the confines of a single machine. When a job is scheduled on resources (GPUs, FPGAs, etc.) spread across many servers, the network latency/bandwidth may get in the way. And the difference in how fast or slow the data can be exchanged between the devices grows with the degree of network separation. I find this problem similar to NUMA.

In the context of today’s paper, the additional challenge is the heterogeneity of resources in the cluster. This is something more traditional OS schedulers started to consider only relatively recently on ARM with big.LITTLE CPU architectures (and will consider soon on x86). Here, however, we have more than just a few resource variations. The paper considers different types of GPUs, but it also mentions other special-purpose hardware, like FPGAs. To make things even more complicated, faster hardware provides a non-uniform speedup for different types of ML tasks: one application may get a 3x speedup from a switch to a faster GPU, while some other task may get a ten-fold speedup from the same switch. Finally, different users of the same system may even have different scheduling goals or objectives, as for some (researchers?) the completion time may be more important, while for others (production users in the cloud?) fairness may play a bigger role.

Gavel, the scheduler proposed in the paper, separates scheduling into a set of distinct components or steps. The scheduling policy takes into account the scheduling objective, i.e., fairness or completion time. Gavel implements various scheduling policies, such as LAS, FIFO, Shortest Job First, and hierarchical compositions of these policies. The scheduling mechanism executes the policy on the cluster. The throughput estimator provides an estimate of the speed of different tasks to be fed into the policy engine if a performance estimate is not provided by the user. This is an important piece, since the performance impact of different hardware varies for different training tasks!

The scheduling itself is iterative. The policy engine provides the resource allocation for a task. For example, it may say that a task needs to run 60% of the time on GPU A and 40% of the time on GPU B. This, of course, is computed given the user objectives and other competing tasks. The scheduling mechanism takes the allocation and tries to enforce it in an iterative manner. I will leave the detailed discussion of scheduling to the paper. The intuition, however, is rather simple. Gavel keeps track of how many iterations have been done on each resource (i.e., GPU) type, compares it with the scheduling plan, and computes the priority score for the next iteration, potentially reshuffling the tasks. So, for example, if a scheduling plan calls for 60% on GPU A, but so far the task has used GPU A in only 20% of scheduling iterations, then GPU A will have a higher priority for this task in the next iteration/round.
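A small sketch of that round-based intuition (following the paper’s description, but with my own function and data layout, so treat it as an illustration rather than Gavel’s implementation):

# Sketch of the round-based priority intuition: a (job, accelerator type) pair
# gets a higher priority the further the job lags behind its computed
# allocation on that accelerator type.

def priorities(target_alloc, rounds_received):
    # target_alloc[(job, gpu)]: fraction of time the policy wants the job on that GPU type.
    # rounds_received[(job, gpu)]: fraction of past rounds the job actually spent there.
    prio = {}
    for key, target in target_alloc.items():
        received = rounds_received.get(key, 0.0)
        # Deficit-style priority: under-served pairs bubble up; the epsilon
        # avoids division by zero before a job has run at all.
        prio[key] = target / max(received, 1e-9)
    return prio

# Example from the text: the plan wants 60% on GPU A, but the job has seen A in
# only 20% of rounds so far, so (job, A) gets a high priority for the next round.
print(priorities({("job1", "A"): 0.6}, {("job1", "A"): 0.2}))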

The paper evaluated the scheduling of training tasks both on a real cluster and with extensive simulations.

Discussion

1) Type of jobs. The paper describes the scheduling of training jobs, but there are other types of jobs. For example, there are interactive jobs running in things like Jupyter notebooks. This type of job has to be scheduled all the time and cannot be completely preempted or paused due to its interactive nature. Also, trained models are only good when they are used for something. Inference jobs may have different requirements than training jobs. For example, for a production inference task, it may be more important to schedule it on multiple different machines for fault tolerance, the opposite of training jobs, where scheduling on as few machines as possible improves performance by minimizing network bottlenecks. Taking into account these other types of tasks running in the same cluster may complicate scheduling further.

2) Task movement. We talked about the performance penalty of scheduling on many different machines due to the network latency and bandwidth. However, as tasks get scheduled against different resources, they have to migrate over the network, so a very frequent change of resources may not be very good for the performance.
The task movement itself is a tricky problem. At least it appears so to an uninitiated person like me. Do we move a task at some snapshot point? How to insert that snapshot if the task itself does not make it? Can we move the task just by dumping the process and (GPU) memory on one server and restoring it on another? Does this even work across different GPU designs/generations?

3) Fault tolerance. What is the granularity of fault tolerance in large cloud ML/AI systems? Obviously, if the process crashes for some reason, we want to restart at some previous checkpoint/snapshot. But whose responsibility is it to make these snapshots? The snapshots may be costly to make too frequently, so users may be reluctant to snapshot their progress too often. But since the snapshots may be needed for task movement, can we take advantage of that for fault tolerance? Again, the uninitiated minds are speaking here, so likely these are all solved questions.

4) Planetary-scale scheduling. We want to avoid (frequent) communication between GPUs over the network because of limited bandwidth. On a geo-scale, the bandwidth may become even more limited, and the “first-byte” latency even larger, further impacting performance. So it is likely not a good idea to schedule a task across a WAN (or maybe even across availability zones in the same region). However, moving the tasks on a planetary scale between regions may be something to consider if this is done infrequently and the remote regions have more of the desired resource available. This requires longer-term planning, though, as we would like to make sure that an expensive move can be offset by the task using as much of that resource as possible before being moved again.

5) User performance estimates. Gavel has a performance estimation component, but it can also take performance estimates from the users. It would be interesting to see how accurate these user-provided estimates are in the real world. Also, can Gavel adjust these user estimates based on its own measurements?

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the presentations. Please join the Slack group to get involved!