About Google’s Dataflow Model

In this post I try to understand Google’s Dataflow Model, a data management and manipulation framework for dealing with unbounded and unordered datasets. A lot of the data produced today has no “maximum size”; in other words, the amount of such data is constantly increasing, and therefore modern computing systems must have an efficient way of dealing with a constant influx of information. In addition, for various reasons many applications deliver chronological data out of sequence, so sequential data can arrive at the processing system out of order. The data processing model described in the paper “The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive Scale, Unbounded, Out-of-Order Data Processing” attempts to provide an efficient framework for handling such ever-growing and potentially permuted data.

As stated above, the Dataflow model operates on unbounded data; in other words, the dataset used in a system built with such a framework is infinite and grows over time. Event occurrence time and processing time become important factors for a system like that, since individual data points tend to have a chronological order. In addition, a lot of real-life data can originate at a particular time but be delayed for processing by some variable time, causing not just a difference between event occurrence time and processing time, but also the possibility that events are processed in a different order than they occurred. Below I briefly describe the model and talk about its usage based on the examples in the paper.

The Model

The model operates on two basic primitive actions:

  • ParDo is responsible for parallel processing of the events
  • GroupByKey groups the intermediate results of the ParDo function by key.

These actions are nothing new; they operate on the key-value pairs and allow for distributing the computation to multiple machines and grouping and aggregating the results after the computation is done.
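The two primitives can be imitated in a few lines of plain Python. This is only a single-machine sketch of the idea, not the paper's API: `par_do`, `group_by_key` and the word-splitting function are hypothetical names chosen for illustration.

```python
from collections import defaultdict


def par_do(fn, elements):
    """Apply fn to every element independently; fn may emit zero or more (key, value) pairs."""
    for element in elements:
        yield from fn(element)


def group_by_key(pairs):
    """Collect all values that share a key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def split_words(line):
    # Hypothetical user function: emit (word, 1) for each word in a line.
    return [(word, 1) for word in line.split()]


lines = ["a b a", "b c"]
grouped = group_by_key(par_do(split_words, lines))
counts = {word: sum(ones) for word, ones in grouped.items()}
print(counts)
```

Because `par_do` treats every element independently, the calls could run on different machines; only `group_by_key` needs to bring values with the same key together.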

Window-based data processing deserves more attention, as it allows working with data in slices instead of bulk processing everything at once. Once again, handling input data in blocks using a windowing approach is very common. Because the Dataflow model assumes that the incoming data can be unordered, the window-based approach needs to be able to update the computations already performed for earlier data slices. Additionally, the model supports unaligned windows, that is, windows that may differ in size and in start or end time relative to other windows. Such unaligned windows can be used to represent user sessions, or clusters of user actions, in many Google applications.

The model operates on data with two distinct timestamps, event time and processing time, and it is possible to partition the dataset into windows based on either of these time metrics. Unfortunately, when data is windowed, the system cannot ensure that it has collected and processed all events belonging to a window. Instead, the Dataflow model provides a guarantee that eventually, if we process all data (which is impossible, since the data is assumed to be unbounded), the window will contain all the events that belong to it. A heuristic watermark is used to estimate, based on event time, when the system expects to have seen the events for a window. Once the watermark time is reached, the window is processed. Obviously, such a heuristic is not enough to guarantee the correctness of each window, so a trigger mechanism is used to issue an update to the window when new data arrives after the window has already been computed. Essentially, a trigger retracts the old window state, calculates a new state and signals that the updated window is now available. For example, event #9 in Figure 1 arrives very late for processing, way past the watermark time (dark dashed line), so in many configurations event #9 will invoke a trigger and cause the corresponding window state to update.

The system also supports streaming data, which can be dynamically placed into unaligned windows as more data comes in, with a window becoming available when the system’s watermark suggests that no more events for the window are expected. The trigger mechanism is used to update the window state in case more data becomes available for a window.
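To make windowing by event time concrete, here is a minimal sketch that assumes fixed one-minute windows and integer timestamps in seconds. The `window_start` helper and the sample events are made up for illustration and do not come from the paper.

```python
from collections import defaultdict

WINDOW_SECONDS = 60  # assumed window width


def window_start(event_time, width=WINDOW_SECONDS):
    """Return the start of the fixed window that contains event_time."""
    return event_time - (event_time % width)


# (event_time_seconds, value) pairs, deliberately listed out of order
events = [(5, 3), (70, 4), (30, 2), (65, 1), (125, 7)]

sums = defaultdict(int)
for event_time, value in events:
    # The arrival order does not matter: the window is derived from the event time.
    sums[window_start(event_time)] += value

print(dict(sums))
```

Each event lands in the window its own timestamp selects, so an out-of-order arrival simply updates a window that already holds earlier data.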
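The interplay of watermark and trigger described above can be sketched as a toy simulation. Everything here is an assumption made for illustration: the watermark is fed in by hand as part of the input stream (a real system estimates it), windows are fixed at 60 seconds, and the only refinement behaviour modelled is "retract the old value, emit the new one".

```python
from collections import defaultdict

WIDTH = 60  # assumed fixed window width in seconds


def run(stream):
    """stream: ("event", event_time, value) or ("watermark", time) items in processing order."""
    sums = defaultdict(int)
    emitted = {}  # window start -> last value sent downstream
    output = []
    for item in stream:
        if item[0] == "event":
            _, event_time, value = item
            start = event_time - event_time % WIDTH
            sums[start] += value
            if start in emitted:
                # Late event: the window already fired, so retract and re-emit.
                output.append(("retract", start, emitted[start]))
                emitted[start] = sums[start]
                output.append(("emit", start, sums[start]))
        else:
            _, watermark = item
            for start in sorted(sums):
                # Fire every window whose end the watermark has passed.
                if start + WIDTH <= watermark and start not in emitted:
                    emitted[start] = sums[start]
                    output.append(("emit", start, sums[start]))
    return output


stream = [
    ("event", 10, 5),
    ("event", 50, 7),
    ("watermark", 60),
    ("event", 70, 3),
    ("event", 20, 9),  # late: belongs to the window that already fired
    ("watermark", 120),
]
output = run(stream)
for record in output:
    print(record)
```

The late event with timestamp 20 plays the role of event #9: the first window has already been emitted with the value 12, so the sketch retracts 12 and emits 21.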

Examples

Below, I will illustrate some of the examples provided in the paper. For all examples, the system computes the sum of the values of the events. Figure 1 shows the input events in relation to their event time and processing time.

Figure 1. Sample events

The most basic example is batch processing of the entire dataset, illustrated in Figure 2. As can be seen, the entire dataset is viewed as one window processed at once.

Figure 2. Bulk batch processing

It becomes more interesting when the data is partitioned into windows as it arrives at the system for processing. In the example in Figure 3, incoming data is broken into non-overlapping windows of 1 minute, with the value at each window accumulating from the previous one. Note that watermarks are not used in this example, as the window is emitted for processing at a fixed interval.
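A rough sketch of this accumulating behaviour, assuming results are emitted at every 60-second boundary of processing time and each emission carries the running total so far. The function name and the sample events are hypothetical, and the values do not correspond to the figure.

```python
def periodic_accumulating_sums(events, period=60):
    """events: (processing_time_seconds, value) pairs sorted by processing time.

    Returns (fire_time, running_total) for every period boundary that is reached.
    """
    panes = []
    total = 0
    next_fire = period
    for processing_time, value in events:
        # Emit one result for every boundary passed before this event arrived.
        while processing_time >= next_fire:
            panes.append((next_fire, total))
            next_fire += period
        total += value
    panes.append((next_fire, total))  # result for the last, partially filled period
    return panes


panes = periodic_accumulating_sums([(10, 5), (40, 7), (70, 3), (130, 4)])
print(panes)
```

No watermark appears anywhere in the sketch: the emission time depends only on the processing-time clock.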

Figure 3. Processing in non-overlapping accumulating windows

The next example makes use of fixed windows with data partitioned by event time. At first the system does not see event #9, so the first window is bounded by the maximum watermark time estimated for the window (shown in the darker shade of gray). As soon as the processing time reaches the watermark for the window, the data is processed and the window’s state is computed. Later, when event #9 arrives, the trigger is invoked and the window state is recalculated, as seen in the lighter shade of gray for the 12:01 window.

Figure 4. Fixed windows with streaming data

More sophisticated examples deal with unaligned windows to compute user sessions, but the basic principle stays the same, except that unaligned windows can change their size in both the event time and processing time dimensions as more data becomes available.
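How a session window can change size as data arrives is easiest to see in a small sketch. It assumes a session ends after a fixed gap of inactivity (50 seconds here, an arbitrary choice) and recomputes the sessions from scratch each time, which a real system would avoid; `sessionize` is a hypothetical helper.

```python
def sessionize(event_times, gap=50):
    """Group event times into (start, end) sessions separated by more than `gap` seconds."""
    sessions = []
    for t in sorted(event_times):
        if sessions and t - sessions[-1][1] <= gap:
            sessions[-1][1] = t  # extend the current session
        else:
            sessions.append([t, t])  # start a new session
    return [tuple(session) for session in sessions]


before = sessionize([0, 10, 100, 110])
after = sessionize([0, 10, 100, 110, 55])  # one late event with timestamp 55
print(before)
print(after)
```

The single late event bridges the gap between the two earlier sessions, so what used to be two windows becomes one larger window.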

Some Thoughts

According to the authors, the session use case is the main driving force behind developing the Dataflow model. All other examples discussed in the paper are byproducts of the system developed to satisfy the session computation at Google, which may not be a very good sign. Clearly, the authors thought about other potential uses, but it is not clear how well their system performs under other usage scenarios compared to the existing alternatives.

Triggers provide a mechanism to keep the system in the most up-to-date state as new out-of-order events arrive for processing. Unfortunately, triggers seem like a fairly expensive operation, especially in usage scenarios where previous window values must be retracted, new data added and the window recomputed for every late event. The authors do not provide a performance measure for the system as a whole or for triggers in particular, but invoking a trigger for every late event may be a speed issue for highly out-of-sequence data.

The authors also claim that the model is a highly customizable solution in terms of performance, because of the ability to batch process or stream process incoming data. No real performance measure is provided in the paper, so the reader is left to guess about the degree of such flexibility.

Understanding ZooKeeper

ZooKeeper is not new; it has been around for quite some time now, yet I feel that not many of the people who use it in one way or another understand what it is. ZooKeeper is used by so many distributed systems at the moment that it has become a crucial part of distributed computing, and I feel it is time for me to learn what it really is and how it works. The service is described in the paper “ZooKeeper: Wait-free coordination for Internet-scale systems”.

ZooKeeper is a system used to coordinate processes in distributed applications. According to the authors of the paper, one of the simplest types of coordination is configuration, so it makes sense that ZooKeeper is used to configure so many different distributed applications, but of course the true strength of ZooKeeper is in much more complicated coordination scenarios. The ZooKeeper service itself runs on multiple servers and uses replication to achieve high availability and improve performance.

ZooKeeper does not use locks or any other blocking constructs to achieve coordination, because such blocking primitives can significantly reduce performance. Instead, it uses a non-blocking pipeline architecture with a hierarchical data structure which clients, the users of the ZooKeeper service, can read from and write to. Such a data structure needs to provide certain guarantees in order to provide coordination capabilities:

  • FIFO client ordering
  • Linearizable write operations.

Data Structure

ZooKeeper’s hierarchical data structure is similar to a file system, as each data object, called a znode, can be accessed for reading or writing using a hierarchical path name. The figure below, taken from the paper, illustrates the concept of a file-system-like hierarchical data model.

Figure 1. Hierarchical data model

There are two types of znodes: regular and ephemeral. Both types store data, but only regular znodes can have children. Another difference between regular and ephemeral znodes is who gets to delete them. Regular znodes can only be deleted by a client, while ephemeral znodes have a shorter lifespan and can also be removed by the ZooKeeper server when the session of the client that created them terminates. Znodes are not meant for general data storage, although depending on the application some information can be preserved for a prolonged time.

A client can manipulate znodes using the following set of commands:

  • create – creates a new znode
  • delete – removes an existing znode
  • exists – checks whether a znode exists at specified path
  • getData – gets the data of a znode
  • setData – writes data to a znode
  • getChildren – lists the children of a znode
  • sync – waits for pending updates to propagate to the server
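The operations listed above, together with the regular and ephemeral znode rules, can be modelled with a toy in-memory tree. This is a single-process sketch for building intuition, not a ZooKeeper client: the class, its method names and the `close_session` helper are hypothetical, and `sync` is left out because there is no replication here.

```python
class ZNodeTree:
    """Toy, single-process model of the znode namespace."""

    def __init__(self):
        # path -> {"data": bytes, "ephemeral_owner": session id or None}
        self.nodes = {"/": {"data": b"", "ephemeral_owner": None}}

    @staticmethod
    def _parent(path):
        return path.rsplit("/", 1)[0] or "/"

    def create(self, path, data=b"", ephemeral_owner=None):
        parent = self._parent(path)
        if path in self.nodes:
            raise KeyError(f"{path} already exists")
        if parent not in self.nodes:
            raise KeyError(f"parent {parent} does not exist")
        if self.nodes[parent]["ephemeral_owner"] is not None:
            raise ValueError("ephemeral znodes cannot have children")
        self.nodes[path] = {"data": data, "ephemeral_owner": ephemeral_owner}

    def exists(self, path):
        return path in self.nodes

    def get_data(self, path):
        return self.nodes[path]["data"]

    def set_data(self, path, data):
        self.nodes[path]["data"] = data

    def get_children(self, path):
        return sorted(p for p in self.nodes if p != "/" and self._parent(p) == path)

    def delete(self, path):
        if self.get_children(path):
            raise ValueError(f"{path} has children")
        del self.nodes[path]

    def close_session(self, session_id):
        """Remove every ephemeral znode owned by the terminated session."""
        owned = [p for p, n in self.nodes.items() if n["ephemeral_owner"] == session_id]
        for path in owned:
            del self.nodes[path]


tree = ZNodeTree()
tree.create("/app", b"config-v1")
tree.create("/app/worker-1", ephemeral_owner="session-a")
tree.create("/app/worker-2", ephemeral_owner="session-b")
tree.close_session("session-a")  # worker-1 disappears with its session
print(tree.get_children("/app"))
```

Ephemeral znodes that vanish with their session are what make patterns such as group membership possible: the children of `/app` always reflect the workers whose sessions are still alive.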

Each of the operations above can execute in a synchronous or an asynchronous mode. If a client executes a command synchronously, it will block and wait for ZooKeeper to respond to the operation. More interesting is asynchronous execution. In this mode a client can issue multiple requests to the ZooKeeper service and perform other work while waiting for the responses. The responses are guaranteed to arrive at the client in the order in which the requests were issued.

Checking the existence of a znode, getting its data and getting its children can each have a watch associated with them. If a watch is set for an operation, then in addition to completing the request normally, ZooKeeper will issue a notification to the client when the data on the watched znode changes.

The ZooKeeper API allows a client programmer to implement more complex coordination primitives of their choice, including blocking constructs such as locks and barriers.
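A watch can be pictured as a callback registered together with a read. The sketch below is a toy model with hypothetical names, and it assumes that a watch fires at most once and then has to be set again, which is how ZooKeeper watches are documented to behave.

```python
class WatchedStore:
    """Toy key-value store with one-shot watches on reads."""

    def __init__(self):
        self.data = {}
        self.watches = {}  # path -> list of callbacks waiting for the next change

    def get_data(self, path, watch=None):
        if watch is not None:
            self.watches.setdefault(path, []).append(watch)
        return self.data.get(path)

    def set_data(self, path, value):
        self.data[path] = value
        # Fire and clear: each registered watch is delivered at most once.
        for callback in self.watches.pop(path, []):
            callback(path)


notifications = []
store = WatchedStore()
store.set_data("/config", "v1")
current = store.get_data("/config", watch=notifications.append)
store.set_data("/config", "v2")  # triggers the watch
store.set_data("/config", "v3")  # no watch is registered any more
print(current, notifications)
```

The notification only says that something changed; the client reads the znode again (and sets a new watch if it wants further notifications) to learn the new value.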

Guarantees

ZooKeeper has two ordering guarantees that allow it to be used for coordination. The first-in-first-out (FIFO) client order guarantee enforces the order in which a client receives the responses to multiple asynchronous calls issued to the service. This guarantee establishes that all requests from a client are executed in the same order as the client issued them. ZooKeeper also enforces the linearizability of write operations, meaning that all state changes are serializable and respect precedence. In other words, all write operations are replicated to all ZooKeeper servers in the same order in which they were received from the client.

Implementation

ZooKeeper is built in a distributed manner and runs on multiple servers to provide high availability. ZooKeeper’s data is replicated across all the servers used by the service.

Figure 2. ZooKeeper Service (Source: ZooKeeper: Wait-free coordination for Internet-scale systems)

The figure above is a high-level illustration of the ZooKeeper service. Read and write request scenarios are depicted in the figure. When a write request comes into the system, it is processed and forwarded to a single “leader” server, state changes are coordinated among the servers, and finally the request data is saved to the replicated database. Read requests need no coordination: since each ZooKeeper server has a full replica of all the data, the information is simply read from the local copy and sent back to the client.

Coordination among ZooKeeper servers is very important, as it ensures that each copy has the correct state and that data read from the database copy on one server is the same as on the other servers in the system. The coordination protocol also provides fault tolerance for the system, and in theory the system can continue correct operation as long as the majority of ZooKeeper servers are not faulty. The coordination step between ZooKeeper machines is depicted as the “Atomic broadcast” step in Figure 2.

The replicated database is another important piece of the ZooKeeper service. The database is used to store all znodes. It is in-memory and fully replicated, meaning that each properly operating server has a full replica of all ZooKeeper data. Snapshots of the database are periodically written to disk to facilitate database recovery if the server crashes for any reason and is restored later.

With an easy and intuitive API, ZooKeeper makes it possible to coordinate large-scale distributed systems, all while maintaining high availability and fault tolerance. In this little summary I tried to depict the basics of the service, without going into details on the API or on how coordination between ZooKeeper servers happens, as these are immense topics on their own.
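The two request paths can be sketched with a toy ensemble. This is a heavily simplified model under stated assumptions: the classes and the transaction counter are hypothetical, and the broadcast is a plain synchronous loop, whereas the real service coordinates the servers with an atomic broadcast protocol.

```python
class Replica:
    def __init__(self):
        self.data = {}
        self.last_applied = 0

    def apply(self, txn_id, path, value):
        # Apply strictly in transaction order so every replica sees the same history.
        assert txn_id == self.last_applied + 1, "transactions must be applied in order"
        self.data[path] = value
        self.last_applied = txn_id

    def read(self, path):
        # Reads are served from the local copy without any coordination.
        return self.data.get(path)


class Ensemble:
    def __init__(self, size):
        self.replicas = [Replica() for _ in range(size)]
        self.next_txn_id = 1  # assigned by the single leader

    def write(self, path, value):
        txn_id = self.next_txn_id
        self.next_txn_id += 1
        for replica in self.replicas:  # stand-in for the atomic broadcast step
            replica.apply(txn_id, path, value)
        return txn_id


ensemble = Ensemble(size=3)
ensemble.write("/config", "v1")
ensemble.write("/config", "v2")
values = [replica.read("/config") for replica in ensemble.replicas]
print(values)
```

Routing every write through one place that assigns the order is what lets reads stay cheap: any replica can answer from memory.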
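The majority requirement translates into simple arithmetic. The helper names below are made up for illustration, and the sketch assumes that "faulty" means a crashed or unreachable server.

```python
def majority(servers):
    """Smallest number of servers that forms a majority."""
    return servers // 2 + 1


def tolerated_failures(servers):
    """How many servers may fail while a majority is still available."""
    return servers - majority(servers)


table = {n: (majority(n), tolerated_failures(n)) for n in (3, 4, 5, 7)}
for n, (quorum, failures) in table.items():
    print(f"{n} servers: majority {quorum}, tolerates {failures} failure(s)")
```

Going from 3 to 4 servers does not buy any extra fault tolerance, which is why ensembles are usually sized with an odd number of servers.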

Graph Processing at Facebook Scale

I will start with a little note on large scale graph processing, as described in the paper “One Trillion Edges: Graph Processing at Facebook Scale”.

Graph processing tasks are very common in analyzing various kinds of data, such as network topologies or the interconnections between people. Social media poses challenges for such systems and algorithms due to the sheer amount of data. For instance, Facebook has more than 1.39 billion registered users, and each user can be represented by a vertex in a graph, so the graph consists of billions or even a trillion edges. This explains Facebook’s high interest in large-scale graph processing systems.

Researchers at Facebook investigated a number of existing frameworks and systems for graph processing on the large scale and realized that none of the existing approaches were capable of handling the graphs of the Facebook scale. Thus, they decided to take action and develop a system adequate for the needs of the company. The result of their work was a significant contribution to the Apache Giraph project, greatly improving its performance and scalability.

Apache Giraph was chosen as the basis of Facebook’s graph processing system for a number of reasons:

  • Good initial performance compared to the competing products
  • Giraph is written in Java and facilitates the use of existing MapReduce infrastructure
  • Ease of debugging due to the underlying graph processing model used

Facebook introduced many significant improvements to the Apache Giraph project, both in terms of functionality and performance. Among the changes are the ability to use multiple sources for vertex and edge data, more fine-grained parallelism, memory utilization optimizations, the addition of more refined worker phases and the introduction of centralized master computation.

The ability to use multiple sources for the input data is important for many large-scale users, as the data required for a graph may originate from different sources or be stored on various hardware and software stacks. Such a feature, although not directly affecting the graph computation, can significantly reduce the time needed to load the graph structure by eliminating the need for data aggregation jobs that bring data from multiple sources into one storage infrastructure.

One of the performance improvements was in how the system handles parallelism. Since Apache Giraph is built on top of the MapReduce framework, the only way to provide more parallelism to the computation in early versions of the system was to increase the number of workers. Facebook changed Giraph to allow more fine-grained parallelism at the worker level by letting each worker execute tasks with multiple threads, thus better utilizing the multiple CPUs available on the worker machines. Such a model introduces a second level of parallelism, so in addition to running multiple workers, each worker can handle its own parallel execution. Although such a model allows for finer tuning of the system and improvements in hardware utilization, it reduces the overall maintainability of the system and makes development and debugging more complicated due to the existence of two distinct parallel execution mechanisms.

Apache Giraph is based on the Bulk Synchronous Parallel (BSP) model of parallel computation. Although the model is widely used in many applications, Facebook introduced many extensions to it in the Apache Giraph project. In particular, the additional compute stages allow for better code management and overall structure, as well as better optimization of the tasks executing in the system. Master computation enables users to perform certain centralized computation at predefined intervals. Such master computation is useful when a certain piece of code needs to execute only once and does not need to run on the worker nodes, since the master can communicate the results to the workers. In addition, master computation allows for some synchronization between the nodes, because it relies only on the clock of the master machine. As such, it partially addresses the clock skew problem on worker machines, although such synchronization is still very coarse.

The performance of Apache Giraph was compared against Apache Hive on PageRank, label propagation and friends-of-friends score computation, all running on the same infrastructure. All of these algorithms showed significant improvements in CPU time and in elapsed time as perceived by the user when executed on the Apache Giraph stack compared to Hive. Unfortunately, no comparison with other specialized graph processing systems was provided, so it is not clear how it fares against the major competition. But the fact that a PageRank algorithm was able to run over one trillion edges in less than 3 minutes per iteration on 200 machines sounds impressive.
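The two levels of parallelism can be illustrated with a small sketch. It is an analogy rather than Giraph code (Giraph itself is written in Java): the workers are simulated one after another in a single process, the partitions and the per-partition computation are invented, and only the second level, threads inside a worker, actually runs in parallel.

```python
from concurrent.futures import ThreadPoolExecutor


def process_partition(partition):
    # Placeholder for per-vertex work: here simply the sum of the vertex values.
    return sum(partition)


def run_worker(partitions, threads=4):
    """Second level of parallelism: one worker spreads its partitions over threads."""
    with ThreadPoolExecutor(max_workers=threads) as pool:
        return list(pool.map(process_partition, partitions))


# First level of parallelism: the graph is split across workers (separate machines in practice).
workers = {
    "worker-0": [[1, 2], [3, 4]],
    "worker-1": [[5], [6, 7, 8]],
}
results = {name: run_worker(partitions) for name, partitions in workers.items()}
print(results)
```

Even in this tiny form there are two knobs to tune, the number of workers and the number of threads per worker, which hints at the extra complexity mentioned above.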
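A compact way to see the superstep structure is a single-process PageRank sketch. It assumes a tiny graph held in a dictionary, a fixed number of supersteps and the common damping factor of 0.85; using the master step to total the rank of vertices without out-edges is my own illustrative choice, not something taken from the paper.

```python
def pagerank_bsp(out_edges, supersteps=20, damping=0.85):
    """out_edges: vertex -> list of target vertices. Returns vertex -> rank."""
    n = len(out_edges)
    rank = {v: 1.0 / n for v in out_edges}
    for _ in range(supersteps):
        # Master computation: runs once per superstep, its result is shared with all vertices.
        dangling = sum(rank[v] for v, targets in out_edges.items() if not targets)
        # Vertex computation: every vertex sends its rank along its out-edges.
        inbox = {v: 0.0 for v in out_edges}
        for v, targets in out_edges.items():
            for target in targets:
                inbox[target] += rank[v] / len(targets)
        # Barrier: the new ranks become visible only in the next superstep.
        rank = {
            v: (1 - damping) / n + damping * (inbox[v] + dangling / n)
            for v in out_edges
        }
    return rank


graph = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
ranks = pagerank_bsp(graph)
print(ranks)
```

Within a superstep every vertex only reads values from the previous superstep, which is why the vertex loop could be distributed over workers and threads without changing the result.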
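To put the last figure into perspective, a back-of-the-envelope calculation helps. It uses only the numbers quoted above and assumes the work is spread evenly over the machines; since an iteration takes less than 3 minutes, the result is a lower bound.

```python
EDGES = 10**12                 # one trillion edges
MACHINES = 200
SECONDS_PER_ITERATION = 3 * 60  # upper bound quoted above

edges_per_machine_per_second = EDGES / (MACHINES * SECONDS_PER_ITERATION)
print(f"at least {edges_per_machine_per_second:,.0f} edges per machine per second")
```

That is roughly 28 million edges handled by each machine every second.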