Tag Archives: dataflow

One Page Summary: “Musketeer: all for one, one for all in data processing systems”.

Many distributed computation platforms and programming frameworks exist today, and new ones keep emerging from industry and academia. Some platforms are domain-specific, such as TensorFlow for machine learning. Others, like Hadoop and Naiad, are more general, and this generality allows sophisticated and specialized programming abstractions to be built on top.

Figure: Coupled vs. decoupled

So a natural question arises: which distributed computation platform is faster and more scalable, and which programming model or framework is better? The authors of Musketeer tried to find the answer and concluded that there is no perfect computation platform or perfect programming front-end, as each performs best under different circumstances and workloads. This finding led to the Musketeer prototype, which decouples the programming front-ends from their target platforms and connects every front-end to every computation back-end.

Figure: Musketeer schematics

Musketeer accomplishes this by translating the code written in every supported programming front-end into an intermediate representation (IR). This IR can later be translated into jobs for the computation platforms. Think of Java or Scala code compiled to Java bytecode before being JIT-compiled to native code as the program runs. Each IR is a dataflow DAG representing the progression of operations in the computation.

Figure: Back-end detection (♣ indicates Musketeer’s pick)

Musketeer runs the IR on the platform it considers most suitable for the job (see the back-end detection figure). Some platforms are better at some operations, and not every platform supports all types of computations. Musketeer detects patterns of operations in the IR through idiom recognition and will not run an IR containing certain patterns on a platform that does not support these patterns or performs badly on them.
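To make the selection step concrete, here is a minimal sketch of filtering back-ends by the operators they support and then picking the cheapest one. The operator names, back-end names, and cost numbers are all made up for illustration; this is not Musketeer's actual IR or cost model.

```python
# Hypothetical IR: operator name -> list of upstream operator names.
ir_dag = {
    "read_edges": [],
    "join": ["read_edges"],
    "while_loop": ["join"],
    "sum": ["while_loop"],
}

# Illustrative capabilities and an assumed flat cost per operator.
# These numbers are invented, not measurements.
backends = {
    "batch_engine": {
        "supports": {"read_edges", "join", "sum"},
        "cost": 3.0,
    },
    "iterative_engine": {
        "supports": {"read_edges", "join", "while_loop", "sum"},
        "cost": 5.0,
    },
}


def eligible_backends(dag, backends):
    """Return the back-ends that support every operator in the DAG."""
    ops = set(dag)
    return [name for name, b in backends.items() if ops <= b["supports"]]


def pick_backend(dag, backends):
    """Among eligible back-ends, pick the one with the lowest estimated cost."""
    candidates = eligible_backends(dag, backends)
    if not candidates:
        raise ValueError("no single back-end supports every operator")
    # Estimated cost = assumed per-operator cost * number of operators.
    return min(candidates, key=lambda name: backends[name]["cost"] * len(dag))


print(pick_backend(ir_dag, backends))
```

Because the sample DAG contains a loop, the cheaper batch engine is ruled out before costs are even compared, which is the effect the idiom recognition step is described as having.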

Figure: Combining back-ends

Musketeer can also split the entire computation into smaller jobs, each running on a different platform. The partitioning assigns each partition to the back-end with the lowest estimated cost for it. Finding the best partitioning of the DAG is a hard problem (NP-hard). Musketeer uses exhaustive search to find the best partitions for smaller IRs and a heuristic based on dynamic programming for larger DAGs, where the cost of the brute-force approach becomes prohibitive. The heuristic does not examine all possible partitions. Such partitioning can achieve better performance than running everything on a single platform.
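The two search strategies can be sketched on a toy pipeline. The sketch assumes the DAG has already been flattened into a single linear order of operators, so a partition is just a set of cut points. Restricting the search to one ordering is what makes the dynamic programming version a heuristic for general DAGs. The operators, back-ends, and costs are invented for illustration.

```python
from itertools import combinations

# Hypothetical cost of each operator on each back-end (made-up numbers).
OP_COST = {
    "scan":    {"batch": 4.0,  "graph": 6.0},
    "join":    {"batch": 5.0,  "graph": 9.0},
    "iterate": {"batch": 30.0, "graph": 8.0},
    "sum":     {"batch": 2.0,  "graph": 3.0},
}
JOB_OVERHEAD = 3.0  # assumed fixed cost of launching one job


def job_cost(ops):
    """Return (cost, backend) for the cheapest back-end running ops as one job."""
    best = None
    for backend in ("batch", "graph"):
        cost = JOB_OVERHEAD + sum(OP_COST[op][backend] for op in ops)
        if best is None or cost < best[0]:
            best = (cost, backend)
    return best


def exhaustive(ops):
    """Try every way of cutting the operator sequence into contiguous jobs."""
    n = len(ops)
    best = None
    for k in range(n):
        for cuts in combinations(range(1, n), k):
            bounds = [0, *cuts, n]
            jobs = [ops[a:b] for a, b in zip(bounds, bounds[1:])]
            total = sum(job_cost(j)[0] for j in jobs)
            if best is None or total < best[0]:
                best = (total, [(job_cost(j)[1], j) for j in jobs])
    return best


def dynamic_programming(ops):
    """best[end] holds the cheapest plan for the first `end` operators."""
    n = len(ops)
    best = [(0.0, [])] + [None] * n
    for end in range(1, n + 1):
        for start in range(end):
            cost, backend = job_cost(ops[start:end])
            total = best[start][0] + cost
            if best[end] is None or total < best[end][0]:
                best[end] = (total, best[start][1] + [(backend, ops[start:end])])
    return best[n]


pipeline = ["scan", "join", "iterate", "sum"]
print(exhaustive(pipeline))
print(dynamic_programming(pipeline))
```

On this toy input both searches agree: the scan and join run as one batch job and the iteration and sum run as one graph job, for a total cost of 26.0. Running the whole pipeline on the single best platform costs 29.0. The exhaustive version enumerates 2^(n-1) cuttings, while the dynamic programming version needs only about n² job evaluations.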

The IR is not optimized when it is first generated; Musketeer optimizes it before starting the computation. In particular, certain operators in the IR DAG are merged to reduce the number of jobs executed. The merging affects the platform choice, as some platforms support more complex jobs, while others need more steps to achieve the same result. Musketeer also optimizes I/O by trying to eliminate duplicate scans of a dataset. Together, these optimizations let the code generated from the IR perform similarly to manually optimized code.

One Page Summary: Incremental, Iterative Processing with Timely Dataflow

This paper describes Naiad, a distributed computation system. Naiad uses the dataflow model to represent computations, but it aims to be a general dataflow framework, in contrast to more specialized systems such as TensorFlow. As in other dataflow systems, computations are represented as graphs, where vertices represent data and operations, and edges carry the data between vertices.

Naiad was designed as a generic framework to support iterative and incremental computations with the dataflow model. We can think of an iterative computation as some function Op that is executed repeatedly. Such a function can be looped on its own output until there are no changes between the input and the output, that is, until the function converges to a fixed point.
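As a small illustration of iterating Op to a fixed point, the sketch below repeatedly expands a set of reachable graph nodes until nothing changes. The helper names and the sample graph are assumptions made for this example; none of this is Naiad code.

```python
def fixed_point(op, state, max_iters=1000):
    """Apply op repeatedly until its output equals its input."""
    for iteration in range(1, max_iters + 1):
        next_state = op(state)
        if next_state == state:
            return state, iteration
        state = next_state
    raise RuntimeError("did not converge")


# Hypothetical directed graph: node -> set of successors.
EDGES = {"a": {"b"}, "b": {"c"}, "c": {"a"}, "d": {"a"}}


def expand(reachable):
    """Op: add every node that is one hop away from the current set."""
    one_hop = {dst for src in reachable for dst in EDGES.get(src, ())}
    return frozenset(reachable) | one_hop


reachable, iterations = fixed_point(expand, frozenset({"a"}))
print(sorted(reachable), iterations)
```

Starting from "a", the set grows to {a, b}, then {a, b, c}, and the third application of Op changes nothing, so the loop stops.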

Incremental computations are a bit more general than iterative ones. In incremental processing, we start with an initial input A0 and produce some output B0. At some later point, we have a change δA1 to the original input A0, giving the new input A1 = A0 + δA1. The incremental model produces an incremental update to the output, so δB1 = Op(δA1) and B1 = δB1 + B0. Note that the incremental model only needs the previous state (i.e. A0 and B0) to compute the next state; however, we can extend it to keep all output differences: Bt = B0 + δB1 + δB2 + … + δBt. Incremental computations can be adapted for iterative algorithms, where each iteration produces a difference output and the next iteration operates only on that difference rather than the full input. However, with the basic incremental approach it becomes impossible to run iterative operations under changing or streaming input, as we now need to keep track not only of the iteration number (and the differences between iterations), but also of the version of the input (and the differences between inputs).
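The incremental update can be tried out with a word count, an operator chosen here because counting distributes over appended input, so δB1 = Op(δA1) holds exactly. For operators without that property, the update would need more than the delta alone. The variable names mirror the symbols in the text.

```python
from collections import Counter


def op(records):
    """Op: count occurrences of each word in a batch of records."""
    return Counter(word for line in records for word in line.split())


a0 = ["to be or not", "to be"]
b0 = op(a0)                      # initial output B0

delta_a1 = ["be quick"]          # change to the input, δA1
delta_b1 = op(delta_a1)          # incremental update, δB1 = Op(δA1)
b1 = b0 + delta_b1               # B1 = B0 + δB1

# The incremental result matches recomputing from scratch on A1 = A0 + δA1.
assert b1 == op(a0 + delta_a1)

# Keeping every output difference lets us rebuild any later state from B0.
delta_b2 = op(["not quick"])
b2 = sum([delta_b1, delta_b2], b0)
print(b2)
```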

The differential computation model overcomes the limitations of the basic iterative and incremental approaches by keeping all the differences δB and δA, and not just the previous state. In addition, a two-component timestamp is now used, where one component keeps track of the input version and the other tracks the iteration number. Such timestamping complicates the computation, as the timestamps no longer have a total order. In other words, sometimes it is not possible to tell whether one timestamp happened before another. However, the new timestamps have a partial order, under which (i1, j1) ≤ (i2, j2) if and only if i1 ≤ i2 and j1 ≤ j2. Under this partial order it is still possible to sum the differences together: Bt = Σ δBs, where the sum ranges over all timestamps s ≤ t.
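A short sketch shows how differences indexed by two-component timestamps can be summed under the component-wise partial order. The stored differences are invented sample data, and negative counts stand for retractions. This illustrates the bookkeeping only, not how Naiad stores or compacts differences.

```python
from collections import Counter


def leq(s, t):
    """Component-wise partial order on (input_version, iteration) timestamps."""
    return s[0] <= t[0] and s[1] <= t[1]


# Hypothetical differences δB, keyed by (input version, iteration).
deltas = {
    (0, 0): Counter({"x": 1, "y": 1}),
    (0, 1): Counter({"z": 1}),
    (1, 0): Counter({"y": -1, "w": 1}),
    (1, 1): Counter({"z": -1}),
}


def state_at(t):
    """Sum every difference whose timestamp s satisfies s <= t."""
    total = Counter()
    for s, delta in deltas.items():
        if leq(s, t):
            total.update(delta)  # update() keeps negative counts, unlike +
    return {key: count for key, count in total.items() if count != 0}


# (0, 1) and (1, 0) are incomparable: neither is <= the other.
print(leq((0, 1), (1, 0)), leq((1, 0), (0, 1)))
print(state_at((0, 1)))
print(state_at((1, 1)))
```

The state at (0, 1) ignores the difference at (1, 0) and vice versa, because the two timestamps are incomparable, while the state at (1, 1) includes all four differences.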

With the differential model, we can calculate not only the end result of an operation when new input comes in, but also any intermediate δBt. A lot of the power of differential dataflow lies in the differential operator, which must produce differences that can be summed as described above.

The timestamp plays a crucial role in tracking execution progress. Naiad’s communication methods Send and OnRecv can be triggered multiple times for the same variable, making it necessary to have a mechanism that notifies other nodes when certain data has been sent in full. This notification triggers when all messages at or before a particular timestamp have been sent. When a node receives a notification, its OnNotify(t) method is called, allowing the algorithm to react. The dataflow model complicates the notification mechanism, since the timestamps no longer have a total order; however, the partial order established earlier, along with some restrictions on the dataflow graph, allows Naiad to keep an effective notification system that does not break its guarantees even under nested loops and continuous input updates.
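The interplay between receiving messages and being notified can be imitated in a single process. The sketch below is a toy stand-in: the method names on_recv and on_notify are modeled on OnRecv and OnNotify from the text, while ToyScheduler, close, and the rest are assumptions for illustration and do not reflect Naiad's distributed progress-tracking protocol.

```python
from collections import defaultdict


def leq(s, t):
    """Component-wise partial order on timestamps of equal length."""
    return all(a <= b for a, b in zip(s, t))


class CountPerTime:
    """Toy vertex: buffers records in on_recv, emits a count in on_notify."""

    def __init__(self):
        self.buffered = defaultdict(list)
        self.output = {}

    def on_recv(self, record, t):
        self.buffered[t].append(record)

    def on_notify(self, t):
        # Safe to finalize: no more records at or before t will arrive.
        self.output[t] = len(self.buffered.pop(t, []))


class ToyScheduler:
    """Hypothetical single-process replacement for progress tracking."""

    def __init__(self, vertex):
        self.vertex = vertex
        self.in_flight = []       # (record, timestamp) sent but not delivered
        self.open_times = set()   # timestamps the producer may still send at
        self.requested = set()    # timestamps awaiting a notification

    def send(self, record, t):
        self.open_times.add(t)
        self.requested.add(t)
        self.in_flight.append((record, t))

    def close(self, t):
        """The producer promises to send nothing more at timestamp t."""
        self.open_times.discard(t)

    def run(self):
        while self.in_flight:
            record, t = self.in_flight.pop(0)
            self.vertex.on_recv(record, t)
        # Notify t only if no still-open timestamp s satisfies s <= t.
        for t in sorted(self.requested):
            if not any(leq(s, t) for s in self.open_times):
                self.vertex.on_notify(t)
                self.requested.discard(t)


vertex = CountPerTime()
scheduler = ToyScheduler(vertex)
scheduler.send("a", (0, 0))
scheduler.send("b", (0, 0))
scheduler.send("c", (1, 0))
scheduler.close((0, 0))
scheduler.run()
print(vertex.output)   # only (0, 0) has been finalized so far
scheduler.close((1, 0))
scheduler.run()
print(vertex.output)
```

The vertex sees on_recv twice for timestamp (0, 0) but produces its count only once, when the notification confirms that timestamp is complete.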

About Google’s Dataflow Model

In this post I try to understand Google’s Dataflow Model, a data management and manipulation framework for dealing with unbounded and unordered datasets. A lot of data is produced continuously today and has no “maximum size”; in other words, the amount of such data is constantly increasing, so modern computing systems must have an efficient way of dealing with a constant influx of information. In addition, for various reasons many applications deliver chronological data out of sequence, so sequential data reaches the processing system out of order. The data processing model described in the paper “The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive Scale, Unbounded, Out-of-Order Data Processing” attempts to provide an efficient framework for handling such ever-growing and potentially permuted data.

As stated above, the Dataflow model operates on unbounded data; in other words, the dataset used in a system built with this framework is infinite and grows over time. Event occurrence time and processing time become important factors for such a system, since individual data points tend to have a chronological order. In addition, a lot of real-life data originates at a particular time but is delayed for processing by some variable amount, causing not just a difference between event occurrence time and processing time, but also the possibility that events are processed in a different order than they occurred. Below I briefly describe the model and talk about its usage based on the examples in the paper.

The Model

The model is built on two basic primitives:

  • ParDo is responsible for parallel processing of the events.
  • GroupByKey groups the intermediate results of the ParDo function.

These actions are nothing new; they operate on key-value pairs and allow distributing the computation across multiple machines and then grouping and aggregating the results once the computation is done.
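The two primitives can be mimicked in a few lines of plain Python. This is a single-machine sketch of the idea only. The function names par_do and group_by_key are stand-ins chosen for this example and are not the API of any Dataflow SDK.

```python
from collections import defaultdict


def par_do(fn, elements):
    """Apply fn to each element independently; fn may yield zero or more outputs."""
    for element in elements:
        yield from fn(element)


def group_by_key(pairs):
    """Collect all values that share a key into one list per key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def split_words(line):
    """Example element-wise function: emit a (word, 1) pair for every word."""
    for word in line.split():
        yield (word, 1)


lines = ["a b a", "b c"]
pairs = par_do(split_words, lines)
grouped = group_by_key(pairs)
counts = {word: sum(ones) for word, ones in grouped.items()}
print(counts)
```

Since par_do treats every element independently, the input could be split across machines. group_by_key is the step that has to bring the pairs for one key back together.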

Window-based data processing deserves more attention, as it allows working with data in slices instead of processing everything at once in bulk. Once again, handling input data in blocks using a windowing approach is very common. Because the Dataflow model assumes that incoming data can be unordered, the window-based approach needs to be able to update the computations performed while processing preceding data slices. Additionally, the model supports unaligned windows, that is, windows that may differ in size and in start or end time relative to other windows. Such unaligned windows can be used to represent user sessions, or clusters of user actions, in many Google applications.

The model operates on data with two distinct timestamps, event time and processing time, and it is possible to partition the dataset into windows based on either of them. Unfortunately, when data is windowed, the system cannot ensure that it has collected and processed all events belonging to a window. Instead, the Dataflow model guarantees that eventually, if we process all data (which is impossible, since the data is assumed to be unbounded), the window will contain all the events that belong to it. A heuristic watermark is used to estimate, based on event time, when the system expects to have seen all the events for a window. Once the watermark time is reached, the window is processed. Obviously, such a heuristic is not enough to guarantee the correctness of each window, so a trigger mechanism is used to issue an update to the window when new data arrives after the window has already been computed. Essentially, a trigger retracts the old window state, calculates a new state, and signals that the updated window is now available. For example, event #9 in Figure 1 arrives very late for processing, well past the watermark time (dark dashed line), so in many configurations event #9 will invoke a trigger and cause the corresponding window state to be updated.
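The watermark and late-data behavior can be simulated on a handful of events. The sketch assumes fixed 60-second event-time windows and a deliberately naive watermark (processing time minus an assumed typical delay), and it emits a replacement sum when a late event arrives rather than an explicit retraction. The events are made up and are not the ones from the paper's figures.

```python
from collections import defaultdict

WINDOW = 60          # fixed event-time windows of 60 seconds
ASSUMED_DELAY = 30   # heuristic: events usually arrive within 30 s of occurring


def window_start(event_time):
    """Start of the fixed window that contains event_time."""
    return event_time - event_time % WINDOW


def run(events):
    """events: (event_time, processing_time, value) tuples. Returns emitted panes."""
    sums = defaultdict(int)
    fired = set()
    panes = []  # (window_start, sum, "on-time" or "late")
    for event_time, processing_time, value in sorted(events, key=lambda e: e[1]):
        watermark = processing_time - ASSUMED_DELAY
        # Fire every known window whose end the watermark has passed.
        for w in sorted(sums):
            if w not in fired and w + WINDOW <= watermark:
                fired.add(w)
                panes.append((w, sums[w], "on-time"))
        w = window_start(event_time)
        sums[w] += value
        if w in fired:
            # Late event: the window already fired, so emit an updated sum.
            panes.append((w, sums[w], "late"))
    return panes


events = [
    (5, 10, 5),
    (20, 30, 7),
    (70, 80, 3),
    (65, 100, 4),
    (50, 130, 9),    # belongs to window 0 but arrives after it has fired
    (130, 160, 1),
]
for pane in run(events):
    print(pane)
```

The first window fires on time with a sum of 12 and is later corrected to 21 when the delayed event shows up. The second window fires on time with 7. The last window never fires in this sketch, because the input ends before the watermark passes it.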

The system also supports streaming data, which can be dynamically placed into unaligned windows as more data comes in, with a window becoming available when the system’s watermark suggests that no more events are expected for it. The trigger mechanism is used to update the window state in case more data becomes available for a window.

Examples

Below, I illustrate some of the examples provided in the paper. In all examples, the system computes the sum of the values of the events. Figure 1 shows the input events in relation to their event time and processing time.

Figure 1. Sample events

The most basic example is batch processing of the entire dataset, illustrated in Figure 2. As can be seen, the entire dataset is viewed as one window processed at once.

Figure 2. Bulk batch processing

It becomes more interesting when the data is partitioned into windows as it arrives for processing. In the example in Figure 3, incoming data is broken into non-overlapping windows of 1 minute, with the value at each window accumulating from the previous one. Note that watermarks are not used in this example, as a window is emitted for processing at a fixed interval.

Figure 3. Processing in non-overlapping accumulating windows

The next example uses fixed windows with data partitioned by event time. At first the system has not seen event #9, so the first window is bounded by the maximum watermark time estimated for the window (shown in the darker shade of gray). As soon as the processing time reaches the watermark for the window, the data is processed and the window’s state is computed. Later, when event #9 arrives, the trigger is invoked and the window state is recalculated, as seen in the lighter shade of gray for the 12:01 window.

Figure 4. Fixed windows with streaming data

More sophisticated examples deal with unaligned windows used to compute user sessions, but the basic principle stays the same, except that unaligned windows can change their size in both the event-time and processing-time dimensions as more data becomes available.
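Session windows can be sketched by giving every event its own small window and merging windows that overlap. The 30-second gap and the event times are assumptions for illustration. The sketch recomputes sessions from scratch each time, which is enough to show how one late event can change window boundaries.

```python
GAP = 30  # assumed session timeout, in seconds


def sessions(event_times, gap=GAP):
    """Merge per-event windows [t, t + gap) into sessions wherever they overlap."""
    windows = []
    for t in sorted(event_times):
        if windows and t < windows[-1][1]:
            # The event falls inside the current session, so extend its end.
            windows[-1][1] = max(windows[-1][1], t + gap)
        else:
            # The gap was exceeded, so a new session starts here.
            windows.append([t, t + gap])
    return [tuple(w) for w in windows]


seen_so_far = [10, 25, 70]
print(sessions(seen_so_far))           # two separate sessions
print(sessions(seen_so_far + [50]))    # a late event at t=50 bridges them
```

Before the late event arrives there are two sessions, (10, 55) and (70, 100). The event at 50 falls inside the first and stretches it far enough to absorb the second, leaving a single session (10, 100).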

Some Thoughts

According to the authors, the session use case was the main driving force behind the Dataflow model. All other examples discussed in the paper are byproducts of a system developed to satisfy Google’s session computation needs, which may not be a very good sign. Clearly, the authors thought about other potential uses, but it is not clear how well their system performs in other usage scenarios compared to existing alternatives.

Triggers provide a mechanism to keep the system in the most up-to-date state as new out-of-order events arrive for processing. Unfortunately, triggers seem like a fairly expensive operation, especially in usage scenarios where previous window values must be retracted, new data added, and the window recomputed for every late event. The authors do not provide performance measurements for the system as a whole or for triggers in particular, but invoking a trigger for every late event may be a speed issue for highly out-of-sequence data.

The authors also claim that the model is highly customizable in terms of performance, because of its ability to process incoming data in either batch or streaming fashion. No real performance measurements are provided in the paper, so the reader is left to guess about the degree of such flexibility.