
Reading Group. Move Fast and Meet Deadlines: Fine-grained Real-time Stream Processing with Cameo

In the 68th reading group session, we discussed scheduling in dataflow-like systems with Cameo. The paper, titled “Move Fast and Meet Deadlines: Fine-grained Real-time Stream Processing with Cameo,” appeared at NSDI’21.

This paper discusses some scheduling issues in data processing pipelines. When a system answers a query, it breaks the query into several steps or operators, such as groupBys, aggregations, and various other processing steps. Each of these operators executes on some underlying serverless actor (in this work, the underlying platform is Orleans), so naturally, the operators must be scheduled to run on some actors when the actors become available. 

Scheduling is easy when we always have idle actors/workers that can pick up new tasks. But this leads to overprovisioning of resources and becomes expensive for all but the most latency-critical applications. As such, resource-shared environments that process multiple queries from different tenants are more cost-efficient and practical. Of course, the problem with shared environments is keeping all tenants happy and isolating them from each other. This means that scheduling needs to be aware of the SLAs a system must provide to tenants and try not to blow past the deadlines. Cameo is a dataflow processing system that aims for fine-grained scheduling of tasks to make sure all queries, each consisting of many tasks, finish within some deadline or SLA guarantee. 

Keeping a deadline is tricky, though. One complication arises from queries that require more than one operation to compute. Some of these operations may be dependent on each other and need to happen in sequence. As a result, the query deadline must be broken down into sub-deadlines for all the sequential execution steps. Instead of enforcing execution deadlines, Cameo enforces a start deadline for each task. This deadline is the latest time a task must begin computing such that it does not impact the downstream tasks and the overall query deadline. 
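To make this concrete, here is a minimal sketch (not Cameo's actual algorithm; the operator names and execution-time estimates are made up) of deriving per-task start deadlines by walking a chain of dependent operators backward from the query deadline:

```python
# Sketch: deriving start deadlines for a chain of dependent operators.
# Execution-time estimates are hypothetical; Cameo learns them from feedback.

def start_deadlines(query_deadline, operators):
    """operators: list of (name, estimated_exec_time) in execution order.
    Returns the latest start time for each operator so that the whole
    chain can still finish by query_deadline."""
    deadlines = {}
    latest_finish = query_deadline
    for name, exec_time in reversed(operators):
        latest_start = latest_finish - exec_time
        deadlines[name] = latest_start
        latest_finish = latest_start  # upstream step must finish before this one starts
    return deadlines

pipeline = [("ingest", 5), ("groupBy", 10), ("aggregate", 8)]  # times in ms
print(start_deadlines(100, pipeline))
# {'aggregate': 92, 'groupBy': 82, 'ingest': 77}
```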

Things can get more complicated from here. If one step takes longer to compute than anticipated, then the start deadlines of downstream tasks/steps may be violated. The system, however, tries to adjust/reprioritize the consecutive steps to accelerate their execution and hopefully still meet the overall query deadline/SLA. But there is more. See, if the scheduler made a mistake once and misjudged the execution time of a task, this can happen for the same query again. To counter this problem, Cameo propagates the task execution information to the scheduler so that it can take better actions next time. This feedback mechanism allows the scheduler to learn from past executions of a query/task and improve in the future.
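As a rough illustration of such a feedback loop (the paper's actual estimator may differ; the exponentially weighted moving average and the smoothing factor below are my own choices), the scheduler could maintain a running estimate of each task's execution time and recompute start deadlines from it:

```python
# Sketch: updating an execution-time estimate from observed task runs.
# An exponentially weighted moving average is one simple choice; it is
# not necessarily what Cameo uses internally.

class ExecTimeEstimator:
    def __init__(self, alpha=0.3):
        self.alpha = alpha
        self.estimates = {}  # task name -> estimated execution time

    def observe(self, task, measured_time):
        old = self.estimates.get(task, measured_time)
        self.estimates[task] = (1 - self.alpha) * old + self.alpha * measured_time

    def estimate(self, task, default=1.0):
        return self.estimates.get(task, default)

est = ExecTimeEstimator()
for t in [10, 12, 30, 11]:  # the same task observed repeatedly, with one spike
    est.observe("groupBy", t)
print(round(est.estimate("groupBy"), 2))
```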

Now about the scheduler. It has two major components – the scheduling mechanism and the scheduling policy. The scheduling mechanism executes the scheduling policies, and the two parts are rather separate. Such separation allows the scheduling mechanism to take the task with the highest priority (soonest start deadline) regardless of how this priority was computed by the policy. The scheduling policy component is pluggable and can be changed for different applications, as long as it produces priorities that the execution component understands. This part must follow the Cameo API to allow rescheduling of downstream/consecutive tasks and to learn the stats of step execution for future adjustments.
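A minimal sketch of this separation (the class and method names are illustrative, not Cameo's API): the mechanism only consumes numeric priorities, while the pluggable policy decides how they are computed:

```python
import heapq

# Sketch: a scheduling mechanism that executes whatever priorities the
# pluggable policy produces. Names are my own, not Cameo's.

class StartDeadlinePolicy:
    """Policy: priority is the task's start deadline (earlier = run first)."""
    def priority(self, task):
        return task["start_deadline"]

class SchedulingMechanism:
    def __init__(self, policy):
        self.policy = policy
        self.queue = []  # min-heap of (priority, task name)

    def submit(self, task):
        heapq.heappush(self.queue, (self.policy.priority(task), task["name"]))

    def next_task(self):
        return heapq.heappop(self.queue)[1] if self.queue else None

sched = SchedulingMechanism(StartDeadlinePolicy())
sched.submit({"name": "aggregate", "start_deadline": 92})
sched.submit({"name": "ingest", "start_deadline": 77})
print(sched.next_task())  # "ingest" -- the sooner start deadline wins
```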

The authors claim significant latency improvements of up to 4.6 times when using Cameo. 

As always, we had a presentation in our reading group. The video presentation by Ajit Yagaty is available on YouTube:

Discussion

1) Spark Streaming. The scenarios presented in the paper operate on dynamic datasets that continuously ingest new data. This is similar to other streaming dataflow solutions, such as Spark Streaming. The paper does not provide a direct comparison in the evaluation but claims that Cameo does much more fine-grained, per-task scheduling.

2) Fault Tolerance. One natural question in the reading group was around fault tolerance. The paper does not touch on this topic at all. Since the system is built using the Orleans framework, we suspect that most fault tolerance is taken care of by the framework. 

3) Spiky Workloads. The feedback mechanism used to learn how long tasks take and adjust the scheduling accordingly works great in a streaming system with a steady inflow of data and the same queries processing that new data. Adding a new query will likely create some problems (at least for that query) until the system learns the stats of all the tasks the query uses. Similarly, a big spike in data ingestion can cause some mayhem. If a query operates on the premise that some task needs to process n events/messages but the task receives 10n instead, its execution time may become too large and cause the downstream start deadlines to be violated. And such momentary spikes in a steady flow of new data may happen rather often: all that is needed is some transient network glitch that delays some messages, causing a spiky delivery moments later. 

4) Query Optimizers? We are not query optimization experts here in the reading group, but we wonder if some query optimization techniques could help here. Instead of relying on historical stats to predict how long different steps of a query may take, can we be smarter and optimize based on the actual data at hand? A simple and naive thing to do for the example in (3) is to account for data cardinalities when computing the start deadlines, so that a spike in data ingestion can be noticed and dealt with immediately. 
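One naive way to fold cardinality into the estimate (purely reading-group speculation, not a mechanism from the paper) is to keep a per-item cost and scale it by the size of the pending batch before computing the start deadline:

```python
# Sketch: scaling an execution-time estimate by the pending batch size.
# This is speculation from our discussion, not something Cameo does.

def scaled_estimate(per_item_cost, pending_items):
    return per_item_cost * pending_items

# Under a steady flow of ~100 items the task takes ~10 ms; a 10x spike
# in ingestion pushes the estimate (and thus the start deadline) out.
print(scaled_estimate(0.1, 100))   # 10.0 ms
print(scaled_estimate(0.1, 1000))  # 100.0 ms
```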

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions, and most importantly manage Zoom invites to paper discussions. Please join the slack group to get involved!

One Page Summary: “Musketeer: all for one, one for all in data processing systems”.

Many distributed computation platforms and programming frameworks exist today, and new ones constantly pop up from industry and academia. Some platforms are domain-specific, such as TensorFlow for machine learning. Others, like Hadoop and Naiad, are more general, and this generality allows sophisticated and specialized programming abstractions to be built on top.

Figure: coupled vs. decoupled designs.

So we naturally ask a question: which distributed computation platform is faster and more scalable? And which programming model or framework is better? The authors of Musketeer tried to find the answer and concluded that there is no such thing as a perfect computation platform or a perfect programming front-end, as they all perform better under different circumstances and workloads. This discovery led to the Musketeer prototype, which decouples the programming front-ends from their target platforms and connects all the front-ends to all the computation back-ends.

Figure: Musketeer schematics.

Musketeer accomplishes this by translating the code written in every supported programming front-end into an intermediate representation (IR). This intermediate representation can later be translated into the commands of the computation platforms. Think of Java or Scala code translated to Java bytecode before being JIT-compiled to native code as the program runs. Each IR is a dataflow DAG representing the progression of operations in the computation.
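As a toy illustration of what such an IR might look like (this is my own sketch, not Musketeer's actual IR format), one can picture the DAG as operator nodes with references to their inputs:

```python
# Toy sketch of a dataflow IR DAG; not Musketeer's actual representation.

class IRNode:
    def __init__(self, op, inputs=()):
        self.op = op            # e.g. "scan", "join", "agg"
        self.inputs = list(inputs)

# A small pipeline: scan two inputs, join them, then aggregate.
scan_a = IRNode("scan")
scan_b = IRNode("scan")
join = IRNode("join", [scan_a, scan_b])
agg = IRNode("agg", [join])

def operators(node, seen=None):
    """Walk the DAG and collect operator nodes in dependency order."""
    seen = seen if seen is not None else []
    for parent in node.inputs:
        operators(parent, seen)
    if node not in seen:
        seen.append(node)
    return seen

print([n.op for n in operators(agg)])  # ['scan', 'scan', 'join', 'agg']
```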

Figure: back-end detection (♣ indicates Musketeer's pick).

Musketeer runs the IR on the platform it deems most suitable for the job (see the back-end detection figure above). For example, some platforms may be better for certain operations, and not every platform supports all types of computations. Musketeer detects the patterns of operations in the IR through idiom recognition and will not run an IR containing certain patterns on a platform that does not support those patterns or performs poorly on them.

Figure: combining back-ends.

Musketeer can also split the entire computation into smaller jobs, each running on a different platform. The partitioning is done by picking a lower-cost back-end for each partition. Of course, finding the partitions in the DAG is a complicated (NP-hard) problem. Musketeer uses exhaustive search to find the best partitions for smaller IRs and a heuristic-based dynamic programming approach for larger DAGs, when the cost of the brute-force approach becomes prohibitive; the heuristic approach does not examine all possible partitions. Such partitioning allows achieving better performance than any single homogeneous platform.
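To give a feel for the exhaustive-search case, here is a minimal sketch over a tiny linear pipeline. The per-operator costs, back-end names, and data-transfer penalty are made-up numbers; Musketeer's real cost model is more involved:

```python
from itertools import product

# Sketch: exhaustive search over back-end assignments for a small linear
# pipeline. All costs below are hypothetical.

ops = ["scan", "join", "agg"]
backends = ["hadoop", "spark"]
op_cost = {                      # hypothetical per-operator cost per back-end
    ("scan", "hadoop"): 1, ("scan", "spark"): 5,
    ("join", "hadoop"): 9, ("join", "spark"): 2,
    ("agg", "hadoop"): 1, ("agg", "spark"): 5,
}
TRANSFER = 1  # cost of moving data between two different back-ends

def plan_cost(assignment):
    cost = sum(op_cost[(op, be)] for op, be in zip(ops, assignment))
    cost += sum(TRANSFER for a, b in zip(assignment, assignment[1:]) if a != b)
    return cost

best = min(product(backends, repeat=len(ops)), key=plan_cost)
print(best, plan_cost(best))  # ('hadoop', 'spark', 'hadoop') 6 -- a mixed plan wins
```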

The IR is not optimized when it is first generated; Musketeer performs the optimizations before starting the computation. In particular, certain operators in the IR DAG are merged together to reduce the number of jobs executed. The merging affects the platform choice, as some platforms support more complex jobs, while others need more steps to achieve the same result. Musketeer also optimizes the I/O by trying to eliminate duplicate scans of a dataset. All these optimizations allow the code generated from the IR to have performance similar to manually optimized code.

One Page Summary: Incremental, Iterative Processing with Timely Dataflow

This paper describes the Naiad distributed computation system. Naiad uses the dataflow model to represent computations, but it aims to be a general dataflow framework, in contrast to more specialized approaches such as TensorFlow. Similarly to other dataflow systems, computations are represented as graphs, where vertices represent operations on the data and edges carry the data between them.

Naiad was designed as a generic framework to support iterative and incremental computations with the dataflow model. We can think of an iterative computation as some function Op executed repeatedly. Such an iterative function can be looped on its own output until there are no changes between the input and the output, i.e. until the function converges to a fixed point.
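A fixed-point loop is easy to picture with a toy example of my own (not from the paper), such as iterating single-source reachability until the set of reached vertices stops changing:

```python
# Sketch: iterating an operation until it reaches a fixed point.
# Toy example: compute the set of vertices reachable from vertex 1.

edges = {1: [2, 3], 2: [4], 3: [4], 4: []}

def op(reached):
    """One iteration: add direct successors of everything reached so far."""
    return reached | {v for u in reached for v in edges[u]}

state = {1}
while True:
    new_state = op(state)
    if new_state == state:   # converged: output equals input
        break
    state = new_state
print(sorted(state))  # [1, 2, 3, 4]
```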

Incremental computations are a bit more general than iterative ones. In incremental processing, we start with an initial input A0 and produce some output B0. At some later point, we have a change δA1 to the original input A0, such that we have a new input A1 = A0 + δA1. The incremental model produces an incremental update to the output, so δB1 = Op(δA1) and B1 = B0 + δB1. Note that the incremental model only needs the previous state (i.e. A0 and B0) to compute the next state; however, we can extend it to keep all output differences, so that the output at version t is the sum of all deltas up to that point: Bt = δB0 + δB1 + … + δBt. Incremental computation can be adopted for iterative algorithms where each iteration produces a difference output and the next iteration operates only on that difference and not the full input. However, with the basic incremental approach it becomes impossible to do iterative operations under changing or streaming input, as we now need to keep track not only of the iteration number (and the differences between iterations), but also of the version of the input (and the differences of the input).
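Before moving on to the differential model, a minimal sketch of the basic incremental pattern (the word-count operator here is my own trivial choice) shows why only the previous state and the new delta are needed:

```python
from collections import Counter

# Sketch: incremental computation with a toy word-count operator.
# Op maps a batch of words to their counts; deltas add up to the output.

def op(delta_input):
    return Counter(delta_input)

a0 = ["cat", "dog", "cat"]
b0 = op(a0)                       # initial output B0

delta_a1 = ["dog", "fish"]        # change to the input
delta_b1 = op(delta_a1)           # incremental update, computed from δA1 only
b1 = b0 + delta_b1                # B1 = B0 + δB1

print(b1)  # Counter({'cat': 2, 'dog': 2, 'fish': 1})
```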

The differential computation model overcomes the limitations of the basic iterative and incremental approaches by keeping all the differences δB and δA, and not just the previous state. In addition, a two-component timestamp is now used, where one component keeps track of the input version (epoch) and the second is responsible for the iteration number. Such timestamping for differences complicates the computation, as the timestamps no longer have a total order. In other words, sometimes it is not possible to tell whether one timestamp happened before another. However, the new timestamps have a partial order, under which (epoch1, iteration1) ≤ (epoch2, iteration2) if and only if epoch1 ≤ epoch2 and iteration1 ≤ iteration2. And under this partial order it is still possible to sum the differences together: Bt is the sum of all δBs with s ≤ t.
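A quick sketch of comparing such two-component timestamps makes the incomparability concrete:

```python
# Sketch: the partial order on (input epoch, iteration) timestamps.
# Two timestamps are ordered only when both components are ordered.

def leq(t1, t2):
    return t1[0] <= t2[0] and t1[1] <= t2[1]

print(leq((0, 1), (1, 1)))   # True: earlier epoch, same iteration
print(leq((0, 2), (1, 1)))   # False
print(leq((1, 1), (0, 2)))   # False: neither precedes the other, so they are incomparable
```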

With the differential model, we can calculate not only the end result of an operation when new input comes in, but also any intermediate δBt. A lot of the power of differential dataflow lies with the differential operator, which must produce differences that can be summed with the equation above.

The timestamps play a crucial role in tracking execution progress. Naiad's communication methods Send and OnRecv can be triggered multiple times for the same variable, making it necessary to have a mechanism capable of notifying other nodes when certain data has been sent in full. This notification triggers when all messages at or before a particular timestamp have been sent. Upon receiving a notification, a node's OnNotify(t) method is called, allowing the algorithm to react. The dataflow model complicates the notification mechanism, since the timestamps no longer have a total order; however, the partial order we established earlier, along with some dataflow graph restrictions, allows Naiad to keep an effective notification system that does not break its guarantees even under nested loops and continuous input updates.
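A much-simplified sketch of the idea (ignoring loops and the graph-aware analysis Naiad actually performs, and using totally ordered timestamps for brevity): track outstanding messages per timestamp and allow a notification once nothing at or before that timestamp remains in flight:

```python
from collections import Counter

# Highly simplified progress tracking: no loops, totally ordered timestamps.
# Naiad's real mechanism reasons about partially ordered timestamps and the
# structure of the dataflow graph.

class ProgressTracker:
    def __init__(self):
        self.in_flight = Counter()   # timestamp -> outstanding message count

    def send(self, t):
        self.in_flight[t] += 1

    def on_recv(self, t):
        self.in_flight[t] -= 1
        if self.in_flight[t] == 0:
            del self.in_flight[t]

    def can_notify(self, t):
        """True when no message at or before t is still outstanding."""
        return all(ts > t for ts in self.in_flight)

pt = ProgressTracker()
pt.send(1)
pt.send(2)
pt.on_recv(1)
print(pt.can_notify(1))  # True: only timestamp 2 is still in flight
print(pt.can_notify(2))  # False
```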

About Google’s Dataflow Model

In this post I am trying to understand Google's Dataflow Model, a data management and manipulation framework for dealing with unbounded and unordered datasets. A lot of data being produced today has no "maximum size"; in other words, the amount of such data is constantly increasing, and therefore modern computing systems must have a way of dealing with a constant influx of information in an efficient way. In addition, events may be delayed or reordered for various reasons, so data arrives at the processing system out of sequence. The data processing model described in the paper "The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing" attempts to provide an efficient framework for handling such ever-growing and potentially permuted data.

As previously stated, the Dataflow model operates on unbounded data; in other words, the dataset used in a system built with such a framework is infinite and grows over time. Event occurrence time and processing time become important factors for such a system, since individual datapoints tend to have a chronological order to them. In addition, a lot of real-life data originates at a particular time but is delayed for processing by some variable amount, causing not just a gap between event occurrence time and processing time, but also the possibility that events are processed in a different order than they occurred. Below I briefly describe the model and talk about its usage based on the examples in the paper.

The Model

The model operates on two basic primitive actions:

  • ParDo is responsible for parallel processing of events.
  • GroupByKey groups the intermediate results produced by ParDo.

These actions are nothing new; they operate on key-value pairs and allow distributing the computation to multiple machines and then grouping and aggregating the results after the computation is done.
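A rough sketch of these two primitives over plain in-memory Python lists (not the actual Dataflow/Beam API; the real model distributes this work across many machines):

```python
from itertools import groupby

# Sketch of ParDo and GroupByKey over in-memory key-value pairs.

def par_do(records, fn):
    """Apply fn to every element; fn may emit zero or more key-value pairs."""
    return [kv for rec in records for kv in fn(rec)]

def group_by_key(pairs):
    """Group values by key."""
    pairs = sorted(pairs, key=lambda kv: kv[0])
    return {k: [v for _, v in grp] for k, grp in groupby(pairs, key=lambda kv: kv[0])}

events = ["user1:3", "user2:5", "user1:4"]
pairs = par_do(events, lambda e: [tuple(e.split(":"))])
print(group_by_key(pairs))  # {'user1': ['3', '4'], 'user2': ['5']}
```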

Window-based data processing deserves more attention, as it allows working with data in slices instead of bulk processing everything at once. Once again, handling input data in blocks using a windowing approach is very common. Because the Dataflow model assumes that the incoming data can be unordered, the window-based approach needs to be able to update computations performed while processing preceding data slices. Additionally, the model supports unaligned windows, or windows that may differ in size and start or end time relative to other windows. Such unaligned windows can be used to represent user sessions, or clusters of user actions, in many Google applications.

The model operates on data with two distinct timestamps, event time and processing time, and it is possible to partition the dataset into windows based on either of these time metrics. Unfortunately, when data is windowed, the system cannot ensure that it has collected and processed all events belonging to a window. Instead, the Dataflow model provides a guarantee that eventually, if we process all data (which is impossible, since the data is assumed to be unbounded), the window will have all the correct events associated with it. A heuristic time watermark is used to estimate when the system expects to have seen all events for a window based on their event time. Once the watermark time is reached, the window is processed. Obviously, such a heuristic is not enough to guarantee the correctness of each window, so a trigger mechanism is used to issue an update to the window upon the arrival of new data after the window has already been computed. Essentially, a trigger retracts the old window state, calculates a new state, and signals that the updated window is now available. For example, event #9 in Figure 1 arrives very late for processing, way past the watermark time (dark dashed line), so in many configurations event #9 will invoke a trigger and cause the corresponding window state to update.

The system also supports streaming data, which can be dynamically placed into unaligned windows as more data comes in, with a window becoming available when the system's watermark suggests no more events for the window are expected. The trigger mechanism is used to update the window state in case more data becomes available for an already-emitted window.
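To make the watermark-and-trigger idea concrete, here is a toy sketch of my own (a sum per 1-minute event-time window, with greatly simplified watermark handling, not the paper's implementation):

```python
# Toy sketch: per-window sums with a late-data "trigger" that updates an
# already-emitted window. Watermark handling is greatly simplified and only
# evaluated when an event arrives.

WINDOW = 60  # seconds of event time per window

windows = {}          # window start -> running sum
emitted = set()       # windows already emitted once

def process(event_time, value, watermark):
    start = (event_time // WINDOW) * WINDOW
    windows[start] = windows.get(start, 0) + value
    if start in emitted:
        print(f"trigger: window {start} updated to {windows[start]}")
    elif start + WINDOW <= watermark:
        emitted.add(start)
        print(f"emit: window {start} = {windows[start]}")

process(5, 5, watermark=30)    # window not yet past the watermark: nothing emitted
process(20, 7, watermark=70)   # watermark passed the window: emit sum 12
process(50, 9, watermark=130)  # a late event: trigger updates the window to 21
```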

Examples

Below I illustrate some of the examples provided in the paper. For all examples, the system computes the sum of the values of the events. Figure 1 shows the input events in relation to their event time and processing time.


Figure 1. Sample events

The most basic example is batch processing of the entire dataset, illustrated in Figure 2. As can be seen, the entire dataset is viewed as one window processed at once.


Figure 2. Bulk batch processing

Things become more interesting when the data is partitioned into windows as it arrives to the system for processing. In the example in Figure 3, incoming data is broken into non-overlapping windows of 1 minute, with the value of each window accumulating from the previous one. Note that watermarks are not used in this example, as the windows are emitted for processing at a fixed interval.


Figure 3. Processing in non-overlapping accumulating windows

The next example makes use of fixed windows with data partitioned by event time. At first, the system does not see event #9, so the first window is bounded by the maximum watermark time estimated for the window (shown in a darker shade of gray). As soon as the processing time reaches the watermark for the window, the data is processed and the window's state is computed. Later, when event #9 arrives, the trigger is invoked and the window state is recalculated, as seen in the lighter shade of gray for the 12:01 window.


Figure 4. Fixed windows with streaming data

More sophisticated examples deal with unaligned windows to compute user sessions, but the basic principle stays the same, with the exception that unaligned windows can change their size in both the event-time and processing-time dimensions as more data becomes available.

Some Thoughts

According to the authors, the session use case is the main driving force behind developing the Dataflow model. All other examples discussed in the paper are byproducts of the system developed to satisfy session computation at Google, which may not be a very good sign. Clearly, the authors thought about other potential uses, but it is not clear how well their system performs under other usage scenarios compared to the existing alternatives.

Triggers provide a mechanism to keep the system in the most up-to-date state as new out-of-order events arrive for processing. Unfortunately, triggers seem like a fairly expensive operation, especially in scenarios where previous window values must be retracted, new data added, and the window recomputed for every late event arriving in the system. The authors do not provide a performance measure for the system as a whole or for triggers in particular, but invoking a trigger for every late event may be a potential speed issue for highly out-of-sequence data.

The authors also claim that the model is highly customizable in terms of performance because of the ability to either batch process or stream process incoming data. No real performance measurements are provided in the paper, so the reader is left to guess about the degree of such flexibility.