One Page Summary

One Page Summary. Photons: Lambdas on a diet

Recently, to prepare for a class I am teaching this semester, I went through the “Photons: Lambdas on a diet” SoCC’20 paper by Vojislav Dukic, Rodrigo Bruno, Ankit Singla, and Gustavo Alonso. This is a very well-written paper with a ton of educational value for people like me who are only vaguely familiar with the serverless space!

The authors try to solve some deficiencies in the serverless runtime. See, each function invocation is isolated from all other functions by running in a separate container. Such an isolated approach has lots of benefits, ranging from security to resource isolation. But there are downsides as well: each invocation of the same function needs its own container to run, so if there are no available idle containers from previous runs, a new one needs to be created, leading to cold starts. A “cold” function takes more time to run, as it needs to be deployed in a new container and compiled/loaded up into the execution environment. In the case of many popular JIT-compiled languages (e.g., Java), this also means that it initially runs from byte-code in an interpreted mode, without a bunch of clever compile-time optimizations. The paper states another problem with having all function invocations require their own separate containers: resource wastage. In particular, if we run many invocations of the same function concurrently, we are likely to waste memory on loading up all the dependencies/libraries separately in each container. The authors also mention that some function invocations, for instance, functions for some ML or data processing workloads, may operate on the same initial dataset. Obviously, this dataset must be loaded into each function container separately.

The paper proposes a solution, called Photons, to address the cold-start issues and resource wastage in workloads with many concurrent invocations of the same functions. The concept of a Photon describes a function executed in a container shared with other function invocations of the same type. The premise here is to forgo some isolation and allow multiple invocations of the same function to share the container. As a result, the number of cold starts can be reduced, and resources, like RAM, can be better utilized by having libraries and common data loaded only once for many concurrent function calls.

The lack of isolation between the function invocations, however, creates a few problems. The paper solves some of them, but it also passes quite a few of these problems off to the function developers. One big problem passed to the developers is ensuring that different invocations of the same function consume a predictable amount of resources. This is needed for a couple of reasons. First, there are scheduling needs, as the system needs to predict machine and container resource usage to determine which containers can be expanded to take on more function calls or which machines can handle new containers. Second, the lack of container-level isolation means that a misbehaving function instance can grab too many resources and starve other concurrent functions in the same container.

Another big problem is memory isolation and memory sharing between concurrent function invocations. A significant part of the Photons platform deals with these problems. On memory isolation, things are easy when we only work with class variables. Each object created from the class will have a separate copy. However, some class variables may be static, and what is even worse, they can be mutable, allowing concurrent executions of the code to modify these static fields. Photons addresses this problem by transforming each static variable into a map, where the key is a photonID (i.e., a unique invocation id of a function). Naturally, all reads and writes to a static variable change to map gets and puts. There are a few more nuances with statics, and the paper covers them in greater detail. For sharing state, the Photons runtime maintains a KV-store exposed to all Photons/function invocations in the container. One major downside of having a KV-store is managing concurrent access to it, and the system leaves it up to the developers to implement coordination for this shared store. Aside from the shared KV-store, functions also share the file system for temporary storage.
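
To make the static-field transformation concrete, here is a minimal Python sketch of the idea. The real system rewrites Java bytecode and tags threads with photonIDs, so the class and names below (IsolatedStatic, handle_invocation) are purely illustrative:

```python
import threading

# Hypothetical per-invocation context; the real system tags Java threads with a
# photonID and rewrites bytecode, so everything below is only an illustration.
_current_photon = threading.local()

class IsolatedStatic:
    """A 'static' field turned into a map keyed by the invocation's photonID."""
    def __init__(self, default_factory):
        self._values = {}                  # photonID -> private copy of the static
        self._default_factory = default_factory
        self._lock = threading.Lock()

    def get(self):
        pid = _current_photon.id           # reads become map lookups
        with self._lock:
            if pid not in self._values:
                self._values[pid] = self._default_factory()
            return self._values[pid]

    def set(self, value):
        pid = _current_photon.id           # writes become map puts
        with self._lock:
            self._values[pid] = value

# Example: what used to be a mutable static counter in the function's code.
request_counter = IsolatedStatic(lambda: 0)

def handle_invocation(photon_id, payload):
    _current_photon.id = photon_id         # the runtime assigns the photonID
    request_counter.set(request_counter.get() + 1)   # isolated per invocation
    return f"photon {photon_id} handled {payload}"
```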

One Page Summary. Gryff: Unifying Consensus and Shared Registers

This paper by Matthew Burke, Audrey Cheng, and Wyatt Lloyd appeared in NSDI 2020 and explores an interesting idea of a hybrid replication protocol. The premise is very simple – we can take one protocol that solves a part of the problem well, and marry it with another protocol that excels at the second half of the problem. This paper tackles replication in geo-distributed strongly consistent storage systems. The authors argue that consensus, when used in storage systems with predominantly read and write operations, is inefficient and causes high tail latency.

The system presented in the paper, called Gryff, takes advantage of the predominantly read/write workloads in storage systems and exposes these two APIs via the multi-writer atomic storage ABD protocol. ABD operates in two phases for both reads and writes. On a write, ABD’s coordinator retrieves the latest version of the register from a quorum of nodes and writes the new value back with a version higher than any it has seen. On a read, ABD’s coordinator again retrieves the register, writes the highest version back to the cluster to ensure future reads do not see any previous versions, and only then returns to the client. The write-back stage, however, can be skipped if a quorum of nodes agrees on the same version/value of the register, allowing for single-RTT reads in the happy case.
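
For reference, here is a toy, self-contained sketch of the ABD message pattern described above. The in-memory Replica class and quorum() helper are mine, not Gryff's; a real deployment sends RPCs and waits for any majority of replies:

```python
# Toy in-memory ABD register: replicas are local objects and "RPCs" are plain
# function calls. Only the message pattern is illustrated here.

class Replica:
    def __init__(self):
        self.ts, self.val = 0, None          # highest timestamp and its value

    def read(self):
        return (self.ts, self.val)

    def write(self, ts, val):
        if ts > self.ts:                     # keep only the newest version
            self.ts, self.val = ts, val

def quorum(replies, n):
    return replies[: n // 2 + 1]             # stand-in for "wait for a majority"

def abd_write(replicas, value):
    # Phase 1: learn the highest timestamp a quorum has seen.
    seen = quorum([r.read() for r in replicas], len(replicas))
    new_ts = max(ts for ts, _ in seen) + 1
    # Phase 2: store the value under a higher timestamp.
    for r in replicas:
        r.write(new_ts, value)

def abd_read(replicas):
    seen = quorum([r.read() for r in replicas], len(replicas))
    ts, val = max(seen, key=lambda p: p[0])  # latest (timestamp, value) pair
    if all(reply == (ts, val) for reply in seen):
        return val                           # fast path: the quorum already agrees
    for r in replicas:                       # write-back so later reads cannot regress
        r.write(ts, val)
    return val

replicas = [Replica() for _ in range(3)]
abd_write(replicas, "x=1")
print(abd_read(replicas))                    # -> x=1, via the fast path here
```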

Unfortunately, ABD, while providing linearizability, is not capable of supporting more sophisticated APIs. Read-modify-write (RMW) is a common pattern in many storage systems for implementing transaction-like conditional updates of data. To support RMW, Gryff resorts back to consensus, and in particular to the Egalitarian Paxos (EPaxos) protocol. The choice of EPaxos allows any node in the cluster to act as the coordinator, so it does not restrict writes to a single node like many other protocols do. The problem with this hybrid approach is then the ordering of operations completed with the ABD protocol and RMW operations running under EPaxos. Since the EPaxos side of Gryff works only with RMWs, it can only order these operations with respect to other RMW operations, but what we need is a linearizable ordering of RMWs with normal writes and/or reads. To keep the ordering, Gryff uses tuples of ABD’s logical timestamp, process ID, and the RMW logical counter, called carstamps. Carstamps connect the ABD part of the system with EPaxos: only ABD can update ABD’s logical clock, and only EPaxos updates the RMW counter.
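
Carstamps can be pictured as small comparable tuples. The sketch below is my own illustration; the field names and the exact lexicographic tie-breaking are guesses, and the paper defines the precise comparison rules:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Carstamp:
    # Field names are illustrative: ABD's logical timestamp + process id,
    # extended with an RMW counter that only the EPaxos path increments.
    abd_ts: int
    process_id: int
    rmw_counter: int

    def __lt__(self, other):
        # A write with a higher ABD time supersedes any other write or RMW;
        # RMWs only need ordering against the same ABD base. The tie-breaking
        # order used here is a plausible guess, not the paper's exact rule.
        return (self.abd_ts, self.process_id, self.rmw_counter) < \
               (other.abd_ts, other.process_id, other.rmw_counter)

w = Carstamp(abd_ts=5, process_id=1, rmw_counter=0)    # a plain ABD write
rmw = Carstamp(abd_ts=5, process_id=1, rmw_counter=1)  # RMW applied to that base
print(w < rmw, rmw < Carstamp(6, 2, 0))                # True True
```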

When we consider the interleaving of writes and RMWs, a write with a higher ABD logical time supersedes any other write or RMW. This means that we do not actually need to order all RMWs with respect to each other, but only order RMWs that have the same base, or ABD logical time. EPaxos was modified to allow such partial ordering of commands belonging to different bases, essentially making the protocol maintain separate dependency graphs for RMWs applied to different ABD states. Another change to EPaxos is the cluster-execute requirement: a quorum of nodes needs to apply the change before it can be returned to the client, making the change visible to subsequent ABD read operations.

So, how does Gryff do with regards to performance? Based on the authors’ evaluation, it is doing very well in reducing the (tail) latency of reads. However, I have to point out that the comparison with Multi-Paxos was flawed, at least to some extent. The authors always consider running a full Paxos phase for reads, and do not consider the possibility of reading from a lease-protected leader, which eliminates 1 RTT from the Paxos read. This would make Paxos's minimum latency smaller than Gryff’s, while also dramatically reducing the tail latency. Gryff also struggles with write performance, because writes always take 2 RTTs in the ABD algorithm. As far as scalability, the authors admit that it cannot push as many requests per second as EPaxos, even in its most favorable configuration of just 3 nodes.

Can Paxos do better? We believe that our PQR optimization, when applied in a WAN, will cut most reads down to 1 quorum RTT, similar to Gryff. PQR may still occasionally retry the reads if the keyspace is small, but this problem also applies to Gryff when the cluster is larger than 3 nodes.

What about Cassandra? Cassandra uses a protocol similar to ABD for its replication, and it also incorporates Paxos to perform compare-and-set transactions, which are one case of an RMW operation, so in a sense Gryff appears to be very similar to what Cassandra has been doing for years.

 

One Page Summary. Aegean: Replication beyond the client-server model

Nested microservices. One service may act as a client for another.

This paper builds on a key observation about the operation of complex distributed applications. Namely, the microservice style of application rarely follows a simple client-server architecture, where a client makes a request and the server (or servers) responds to it. Instead, many applications often use a nested approach, where clients communicate with some service, and the service itself acts as a client for one or more other nested services. This nesting often presents challenges for traditional replication protocols, like primary-backup or Paxos-based RSM replication. For instance, when a service is replicated for durability, it becomes harder to preserve the correctness of nested requests: in case of a service failure, the information on whether the nested request was issued or returned may be lost without additional safeguards, making it difficult to track whether nested requests need to be reissued or otherwise recovered. The authors also claim that existing approaches, like Paxos, suffer from a performance penalty when dealing with nested calls, since the replicated service needs to block and wait for nested calls to resolve. I personally do not buy the latter issue too much, as many existing replication solutions, even Paxos-based ones, try to take advantage of parallelism whenever possible, either by using a pipelined approach or by concurrently operating on independent requests or data in different conflict domains.

A shim layer in front of service B collects a majority of (duplicate) requests coming from A before passing these requests to B.

To counter the problems with nested requests and responses, Aegean proposes to use a shim layer sitting next to each replicated service. When one service makes a nested request to another service, it talks to the shim layer instead of the nested service directly. The shim layer runs at each replica of the replicated service and collects the requests coming from the caller service (it assumes each replica of the caller will send a request). The shim passes the request to the nested service replica only upon collecting a majority of requests from the caller, ensuring that the caller has sufficiently replicated the nested request. The replica can then process/replicate the request. Similarly, when the nested service generates the response, the shim layer broadcasts the response to all replicas of the caller service, keeping track of which caller replicas received the responses and resending them as needed. Additionally, to ensure response durability, every replica of the caller service sends an ack to the other replicas, and only acts on the response from a nested call when it has received a majority of such acks (including its own). This ensures that the responses to a nested call have been logged by at least a majority of replicas in the service. All these shim layers and the response durability machinery create a lot more message exchange in the system, which will undoubtedly impact performance.
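
A toy sketch of the shim's majority-collection logic; the class name and structure are mine, and the real shim also handles acks, retransmissions, and response dissemination:

```python
from collections import defaultdict

class NestedRequestShim:
    """Toy shim in front of a nested-service replica: it passes a nested
    request to the local replica only after a majority of the caller's
    replicas have sent an identical copy of that request."""

    def __init__(self, caller_replicas, deliver_to_replica):
        self.majority = caller_replicas // 2 + 1
        self.deliver = deliver_to_replica          # callback into service B
        self.seen = defaultdict(set)               # request_id -> caller replica ids
        self.delivered = set()

    def on_request(self, request_id, caller_replica_id, payload):
        self.seen[request_id].add(caller_replica_id)
        if (len(self.seen[request_id]) >= self.majority
                and request_id not in self.delivered):
            self.delivered.add(request_id)
            # The caller has sufficiently replicated the nested request;
            # it is now safe to let the nested service execute it.
            self.deliver(request_id, payload)

shim = NestedRequestShim(caller_replicas=3,
                         deliver_to_replica=lambda rid, p: print("deliver", rid, p))
shim.on_request("req-1", caller_replica_id=0, payload="charge $5")
shim.on_request("req-1", caller_replica_id=1, payload="charge $5")  # majority reached
```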

Another aspect of the paper deals with speculative execution of some requests, as it also introduces problems in the context of nested microservices: speculative state may leak and become visible to other services in the nesting chain. Aegean solves the problem by placing barriers before the speculative state may become visible, and resolves mis-speculation by resetting and replaying if replicas arrive at a different state.

Aegean Performance Evaluation

To solve the performance issues with sequential Paxos, Aegean proposes a pipelined approach, which is definitely not new. For example, our Paxi from a few years back is a pipelined implementation of many consensus protocols. The authors claim that Aegean has decent performance, although I find the evaluation a bit lacking. The main comparison is against sequential (not pipelined) Paxos, and Aegean does well in this setting. However, even the authors admit that a large portion of the difference is due to the pipelining, raising the question of whether the performance comparison is fair in the first place.

Overall, I enjoyed the discussion of the problems caused by replication in a nested microservice architecture, but I am not sure I am too excited about the solution. The solution is a solid one for sure, but it appears very piecewise, with every piece specifically targeting a sub-problem, so it lacks a certain elegance (which is not a bad thing at all for a solid, practical approach to a problem). The evaluation is the part that raises the most questions for me, ranging from claims that non-byzantine-tolerant Paxos and PBFT have similar throughput, to picking inherently weak baselines for evaluation, like non-pipelined Paxos.

One Page Summary: Ring Paxos

This paper (Ring Paxos: A high-throughput atomic broadcast protocol) has been out for quite some time, but it addresses a problem still relevant in many distributed consensus protocols. Ring Paxos aims to reduce the communication load in a Paxos cluster and provide better scalability. As we have shown in our SIGMOD 2019 paper, communication is a major limiting factor in the scalability of Paxos-like protocols.

Ring Paxos reduces communication overheads with a twofold approach. First, it uses ip-multicast to substitute direct node-to-node communication wherever possible with a broadcast type of communication. Second, Ring Paxos overlays a ring topology over (parts of) the Paxos cluster to control the message flow and prevent communication bottlenecks from forming.

Ring Paxos operates very similarly to regular Paxos, with the differences being mainly in the communication part of the protocol. One node acts as a designated coordinator that receives proposals from the clients. However, the coordinator must first confirm itself as a valid coordinator using phase-1 of Paxos. To that end, the coordinator uses ip-multicast to send a message with some ballot/round number to all acceptors. This message also contains the ring configuration, including the node designated as the beginning of the ring. The acceptors receive the message, compare the ballot with the highest one they have seen, and only accept the node as the new coordinator if the received ballot is the highest so far. Each acceptor replies independently to the coordinator (not shown in the figure), and from these replies the coordinator learns whether it has succeeded. Additionally, the coordinator also learns of any unfinished/uncommitted values that must be recovered.

Ring Paxos Protocol. Phase-2

Upon successfully getting a quorum of confirmations, the coordinator moves on to phase-2 of Paxos, illustrated in the figure on the right. During this phase, the coordinator replicates the commands/log entries to the acceptors. Similarly to phase-1, the coordinator uses ip-multicast to send this message to the acceptors (message #2 in the figure). The acceptors (and learners) get the value/command to be replicated, but that value is not committed just yet. Along with the value, acceptors also receive the value-id (c-vid), a unique identifier for the value, and they set their working value-id (v-vid) to c-vid. The acceptors do not reply individually to the coordinator. Instead, the first acceptor in the ring sends a reply containing the c-vid to its successor (message #3 in the figure). An acceptor receiving such a reply compares its v-vid with the c-vid from the reply message. The two value-ids match when the acceptor is not aware of any other value/coordinator acting concurrently, and in this case it forwards the message further down the chain. This chain forwarding in the ring continues until the reply reaches the coordinator (message #4), which sits at the tail of the chain. The protocol terminates the message propagation across the ring when an acceptor’s v-vid and the c-vid from the reply message differ, leaving the coordinator to wait for a timeout and retry the protocol from the beginning with a higher ballot. Node failures produce a similar outcome, since a failure completely halts message propagation across the ring. As a result, the new phase-1 of Paxos must include a different ring configuration to try to avoid the failed node. When the coordinator receives the reply from the ring/chain, it sends the commit message to all acceptors and learners with ip-multicast (message #5). The protocol may be extended to run multiple slots in parallel by ensuring the c-vid and v-vid comparisons happen within the same slot.
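
A simplified sketch of an acceptor's role in this phase-2 ring forwarding (ip-multicast delivery, timeouts, and parallel slots are omitted; class and callback names are placeholders):

```python
class RingAcceptor:
    """Toy acceptor for Ring Paxos phase-2; forward() sends to the ring
    successor and deliver() stands in for applying/learning a value."""

    def __init__(self, forward, deliver):
        self.v_vid = None                # value-id the acceptor is working on
        self.value = None
        self.forward = forward
        self.deliver = deliver

    def on_multicast_propose(self, c_vid, value):
        # Message #2: the coordinator ip-multicasts the value and its value-id.
        self.v_vid, self.value = c_vid, value     # accepted, not yet committed

    def on_ring_reply(self, c_vid):
        # Messages #3/#4: a reply travels along the ring toward the coordinator.
        if self.v_vid == c_vid:
            self.forward(c_vid)          # no conflict seen: pass the reply on
        # else: drop it; the coordinator times out and retries phase-1
        #       with a higher ballot (and possibly a new ring).

    def on_multicast_commit(self, c_vid):
        # Message #5: the coordinator ip-multicasts the commit decision.
        if self.v_vid == c_vid:
            self.deliver(self.value)

a = RingAcceptor(forward=lambda vid: print("forward", vid),
                 deliver=lambda v: print("commit", v))
a.on_multicast_propose("vid-7", "put x=3")
a.on_ring_reply("vid-7")
a.on_multicast_commit("vid-7")
```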

Ring Paxos Performance.

The performance of Ring Paxos is better than that of standard Paxos implementations, such as Libpaxos (which uses ip-multicast) and Paxos4sb (unicast). It appears that Ring Paxos is capable of saturating most of the network bandwidth, with only the LCR protocol pushing a bit more throughput.

One Page Summary: “PaxosStore: High-availability Storage Made Practical in WeChat”

The PaxosStore paper, published in VLDB 2017, describes the large-scale, multi-datacenter storage system used in WeChat. As the name may suggest, it uses Paxos to provide storage consistency. The system claims to provide storage for many components of the WeChat application, with 1.5TB of traffic per day and tens of thousands of queries per second during peak hours.

PaxosStore relies on the Paxos protocol for consistency and replication within tight geographical regions. The system was designed with a great separation of concerns in mind. At a high level, it has three distinct layers interacting with each other: the API layer, the consensus layer, and storage. Separating these out allowed PaxosStore to provide the most suitable APIs and storage for different tasks and applications, while still having the same Paxos-backed consistency and replication.

In the Paxos-driven consensus layer, the system uses a per-object log to keep track of values and Paxos-related metadata, such as promise (epoch) and proposal (slot) numbers. The log’s implementation, however, seems to be somewhat decoupled from the core Paxos protocol. The Paxos implementation is leaderless, meaning there is no single dedicated leader for each object, and every node can perform writes on any of the objects in the cluster by running the prepare and accept phases of Paxos. Naturally, the system tries to perform (most) writes in one round trip instead of two by assuming some write locality. After the first successful write, a node can issue more writes with increasing proposal (slot) numbers. If some other node performs a write, it needs to have a higher ballot, preventing the old master from doing quick writes. This is a rather common approach, used in many Paxos variants.

The lack of a single stable leader complicates reads, since there is no authority that has the most up-to-date state of each object. One solution is to use the Paxos protocol directly for reading, however this disrupts the locality of write operations by hijacking the ballot to perform a read. Instead, PaxosStore resorts to reading directly from the replicas by finding the most recent version of the data and making sure a majority of replicas have that version. This works well for read-heavy workloads, but in some high-contention (or failure?) cases the most recent version may not be present on a majority of replicas, and the system falls back to running Paxos for reading.
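
The read path described above might look roughly like the following sketch, under my own naming; the real system operates per object against its PaxosLog, and run_paxos_read() is just a stand-in for the slow path:

```python
# Illustrative read path: try a direct quorum read first, and fall back to a
# full Paxos round only if no single version is held by a majority.

def read_object(object_id, replicas, run_paxos_read):
    # Each replica reports its (version, value) for the object.
    versions = [r[object_id] for r in replicas]
    latest = max(versions, key=lambda v: v[0])
    holders = sum(1 for v in versions if v == latest)
    if holders >= len(replicas) // 2 + 1:
        return latest[1]          # fast path: latest version is majority-held
    # Slow path: contention or failures; settle the value via Paxos at the
    # cost of disturbing the writer's ballot/locality.
    return run_paxos_read(object_id)

replicas = [{"k": (3, "v3")}, {"k": (3, "v3")}, {"k": (2, "v2")}]
print(read_object("k", replicas, run_paxos_read=lambda oid: "via-paxos"))  # -> v3
```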

PaxosStore runs in multiple datacenters, but it is not a full-fledged geo-replicated system, as it only replicates between datacenters located in the same geographical area. The paper is not clear on how data gets assigned to regions and whether objects can migrate between regions in any way. Within each datacenter the system organizes nodes into mini-clusters, with each mini-cluster acting as a Paxos follower. When data is replicated between mini-clusters, only one (or some?) of the nodes in each mini-cluster holds the data. This design aims to improve fault tolerance: with a 2-node mini-cluster, the failure of 1 node does not result in the failure of the entire mini-cluster Paxos follower.

The paper somewhat lacks in its evaluation, but PaxosStore seems to handle its goal of multi-datacenter, same-region replication fairly well, achieving sub-10 ms writes.

This paper seems like a good solution for a reliable and somewhat localized data store. The authors do not address data sharding and migration between regions and focus only on intra-region replication to multiple datacenters, which makes me think PaxosStore is not really a “global,” geo-replicated database. The fault tolerance is backed by Paxos, mini-clusters, and the usage of the PaxosLog for data recovery. The evaluation could have been more complete if the authors showed the scalability limits of their system and provided some details about throughput and the datacenter-locality of the workload in the latency experiments.

Here is a one-page pdf of this summary.

One Page Summary: Flease – Lease Coordination without a Lock Server

This paper talks about a decentralized lease management solution. In the past, many lock/lease services have been centralized, placing a single authority in charge of all locks in the system. Google’s Chubby, Apache ZooKeeper, etcd, and others rely on a centralized approach backed by some flavor of a consensus algorithm for fault tolerance. According to the Flease authors, such a centralized approach may not be ideal at all times and can create bottlenecks when coordination is required only within small groups of nodes in the system. Distributed filesystems seem to be good candidates for this sort of lock management, as a group of nodes tends to be responsible only for the resources sharded to that group. Each such group acts independently and maintains locks for the resources assigned to it, making a global lock service unnecessary.

The implementation of a decentralized, sharded lock management system is rather trivial. Since getting a lock/lease requires nodes to reach consensus about the lease ownership and duration, a Paxos algorithm will suffice. In fact, Flease uses a flavor of Paxos built with a distributed register. Right away we can see why Flease targets systems that are sharded into small non-overlapping groups: running Paxos over too many nodes will degrade the performance.

Just like other lease systems, Flease needs synchronized clocks with a known max time skew uncertainty ε to control lease expiration. It also places some restrictions on the minimum lease duration due to the network characteristics, i.e., the lease duration has to be greater than two RTTs and greater than ε. Choosing the max lease duration is important for performance reasons.
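
The clock-skew constraint boils down to a conservative validity check, as in the sketch below; the constants and function names are my own illustration of the rule, not Flease's code:

```python
import time

EPSILON = 0.05          # assumed max clock skew between nodes, in seconds
MAX_RTT = 0.02          # assumed round-trip time bound, in seconds

def lease_still_valid(lease_expiry, now=None):
    """A holder should treat its lease as expired EPSILON early, so that a
    node with a fast clock never acts on a lease others consider dead."""
    now = time.time() if now is None else now
    return now + EPSILON < lease_expiry

def minimum_lease_duration():
    # Lower bound only: the lease must outlive the coordination cost, i.e.,
    # be longer than two RTTs and longer than the skew bound EPSILON.
    return max(2 * MAX_RTT, EPSILON)

expiry = time.time() + 1.0
print(lease_still_valid(expiry), minimum_lease_duration())
```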

Flease was evaluated against ZooKeeper, as both are implemented in Java and use the same network IO libraries. The figure shows throughput per (client) machine with Flease (straight line) and ZooKeeper. Flease uses small groups of 3 nodes, each running Paxos, and ZooKeeper runs on 3 nodes as well, with all clients connecting to it for lock management. As expected from this experiment, Flease has greater parallelism. When running 30 clients, Flease runs 10 independent Paxos instances (one per group), while ZooKeeper still operates a single one (3 nodes) with 30 clients connected. It is unclear how quickly the performance of Flease will degrade as the group size increases. In essence, similar if not better results could have been achieved by deploying a separate ZooKeeper for each group to allow for the same level of parallelism.

The problem of lock management is important, and Flease gives a good example of an application that can benefit from a less-centralized solution to locks. However, the approach in this paper is as trivial as deploying a separate lock management system for each of the independent, non-overlapping groups of nodes in the system. Flease's performance will degrade if the group size grows above 3 to 5 nodes. In addition, the algorithm is not suitable when group boundaries must be breached on occasion.

One Page Summary: “milliScope: a Fine-Grained Monitoring Framework for Performance Debugging of n-Tier Web Services”

The authors of the ICDCS 2017 milliScope paper attack an interesting monitoring problem for distributed systems: detecting and determining the cause of short-lived events in the system. In particular, they address the issue of identifying very short bottlenecks (VSBs) in distributed web services. VSBs manifest themselves as performance degradation of a small number of requests; however, they are intermittent, short-lived, and hard to detect with tools that aggregate or sample performance data.

MilliScope is a monitoring solution aimed exactly at detecting such short bottlenecks and finding their root cause. For the detection part, milliScope relies on the Event mScopeMonitors. These monitors are present at every component of the web service and record the timestamps associated with each request entering and leaving that component. In total, 4 local timestamps are recorded per component, 2 on the forward pass of the request and 2 on the return. This classical request-tracing approach captures causal information about request propagation through the components, and it is sufficient to pinpoint the exact request and the exact component experiencing a slowdown.

In order to provide more information about the cause of the bottleneck, milliScope uses Resource mScopeMonitors. Resource monitors use existing performance monitoring tools to log metrics such as CPU utilization, memory usage, I/O usage, etc. MilliScope then transforms the performance logs into a common, structured format, adding some extra supporting information. The data from both the event and resource mScopeMonitors eventually ends up in a warehouse and can be queried by the users. Users can overlay the data from different monitors onto each other, making it easier to identify which component or components were causing the bottleneck and what the root cause behind the slowdown was. An example of such overlaying is shown in the figure on the left, where the queue lengths of different components are shown on the same graph.

Some Critical Questions

The paper claims that existing monitors have too much overhead and thus cannot capture all requests and find the bottlenecks; however, the event mScopeMonitor is a simple request tracer that is no different from many others, so it is not clear why it should perform better.

MilliScope collects lots of performance monitoring data in different logs and then brings the data to a centralized location for storage and processing. The authors never mention in the paper how they address clock uncertainty between all these different logs. Do they require clock sync, such as NTP? Even though the event monitors capture the causal relationship within the same request, resource monitors seem to rely on physical time. The authors claim that milliScope can detect and help explain bottlenecks as short as 10 ms, but what will happen if the resource monitors are skewed by more than 10 ms? How accurate is milliScope going to be? In fact, both case studies in the paper worked with bottlenecks of hundreds of milliseconds or more.

And finally, what can we do once we have detected a very short bottleneck? Since VSBs are so short-lived, there may not be any time to react to their presence. Maybe a next step would be to look for precursors to the bottlenecks, so we can rebalance the system or prepare it in some other way for an incoming performance hiccup?

One Page Summary: “Musketeer: all for one, one for all in data processing systems”.

Many distributed computation platforms and programming frameworks exist today, and new ones are constantly popping up from industry and academia. Some platforms are domain-specific, such as TensorFlow for machine learning. Others, like Hadoop and Naiad, are more general, and this generality allows sophisticated and specialized programming abstractions to be built on top.

So we naturally ask a question: which distributed computation platform is faster and more scalable? And what programming model or framework is better? The authors of Musketeer tried to find the answer and concluded that there is no such thing as the perfect computation platform or perfect programming front-end, as they all perform better under different circumstances and workloads. This discovery led to the Musketeer prototype, which decouples the programming front-ends from their target platforms and connects all the front-ends to all the computation back-ends.

Musketeer accomplishes this task by translating the code created in every supported programming front-end to an intermediate representation (IR). This intermediate representation can later be translated into the commands of the computation platforms. Think of Java or Scala code translated to Java bytecode before being JIT-compiled to native code as the program runs. Each IR is a data-flow DAG representing the progression of operations in the computation.

Back-end detection (♣ indicates Musketeer’s pick)

Musketeer runs the IR on the platform it deems the most suitable for the job (figure on the right). For example, some platforms may be better at certain operations, and not every platform supports all types of computations. Musketeer detects the patterns of operations in the IR through idiom recognition and will not allow an IR containing certain patterns to run on a platform that does not support these patterns or performs badly on them.

Musketeer can also split the entire computation into smaller jobs, each running on a different platform. The partitioning is done by picking a lower-cost back-end to run each partition. Of course, finding the partitions in the DAG is a complicated problem (NP-hard). Musketeer uses exhaustive search to find the best partitions for smaller IRs, and a heuristic-based dynamic programming approach for larger DAGs when the cost of the brute-force approach becomes prohibitive. The heuristic approach does not examine all possible partitions. Such partitioning allows achieving better performance than running on a single homogeneous platform.
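
Conceptually, the partitioning step is a cost minimization over ways of assigning IR operators to back-ends. The toy exhaustive version below uses a made-up cost model and ignores DAG structure, so it only illustrates the brute-force flavor of the search, not Musketeer's actual cost functions or heuristics:

```python
from itertools import product

def best_assignment(ir_operators, backends, estimate_cost):
    """Brute-force search over back-end assignments for a tiny IR."""
    best, best_cost = None, float("inf")
    for assignment in product(backends, repeat=len(ir_operators)):
        cost = estimate_cost(ir_operators, assignment)
        if cost < best_cost:
            best, best_cost = assignment, cost
    return dict(zip(ir_operators, best)), best_cost

# Made-up cost model: one back-end is cheaper for scans, another for joins,
# and switching back-ends mid-pipeline adds a data-transfer penalty.
def toy_cost(ops, assignment):
    per_op = {("scan", "Hadoop"): 1, ("scan", "Spark"): 2,
              ("join", "Hadoop"): 5, ("join", "Spark"): 2}
    cost = sum(per_op[(op, be)] for op, be in zip(ops, assignment))
    cost += sum(2 for a, b in zip(assignment, assignment[1:]) if a != b)
    return cost

print(best_assignment(("scan", "join"), ("Hadoop", "Spark"), toy_cost))
```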

The IR is not optimized when it is first generated; however, Musketeer performs optimizations before starting the computation. In particular, certain operators in the IR DAG are merged together to reduce the number of jobs executed. The merging affects the platform choice, as some platforms support more complex jobs, while others need more steps to achieve the same result. Musketeer also optimizes for IO by trying to eliminate duplicate scanning of a dataset. All the optimizations allow the code generated from the IR to have performance similar to manually optimized code.

One Page Summary: “Slicer: Auto-Sharding for Datacenter Applications”

One of the questions engineers of a large distributed system must answer is “where to compute”. This is a big and important question, as we do not want to send a request originating in the US to some server in Australia. It simply makes no sense to incur the communication overhead if there are resources available closer. But physical affinity is not the only requirement for answering the question. Just as we would not hold an Olympic swimming competition in a desert simply because the desert is close, we would not send a Russian-language speech recognition request to servers with an English language model. In other words, when deciding on the placement of a computation, we need to find the closest place with enough resources of the right type.

Slicer is Google’s answer to this “where” problem. It continuously tries to find the best possible place to perform the computations in the presence of a dynamically changing workload. Many applications within Google, such as speech recognition, various caches, and the DNS service, use Slicer to partition and balance their workload among a set of tasks, or application servers (containers). Computation placement is not on the application's critical path, and it is somewhat static: once a decision has been made, we can use it for some time until the conditions change. Think of finding a swimming pool for our Olympic competition ahead of the actual races and sticking with the same pool until the games are over.

Slicer sticks with the same pool of resources as well, as it directs similar requests to the same set of tasks. Each application using Slicer needs to produce a key associated with the request, and Slicer hashes it into a 63-bit internal key, which is then used to decide on the placement of the computation. This makes it possible for requests with the same key to be processed by the same set of tasks, catering to locality of reference.
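
On the client side, the mechanics reduce to hashing the application key and looking up the covering range in the current assignment. The sketch below is illustrative only; the hash function and data structures are not Slicer's internals:

```python
import bisect
import hashlib

def slicer_key(app_key: str) -> int:
    # Illustrative: derive a 63-bit key from the application-supplied key.
    digest = hashlib.sha256(app_key.encode()).digest()
    return int.from_bytes(digest[:8], "big") >> 1   # keep 63 bits

class Assignment:
    """Maps contiguous ranges of the 63-bit key space to sets of tasks,
    the way a client-side cache of a pulled assignment might."""
    def __init__(self, range_starts, task_sets):
        self.starts = range_starts      # sorted lower bounds of key ranges
        self.tasks = task_sets          # task set covering each range

    def tasks_for(self, app_key):
        k = slicer_key(app_key)
        i = bisect.bisect_right(self.starts, k) - 1
        return self.tasks[i]

# Example: two ranges split at the midpoint of the 63-bit key space.
assignment = Assignment([0, 2**62], [{"task-1", "task-2"}, {"task-3"}])
print(assignment.tasks_for("speech-request-ru"))
```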

The Clerk library provides a client interface to pull and learn which keys are assigned to which tasks. With Clerk, clients can learn the assignment and communicate directly with the tasks. Google’s RPC service Stubby is also integrated with Slicer and can provide Clerk’s functionality, so many applications do not even need to use Clerk. Application servers use a similar library, called Slicelet, to learn which keys a particular task is expected to process, allowing the tasks to adapt to changes in the requests they serve.

The Slicer service is divided into two major parts: Assigners and Distributors. Assigners map keys to tasks, while Distributors retrieve these maps from the Assigners and deliver them to the Clerks and Slicelets upon request. More than one Assigner exists globally, and any Assigner can generate the key assignment for any application; however, in most cases only one Assigner is considered to be preferred. Sometimes more than one Assigner can act as preferred for a short duration, but Assigners eventually converge to the same assignment through an optimistically-consistent storage. Slicer also has a mode with strongly consistent assignment; however, it had not been tested in production at the time of the paper's publication. Slicer also packs a variety of backup mechanisms to make sure clients can still make requests despite Assigner or Distributor failures. In case of mis-assignment (~0.02% of requests), for example due to failed tasks, the client will need to retry the request with a new assignment obtained from Slicer.

Assigners perform load balancing for the requests by changing how many tasks are assigned to each key. Balancing takes into account the usage rate of different keys. As the load changes, Slicer updates its mapping by taking some servers away from lesser-used keys and assigning them to “heavy” keys. Load balancing results in a smaller load on the “hottest” tasks of an application (the median job’s hottest task is 63% less loaded) compared to static request distribution. At the same time, Slicer reacts to shifting workload patterns in a matter of minutes.
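
The balancing act can be caricatured as handing out task shares in proportion to key load. The toy rebalancer below is my own illustration and ignores churn limits and capacity constraints that a real Assigner must respect:

```python
def rebalance(key_load, total_tasks):
    """Toy rebalancer: give each key a task share proportional to its observed
    load, with at least one task per key. Purely illustrative."""
    total_load = sum(key_load.values()) or 1
    return {key: max(1, round(total_tasks * load / total_load))
            for key, load in key_load.items()}

# A "hot" key ends up with most of the tasks; colder keys keep a minimum of one.
print(rebalance({"hot": 80, "warm": 15, "cold": 5}, total_tasks=10))
```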

Hottest task load under static sharding and Slicer. Slicer gets 63% lower hottest-task load for a median application.

Slicer seems to be playing a big role in deciding where to process requests at Google, at least for certain applications. It provides load balancing and ensures the requests get to servers or tasks that are both prepared to accept them and have capacity to do so.

One Page Summary: Incremental, Iterative Processing with Timely Dataflow

This paper describes the Naiad distributed computation system. Naiad uses a dataflow model to represent the computations, but it aims to be a general dataflow framework, in contrast to more specialized approaches such as TensorFlow. Similarly to other dataflow systems, the computations are represented as graphs, where vertices represent data and operations, and edges carry the data between vertices.

Naiad was designed as a generic framework to support iterative and incremental computations with the dataflow model. We can think of an iterative computation as some function Op executed repeatedly. Such an iterative function can be looped on its own output until there are no changes between the input and the output, i.e., until the function converges to a fixed point.

Incremental computations are a bit more general than the iterative ones. In incremental processing, we start with an initial input A0 and produce some output B0. At some later point, we have a change δA1 to the original input A0, such that we can have a new input A1 = A0 + δA1. The incremental model produces an incremental update to the output, so δB1 = Op(δA1) and B1 = δB1 + B0. Note that the incremental model only needs the previous state (i.e., A0 and B0) to compute the next state; however, we can extend it to keep all output differences: Bt = B0 + δB1 + δB2 + … + δBt. Incremental computations can be adapted for iterative algorithms, where each iteration produces a difference output and the next iteration operates only on that difference and not on the full input. However, with the basic incremental computation approach it becomes impossible to do iterative operations under changing or streaming input, as now we need to keep track not only of the iteration number (and the differences between iterations), but also of the version of the input (and the differences of the input).

The differential computation model overcomes the limitations of the basic iterative and incremental approaches by keeping all the differences δB and δA, and not just the previous state. In addition, a two-component timestamp is now used, where one component keeps track of the input version (epoch) and the second is responsible for the iteration number. Such timestamping of differences complicates the computation, as the timestamps no longer have a total order. In other words, sometimes it is not possible to tell whether one timestamp happened before another. However, the new timestamps have a partial order, for which (epoch1, iteration1) ≤ (epoch2, iteration2) if and only if epoch1 ≤ epoch2 and iteration1 ≤ iteration2. And under this partial order it is still possible to sum the differences together: Bt is the sum of all δBs with s ≤ t.

With the differential model, we can calculate not only the end result of an operation when new input comes in, but also any intermediate δBt. A lot of the power of differential dataflow lies with the differential operator, which must produce differences that can be summed with the equation above.
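
Restated in code, the partial order on two-component timestamps and the summation of differences look like this (a toy sketch, with a dictionary standing in for the collection of differences):

```python
def leq(t1, t2):
    """Partial order on (epoch, iteration) timestamps: t1 <= t2 only if both
    components are <=; otherwise the timestamps are incomparable."""
    return t1[0] <= t2[0] and t1[1] <= t2[1]

def state_at(t, deltas):
    """Reconstruct B_t by summing every difference whose timestamp
    precedes (or equals) t in the partial order."""
    return sum(delta for s, delta in deltas.items() if leq(s, t))

# Toy example: differences keyed by (epoch, iteration).
deltas = {(0, 0): 10, (0, 1): -3, (1, 0): 5, (1, 1): 2}
print(state_at((1, 1), deltas))   # 10 - 3 + 5 + 2 = 14
print(state_at((0, 1), deltas))   # only epoch-0 differences: 10 - 3 = 7
```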

The timestamp plays a crucial role in tracking execution progress. Naiad’s communication methods Send and OnRecv can be triggered multiple times for the same variable, making it necessary to have a mechanism capable of notifying other vertices when certain data has been sent in full. This notification triggers when all messages at or before a particular timestamp have been sent. Upon receiving a notification, a vertex's OnNotify(t) method is called, allowing the algorithm to react. The dataflow model complicates the notification mechanism, since the timestamps no longer have a total order; however, the partial order we established earlier, along with some dataflow graph restrictions, allows Naiad to keep an effective notification system that does not break its guarantees even under nested loops and continuous input updates.
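
The notification mechanism can be pictured as a per-vertex callback that fires only once no more messages at or before a timestamp can arrive. The toy below assumes a simple totally ordered integer timestamp; Naiad's real progress tracker handles the partial order and loop structure, so this is only a caricature of the interface:

```python
class ToyVertex:
    """Caricature of the vertex interface: on_recv may fire many times per
    timestamp, on_notify fires once the timestamp is known to be complete."""
    def __init__(self):
        self.pending = {}                      # timestamp -> buffered records

    def on_recv(self, timestamp, record):
        self.pending.setdefault(timestamp, []).append(record)

    def on_notify(self, timestamp):
        # Safe point: no further records at or before `timestamp` will arrive,
        # so the vertex can finalize its work for that timestamp.
        records = self.pending.pop(timestamp, [])
        print(f"t={timestamp}: finalizing {len(records)} records")

# The runtime (not shown) would call on_notify(t) only after its progress
# tracker proves all messages with timestamps <= t have been delivered.
v = ToyVertex()
v.on_recv(1, "a"); v.on_recv(1, "b")
v.on_notify(1)
```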