One Page Summary

One Page Summary: Flease – Lease Coordination without a Lock Server

This paper talks about a decentralized lease management solution. In the past, many lock/lease services have been centralized, placing a single authority to manage all locks in the system. Google’s Chubby, Apache ZooKeeper, etcd, and others rely on a centralized approach and backed by some flavor of a consensus algorithm for fault-tolerance. According to Flease authors, such centralized approach Flease Groupsmay not be ideal at all times and can create a bottlenecks when coordination is required only within small groups of nodes in the system. Distributed filesystems seem to be candidates for this sort of lock management, as a group of nodes tend to be responsible only for resources sharded to that group. Each such group acts independently and maintains locks for resources assigned to it, making a global lock not necessary.

The implementation of decentralized, sharded, lock management system is rather trivial. Since getting a lock/lease requires nodes to reach the consensus about the lease ownership and duration, a Paxos algorithm will suffice. In fact Flease uses a flavor of Paxos built with a distributed register. Right away we can see why Flease targets systems that are sharded into small non-overlapping groups. Running Paxos over too many nodes will degrade the performance.

Just like other lease system, Flease needs to have synchronized clocks with known max time skew uncertainty ε to control lease expiration. It also places some restrictions on minimum lease duration due to the network characteristics, i.e. lease duration has to be greater than two RTTs and greater than ε. Choosing the max lease duration is important for performance reasons.

Flease ThroughputFlease was evaluated against ZooKeeper, as both are implemented in Java and use the same network IO libraries.The figure shows throughput per (client) machine with Flease (straight line) and ZooKeeper. Flease uses small groups of 3 nodes, each running Paxos, and ZooKeeper runs on 3 nodes as well, with all clients connecting to it for lock management. As expected form this experiment, Flease has greater parallelism. When running 30 clients, Flease runs 10 Paxos machines, while ZooKeeper still operates a single one (3 nodes) with 30 clients connected. It is unclear how quickly the performance of Flease will degrade as the group size increases. In essence, similar if not better results could have been achieved by deploying separate ZooKeepers for each group to allow for the same level of parallelism.

The problem of lock management is important and Flease give a good example of application that can benefit from a less-centralized solution to locks. However, the approach in this paper is as trivial as deploying multiple lock management systems for each of the independent non-overlapping groups of nodes in the system. Flease performance will degrade if group size grows above 3 to 5 nodes. In addition, the algorithm is not suitable when group boundaries must be breached on occasion.

One Page Summary: “milliScope: a Fine-Grained Monitoring Framework for Performance Debugging of n-Tier Web Services”

Authors of the ICDCS2017 milliScope paper attack an interesting monitoring problem for distributed systems: detecting and determining a cause of short-lived events in the system. In particular, they address the issue of identifying very short bottlenecks (VSBs) in distributed web services. VSBs manifest themselves as performance degradation of a small number of requests, however they are intermittent, short-lived and hard to detect with tools that aggregate or sample performance data.

milliscope1MilliScope is a monitoring solution aimed exactly at detecting such short bottlenecks and finding their root-cause. For the detection part, milliScope relies on the Event mScopeMonitors. These monitors are present at every component of the web service and record the timestamps associated with each request entering and leaving a particular component. In total, 4 local timestamps are recorded per component, 2 on the forward pass of the request and 2 on the return. This classical request tracing approach captures causal information about request propagation through the components, and it is sufficient to pinpoint the exact request and the exact component experiencing a slow-down.

milliscope2In order to provide more information about the cause of the bottleneck, milliScope uses Resource mScopeMonitors. Resource monitors use existing performance monitoring tools to log the metrics, such as CPU utilization, memory usage, I/O usage, etc. MilliScope then transforms the performance logs to a common, structured format, adding some extra support information. The data from both the event and resource mScopeMonitors eventually ends up in a warehouse and can be queried by the users. Users can overlay the data from different monitors onto each other, allowing to better identify what component or components were causing the bottleneck and what was the root cause behind the slowdown. An example of such overlaying is shown in the figure on the left, where the queue lengths of different components are shown on the same graph.

Some Critical Questions

The paper claims that existing monitors have too much overhead and thus cannot capture all requests and find the bottlenecks, however event mScopeMonitor is a simple request tracer that is no different from many others, so it is not clear why it should perform better.

MilliScope collects lots of performance monitoring data in different logs and then brings the data to a centralized location for storage and processing. Authors never mentioned in the paper how they address clock uncertainty between all these different logs. Do they require clock sync, such as NTP? Even though the event monitors capture causal relationship within the same request, resource monitors seem to rely on physical time. The authors claim that milliScope can detect and help explain bottlenecks as short as 10 ms, but what will happen if the resource monitors are skewed more than 10 ms? How accurate is milliScope going to be? In fact, both case studies in the paper worked with the bottlenecks of hundreds of milliseconds or more.

And finally, what can we do once we detected a very short bottleneck? Since VSBs are so short-lived, there may not be any time to react to their presence. Maybe a next step would be to look for precursors to the bottlenecks, so we can rebalance the system or prepare it in some other way for an incoming performance hiccup?

One Page Summary: “Musketeer: all for one, one for all in data processing systems”.

Many distributed computation platforms and programming frameworks exist today, and new ones constantly popping out from the industry and academia.  Some platforms are domain specific, such as TensorFlow for machine learning. Others, like Hadoop and Naiad are more general, and this generality allows for sophisticated and specialized programming abstractions to be built on top.

Coupled vs DecoupledSo we naturally ask a question, which distributed computation platform is faster and more scalable? And what programming model or framework is better? Authors of the Musketeer tried to find the answer and concluded that there is no such thing as the perfect computation platform or perfect programming front-end, as they all perform better under different circumstances and workloads. This discovery led to the Musketeer prototype, which decouples the programming front-ends from their target platforms and connects all the front-ends to all the computation back-ends.

Musketeer schematics.Musketeer accomplish this task through translating the code created in every supported programming front-end to an intermediate representation (IR). This intermediate representation can later be translated into the commands of the computation platforms. Think of Java or Scala code translated to Java bytecode before being JIT compiled to native code as the program runs. Each IR is a data-flow DAG representing the progression of operations in the computation.

back-end detection
♣ indicates Musketeer’s pick

Musketeer runs the IR on the platform it chooses the most suitable for the job (Figure on the right). For example, some platforms may be better for some operations and not every platform supports all types of computations. Musketeer detects the patterns of operations in the IR through idiom recognition and will not allow IR containing certain patterns to run on the platform that does not support these patterns or performs badly.

Combining back-endsMusketeer can also split the entire computation into smaller jobs, each running on different platform. The partitioning is done by picking a lower cost back-end to run the partition.  Of course finding the partitions in the DAG is a complicated problem (NP-hard).Musketeer uses exhaustive search to find best partitions for smaller IRs, and heuristic based dynamic programming approach for larger DAGs when the cost of brute-force approach becomes prohibitive. Heuristic approach does not examine all possible partitions. Such partitioning allows achieving better performance than a homogeneous platform.

The IR is not optimized when it is generated at first, however Musketeer performs the optimizations before starting the computation. In particular, certain operators in the IR DAG will be merged together to reduce the number of jobs executed. The merging affects the platform choice, as some platforms support more complex jobs, while others need more steps to achieve the same result. Musketeer also optimizes for the IO by trying eliminate the duplicate scanning of a dataset. All the optimizations allow the code generated from the IR to have similar performance as the manually optimized code.

One Page Summary: “Slicer: Auto-Sharding for Datacenter Applications”

One of the questions engineers of large distributed system must answer is “where to compute”. This is a big and important question, as we do not want to send a request originating in the US to some server in Australia. It simply makes no sense to incur the communication overhead if there are resources available closer. But physical affinity is not the only requirement to answer the question. Just as we would not make an Olympic swimming competition in a desert simply because a desert is close, we would not send a Russian language speech recognition request to the servers with English language model. In other words, when deciding on the placement of a computation, we need to find the closest place with enough resources of the right type.

Slicer is a Google’s answer to this “where” problem. It continuously tries to find the best possible place to perform the computations in the presence of dynamically changing workload. Many applications within Google, such as speech recognition, various caches and DNS service use Slicer to partition and balance their workload among a set of tasks, or application servers (containers). Computation placement is not on the application critical path, and it is somewhat static: once it has been made, we can use it for some time until the conditions change. Think of finding a swimming pool for our Olympics competition ahead of the actual races and sticking with the same pool until the games are over.

Slicer sticks with the same pool of resources as well, as it directs similar requests to the same set of tasks. Each application using Slicer needs to produce a key associated with the request and Slicer hashes it into a 63-bit internal key, which is then used to decide on the placement of the computation. This makes it possible for requests with the same key to be processed by the same set of tasks, catering to locality of reference.

Clerk library provides a client interface to pull and learn which keys are assigned to which tasks. With Clerk, clients can learn the assignment and communicate directly with tasks. Google’s RPC service Stuby is also integrated with Slicer and can provide Clerk’s functionality, so many applications do not even need to use Clerk. Application servers use a similar library, called Slicelet, to learn which keys that particular task is expected to process, allowing tasks to adapt for the change in the requests they serve.

Slicer service is divided in two major parts: Assigners and Distributors. Assigners map keys to tasks, while Distributors retrieve these maps from Assigners and deliver them to the Clerks and Slicelets upon request. More than one Assigner exists globally, and any assigner can generate the key assignment for any application, however, in most cases only one assigner is considered to be preferred. Sometimes more than one Assigner can act as preferred for short duration, but Assigners eventually converge to the same assignment through the optimistically-consistent storage. Slicer also has a mode with strongly consistent assignment, however it has not been tested on production at the time of paper publication. Slicer also packs a variety of backup mechanisms to make sure clients can still make the requests despite Assigner or Distributor failures. In case of miss-assignment (~0.02% of requests), for example due to failed tasks, client will need to try the request again with the new assignment obtained from Slicer.

Assigners perform load balancing for the requests by changing how many tasks are assigned to each key. Balancing is done taking into account a usage rate of different keys. As the load changes, Slicer updates it’s mapping by taking some servers off the lesser used keys and putting them for the use by “heavy” keys. Load balancing results in lesser load on the “hottest” tasks of an application (median job’s hottest task is 63% less loaded), compared to static request distribution. At the same time, Slicer reacts to shifting workload patterns in the matter of minutes.

Slicer hottest task load.
Hottest task load under Static sharding and Slicer. Slicer gets 63% lower hottest task load for a median application.

Slicer seems to be playing a big role in deciding where to process requests at Google, at least for certain applications. It provides load balancing and ensures the requests get to servers or tasks that are both prepared to accept them and have capacity to do so.

One Page Summary: Incremental, Iterative Processing with Timely Dataflow

This paper describes Naiad distributed computation system. Naiad uses dataflow model to represent the computations, but it aims to be a general dataflow framework in contrast to other specialized approaches such as TensorFlow. Similarly to other dataflow systems, the computations are represented as graphs, where vertices represent data and operations and edges carry the data between nodes.

Naiad was designed as the generic framework to support iterative and incremental computations with the dataflow model. We can think of an iterative computation as some function Op is executed repeatedly. Such iteration function can be looped on its output until there are no changes between the input and the output and the function converges to a fixed point.

Incremental computations are a bit more general then the iterative. In incremental processing, we start with initial input A0 and produce some output B0. At some later point, we have a change δA1 to the original input A0, such that we can have new input A1 = A0 + δA1. Incremental model produces an incremental update to the output, so δB1 = Op(δA1) and B1 = δB1 + B0. Note that incremental model only needs to have previous state (i.e. A0 and B0) to compute the next state, however we can extend it to have all output differences:f1. Incremental computations can be adopted for iterative algorithms where each iteration produces the difference output and next iteration operates only on that difference and not the full input. However with basic incremental computation approach it becomes impossible to do iterative operations under the changing or streaming input, as now we need to keep track not only of the iteration number (and differences between iterations), but also on the version of the input (and differences of the input).

Differential computation model overcomes the limitation of basic Iterative and Incremental approaches, by keeping all the differences δB and δA, and not just the previous state. In addition, a two-component timestamp is now used, where one component keeps track of the input version and the second value is responsible for the iteration number. Such timestamping for differences complicates the computation, as the timestamps no longer have a total order. In other words, sometimes it is not possible to tell whether one timestamp happened before another. However, the new timestamp system has a partial order for which f2. And under this partial order it is still possible to sum the differences together:f3

With differential model, we can calculate not only the end result of an operation when the new input comes in, but also any intermediate δBt. A lot of the power of differential dataflow lies with the differential operator that must produces the differences that can be summed with the equation above.

The timestamp plays a crucial role in tracking execution progress. Naiad’s communication methods Send and OnRecv can be triggered multiple time for the same variable, making it necessary to have a mechanism capable of notifying other nodes when certain data has been sent in full.  This notification triggers when all messages at or before a particular timestamp have been sent. Upon receiving a notification on a node, OnNotify(t) method is called, allowing the algorithm to react. Dataflow model complicates the notification mechanism, since the timestamps no longer have a total order, however the partial-order we have established earlier along with some dataflow graph restrictions allow Naiad to keep the effective notification system that does not break its guarantees even under nested loops and continuous input updates.