Review – Pivot Tracing: Dynamic Causal Monitoring for Distributed Systems

Debugging can be a nightmare for software engineers, it is even more so in the distributed systems that span many machines in potentially more than one datacenter. Unfortunately, many of the debugging and monitoring techniques for such large system do not differ much from the methods used to debug and monitor simple single-machine software. Logs are still one of the most common way to gain the insights into the operation of the software, and these logs are typically produced my each machine independently, making it next to impossible to find causal relationships between evens happening on different servers. In addition, logs must be installed in advance at development time, and altering the information collected after the system deployment can be problematic and will require additional developer time.

Pivot Tracing tries to address these issues. Pivot Tracing allows to dynamically alter what information is being collected without having to stop the system being monitored. It also introduces a happened-before join operation that allows engineers to correlate events based on their happened-before relations to each other. Despite the ability to dynamically reconfigure the system to collect different information, it still requires expert knowledge of distributed environment being monitored. Before a system can be used, engineers need to define tracepoints, or places in the code of the underlying system where monitoring and logging instrumentation can be dynamically installed by Pivot Tracing.  Engineers also need to define (1) what parameters can be extracted and logged by the system, however defining the parameters system can collect is not limited to pre-launch or configuration stage, these log parameters can be modified at any time during the life-cycle of the system Pivot Tracing monitors.


Pivot Tracing users use a high-level query language to request monitoring/debugging information they need. The query is compiled into an intermediate representation called advice.  Different parameters can be collected at various tracepoints, so the advice carries the instructions to each relevant tracepoint regarding what instrumentation or monitoring needs to be installed at each tracepoint and what information is to be collected and propagated in the system.  The data is collected with the execution path flow of the system, as execution passes through a relevant tracepoint (4) some parameters are collected and send down the execution path in a baggage (5). In addition tracepoints (4) can emit tuples (6) that are being sent to Pivot Tracing front end (7) will become available to the user (8).

A happened-before join, denoted by “->”, is a very powerful tool within Pivot Tracing for capturing causality of events. Let’s look at the following query example:


This query sets the anticipated execution path of the request. At first, a request needs to pass through a ClientProtocol and followed by the tracepoint at incrBytesRead. In the example above, we are only interested in the events that go from ClientProtocols to incrBytesRead, and any other execution paths will not work for this query. Since the query runs in parts along the execution path of the request, Pivot Tracing can easily capture happened-before relationship between events by following the request within the system. Advice compiled from the query has capabilities for evaluating messages coming in the baggage from prior tracepoints to process the happened-before joins. If the tracepoint appears earlier in the execution path, then the events at that tracepoint will happen before the events at the later tracepoint.

But what appoint non-linear execution paths? What if we have segments of code that execute in parallel? The paper does not talk about this in great details, but Pivot Tracing should still work well.  For instance, if two threads are parallel and do not communicate with each other, then the events in this two threads are concurrent, however once the two threads start to communicate, the baggage from earlier tracepoints will be transmitted along these communication channels, allowing Pivot Tracing to carry out happened-before joins as usual. Consider the following example authors provide:


Query A -> B produces a1b2 and a2b2 results, however there is no a1b1, because at the time b1 was running, it had no baggage from the thread running a1, so both a1 and b1 are concurrent.

Pivot Tracing has been implemented in Java and tested against Hadoop software stack. Authors claim to have found a bug in HDFS by using Pivot Tracing, however by the time a bug was found with Pivot Tracing, it has already been reported by others. Nevertheless, it is impressive that the system was able to help find the problem by just executing a few small queries.

The overhead of pivot tracing is fairly small, and mainly consist of packing, unpacking and transmitting tuples from one tracepoint to another.  As such, when no monitoring is required, the system can be left enabled with no queries running resulting in negligible overhead (PivotTracing Enabled row in the table below).


Under a stress test on the HDFS stack, the overhead reached almost 16% for certain operations. It is important to understand that some queries may result in bigger baggage transmitted and more tuples packed and unpacked to the baggage, thus, I think, it is be a good idea to test and optimize queries in staging environment before running it on the production system. This however defeats one of the bigger advantages of Pivot Tracing – its ability to dynamically adjust to different monitoring scenarios.

Authors do not talk much about scalability to system with a larger number of nodes or systems with various level of communication between different nodes. It is also interesting to see how big of a penalty will a WAN deployment incur? After all, the main overhead of the Pivot Tracing is baggage propagation and having to piggyback all that additional data to messages along the communication paths between the nodes can have severe negative effect for systems that are capable to saturate their bandwidth limitations.

Despite Pivot Tracing authors advocating against the traditional logs for debugging, their system is still fundamentally a logging system, albeit a lot more sophisticated. Users can use Pivot Tracing to log only the information they need along with some causal relationship between these log pieces. Despite this, I believe there are still cases when a traditional logging approach can be of more use than Pivot Tracing, namely debugging rare and subtle bugs that can happen only under certain set of conditions. With Pivot Tracing users can install instrumentation after such rare bug has occurred, but there is no guarantee that it will happen again anytime soon, yet the overall system pays the penalty overheads of the monitoring. In this context, traditional logs can provide more immediate benefit, as they allow engineers to look back in time at the system execution.

With the presence of back-in-time snapshot capability, we can revert back to the past states of the system and replay back the changes along with newly installed instrumentation for monitoring, but overheads of this may be enormous for a large scale distributed system. Is there a way around this? Can we look back in time and identify the bugs, data corruption or performance issues without paying a significant performance price?

Review: Implementing Linearizability at Large Scale and Low Latency

In this post I will talk about Implementing Linearizability at Large Scale and Low Latency SOSP 2015 paper.

Linearizability, the strongest form of consistency, can be very important in large scale data storage systems, although many such systems either do not implement linearizability or do not fully expose serializable operation to the clients. The later type of systems can maintain linearizability for internal operations that occur between servers, but do not provide the same consistency to the clients.

The authors of the paper provide a linearizability framework, called RIFL, suitable for use in existing non-linearizable RPC based distributed system. The framework allows to convert existing RPC into linearizable ones in just a few lines of code with minimal impact on the overall performance. The paper only discusses RPC-based systems, since according to the paper, linearizability requires a request-response protocol to operate. I think it may be possible to sue RIFL-like system for message passing approaches as long as receiving each message eventually produces an ack to the sender.


In order to better understand RIFL and how it is beneficial in the data-store system, we need to talk about Linearizability. According to the paper, Linearizable operations appear to happen instantaneously and only once at same point in the execution of a system. It is important to understand that in a real system an operation can take some time to execute and can potential fail midway through its execution. Linearizable system must make it appear to all its clients as if the operation happened right away. The ability to execute operations only once is another important point, as many existing systems retry execution of operation upon failure. Authors say such operations follow at-least-once semantics, whereas linearizable operations have exactly-once semantics.  In order to achieve certain consistency guarantees, many existing systems use idempotent operations which produce the same outcome regardless of how many times such operations have been executed. Authors show an example in which running such operation more than once can break the linearizability after a certain failure.non-linearizable_example

Example of at-least-once semantic breaking linearizability.

In this case we have two clients interacting with a single server. Client B writes 2 to the server but it crashes before the server has a chance to respond. When client A reads the data, it will get the value written by B. Later client A can write a different value, while client B is recovering from the failure. Once client B as back up, it does not know that its previous operation has succeeded, so it retries it and overwrites the later value written by A.  Authors do not mention how likely such example to occur in practice, but given a large scale of the system with thousands or even millions clients, it will be unwise to discount the possibility of such failure. Nothing is mentioned whether any of the existing data-store systems address the issue.


RIFL framework allows the conversion of the system relaying on the at-least-once RPC operations into linearizable exactly-once operations. The main idea behind RIFL is storing the results of the RPC execution, so that in case of a retry of an RPC call the system could have used already known result without having to re-run the procedure. The results of the RPC invocation are stored on the completion records, and each such record is associated with each unique RPC. This ensures the exactly-once operation of the RPCs in the system, but also opens up a number of problems that had to be solved.


High level representation of RIFL logic

In order to operate properly, the system must be able to detect retry calls. In order to make such detection easy, each RPC is assigned a 128-bit ID number consisting of a 64-bit client ID and 64-bit sequence at such client. If an operation is to be retried by the client, it must use the same ID. Before execution of an RPC, the server will check if it is aware of a completion record for such RPC and if it does not exist, RPC continues, but if a completion record is present then the server will return the results stored in the completion record instead of running an RPC.

Migrating completion records is essential in the event of a failure, as the system relies on the presence of such records to make a decision on whether an RPC needs to run. From time to time, data can migrated from one server to another, especially in case of a server crash. The new server must have the completion records available to it after the migration, so each completion records is attached to one of the data-objects being modified by an RPC, so that moving the object will also move all the completion records for RPCs acting on the object. Unfortunately, authors do not explain in detail how the migration is made, as this part is probably left out to the underlying system. It is very likely that completion records also get replicated with the objects they belong to for durability reasons, although no mechanism for such replication is described as well, so it is worth to assume that the completion record replication is left out to the underlying system.

Overtime many completion records are going to be created for each object, increasing the storage requirements and the bandwidth used for replication and migration of objects. In order to improve resource utilization, a garbage collection mechanism for old completion records was devised. In RIFL a completion record can get removed from the system if a client acknowledges that it knows of a successful RPC execution and will not retry it in the future. Such acknowledgments are piggybacked to the new RPC requests and as a result incur minimal overhead. In case of a client failure, no acknowledgement will be sent to the server, causing certain completion records to persist. In order to deal with this problem, RIFL uses lease manager to grant leases to all the clients. In case a client lease is not renewed, all completion records for the client will be purged. It is not clear how a centralized lease system can impact the overall performance of a system implementing RIFL. A time synchronization between the client, lease manager and a server is used to reduce the need to communicate: the server will contact lease manager only when it estimates that the client lease will expire soon. This portion of a lease protocol raises some questions about the reliability of the lease sub-system. What is going to happen if time skew is greater than the server estimates for? If the server time is ahead of lease manager time, server will start issuing more check requests to the lease manager, but if it is lagging behind the lease manager time, than the server may think lease is still good while the client may have already been dead. I think the worst case scenario is that GC does not collect all dead completion records, which may not be of a big immediate problem, but may eventually lead to the excess memory consumption by the server applications.

Transactions with RIFL

Authors implemented a transaction system using RIFL for linearizability on top of RAMCloud, a distributed, in memory key-value datastore. A two-phase commit protocol similar to Sinfonia is used to implement transactions. In the first phase of the protocol, usually called a prepare phase, a set of read, write or delete commands is sent to servers and each server upon receiving prepare determines if it can proceed with the commit. If all servers can commit, then a second phase finalizes the transaction. RIFL makes crash recovery simpler compared to Sinfonia. Since each prepare operation is linearizable, retires of the prepare will not cause and adversary effects. Upon a more serious crash, recovery manager can learn if the results of the prepare operation without the knowledge of the original commit commands, and if all prepares have succeeded, it can finalize the transaction; in case of some prepare failures, transaction is simply aborted.

One important point authors make in the paper is about traditional way of implementing linearizability on datastores and how it differs from their implementation. In the existing system, linearizability is implemented on top of a transaction system and according to the authors this approach creates more cumbersome transaction mechanism. With RIFL, transactions were implemented on top of a linearizability layer, which authors claim is a better approach.



RIFL was implemented and evaluated in RAMCloud. Overall, authors claim only 5% reduction in latency for RIFL linearizable write RPCs compared to the original writes. No significant difference in throughout was observed when using RIFL.


Added overhead of RIFL to the RAMCloud system. Left is latency, right is throughput.

Transaction performance was evaluated with TPC-C benchmark typically used for performance evaluation of Online Transaction Processing (OLTP) systems.  RIFL RAMCloud was compared against H-store database. Both system are in-memory databases but they different significantly in their purpose and typical use cases. As a matter of fact, RIFL RAMCloud solution had to be specifically implemented for TPC-C benchmark. When comparing the two systems, authors found out that RAMCloud with RIFL significantly outperforms H-store in all tests. I am a bit skeptical about these results, at least without more knowledge about how RAMCloud was used to make TPC-C benchmark work with it and whether the implementation of RIFL & RAMCloud interface for TPC-C benchmark was specifically tailored for the tests performed by TPC-C. It may have been a good idea to compare the system against other transaction protocols implemented in RAMCloud, such as the ones based on consensus.

Overall Thoughts

When reading the paper I thought that the idea of caching the results of RPC calls is a very straightforward and simple and I am surprised it has not been exploited before. Yes, store such cache presents a few challenges, mainly in memory management of the overall system, as the cache size can grow large, but as shown in RIFL, these are not very big challenges and can be solved with simple protocols and existing tools, such as ZooKeeper.  Authors claim that implementing transactions on top of linearizability layer is a better and faster approach. Transactions (mini-transactions?) implementation became easier with RIFL, but I am not sure the performance benefit is obvious. On my opinion performance comparison with H-store seems somewhat unfair.

A Few Words about Inconsistent Replication (IR)

Recently I was reading the “Building Consistent Transaction with Inconsistent Replication” paper. In this paper authors use inconsistently replicated state machine, but yet they are capable of creating a consistent transaction system. So what is Inconsistent Replication (IR)?

In the previous posts I summarized Raft and EPaxos. These two algorithms are used to achieve consensus in the distributed system, so for example when we deal with replicated state machines, these algorithms allow each replica to be an exact, consistent copy of each other. So, it is logical to assume that Inconsistent Replication will not produce the same replicas all the time, so our state machines can end up in different states. Why would we want to have a replicated state machine with various copies potentially being in different states? According to the authors of the paper it is faster than consistent replication, yet can still be used in some applications, such as transaction commit. I think the usage of IR will not be as straightforward as using consistent replications, since users of IR must also design their applications in such a way that tolerates the inconsistent state of the nodes.

IR does not guarantee the order in which each command is executed by replicas, thus replicas can reach different states unless the operations are independent of each other.  The figure below illustrates hot this can happen.


Figure 1. Replicas in inconsistent state. (a) Two requests C1 and C2 are being sent to replicas at the same, but the requests reach replicas in different orders. (b) Requests are logged and executed in the order they have been received leading to an inconsistent state.

Since replicas can be in different states, IR cannot guarantee that recovering from failures can preserve the value or the states of each recovered operation. Because of this, IR has two types of guarantees it can provide upon recovering from a failure. If a client does not need to have an ability to recover the value of each operation, it can use IR in an inconsistent operation mode that only guarantees the recovery of only the fact that an operation occurred for up to f failures in a system of 2f+1 replicas. Inconsistent operation mode does not allow to recover the value of the operation. In consensus operation mode, IR can preserve both the command and a result of such command for up to f failures. In this mode, the result of an operation is the result a majority of replicas report for such operation, if such majority exist. Consensus operation may fail if not enough replicas report the same result for the operation, in which case IR protocol retries. Consensus operations also need to have floor(3/2f)+1 replicas agree to be finalized. IR protocol will also retry the operation until it finalizes. It seems like only finalized consensus operation can be recovered with the results of the operation, and not all consensus operation can be finalized since they can timeout after getting the majority but before reaching the consensus on floor(3/2f)+1 replicas.

This notion of retrying the operations until they eventually succeed makes me question whether it is a good solution especially under various kinds of loads.  If the system deals with mostly independent requests it may not be difficult to reach consensus in consensus operation mode, but if there are lot of contention between requests, the system may just be stuck retrying all the operations without doing much of a useful work.

I am not going mention IR recovery mechanism described in the paper, but it is worth noting that during the recovery the entire system blocks and stops responding to new requests. The process is initialized by a recovering replica communicating to all other nodes, and once each node learns that some other replica is recovering from a failure it stops processing the requests until the protocol is finished and normal operation can resume.

EPaxos: Consensus with no leader

In my previous post I talked about Raft consensus algorithm. Raft has a strong leader which may present some problems under certain circumstances, for example in case of leader failure or when deployed over a wide area network (WAN). Egalitarian Paxos, or EPaxos, discards the notion of a leader and allows each node to be a leader for the requests it receives. “There Is More Consensus in Egalitarian Parliaments“ talks about the algorithm in greater details.

According to the designers of the algorithm, they were trying to achieve the following three goals:

  • Good commit latency in WAN
  • Good load balancing
  • Graceful performance decrease due to slow or failing replicas

These goals are achieved by decentralized ordering of commands. Unlike classical Paxos and many Paxos-lie solutions, EPaxos has no central leader, and orders the commands dynamically and only when such ordering is required.  Upon receiving a request, EPaxos replica becomes a request-leader, it then uses fast quorum to establish any dependencies for such request. If no dependencies are found, it can proceed to committing a request, but if the request conflicts with other requests being processed, the order of the request is established taking into account all the dependencies in a manner similar to regular Paxos algorithm.



Figure 1. Sample Message flow in EPaxos. R1-R5 are replicas, obj_A and obj_B are requests. Arrows designate message flow. Figure taken from the “There Is More Consensus in Egalitarian Parliaments” paper.

The figure above illustrates two scenarios. On the left we have two requests coming in to different EPaxos nodes (R1 and R5), and such requests have no dependencies on each other, so both R1 and R5 can proceed to commit right after the fast quorum tells them there are no dependencies. Similarly, on the right side we have two messages arriving from the client to their corresponding replicas, but Message C3 has a dependency on message C4. R1 learns of such dependency through node R3 and must invoke Paxos accept stage of the algorithm to enforce proper ordering of the requests, as such we order C3 only after C4 and commit C3 after C4 has been committed and executed.


Figure 2. Latency vs Throughput. Figure taken from the “There Is More Consensus in Egalitarian Parliaments” paper.


Figure 3. Throughput in case of a replica failure (leader failure for Multi-Paxos). Figure taken from the “There Is More Consensus in Egalitarian Parliaments” paper.

EPaxos demonstrates good performance and stability when compared to other common consensus algorithms. Both figures above are from the EPaxos paper mentioned earlier. It is interesting to see how throughput stays the same for EPaxos in the event of a node failure, while classical Multi-Paxos virtually stops until a new leader can be elected. In Figure 2, the percentage next to the algorithm name designates the conflicting commands, or commands that have dependencies. As can be seen EPaxos generally performs well, even with a high number of conflicting commands, although the difference between 25% and 100% conflicting commands seems small. It is worth noting that based on previous and related work, authors estimate 2% conflicting commands in real world situations.

Consensus with Raft Algorithm

When we talk about consensus in a distributed system, we talk about a system consisting of multiple machines that act as one state machine yet capable of surviving failures of some of the system nodes. Consensus algorithms are designed to enforce all distributed nodes have the same state so that the distributed system can tolerate failures. Paxos family of algorithms are probably the most widely used consensus mechanism currently used in practice, but according to many researcher in the community  it is fairly difficult for implementation and understanding. The authors of Raft claim to have designed an efficient, reliable consensus algorithm that is easy to understand. In fact, the ease of comprehension was at the cornerstone of the algorithm development and influenced many decisions.


In the very basic terms, the need to establish consensus arises when a set of clients send requests to the replicated state machines. Such machines need to keep the same state and same log of state changes.  In case some nodes of such replicated system fail, an uninterrupted operation is still going to be possible with leftover machines. Once the machines recover from the failure, their state can be brought up to date with the rest of the cluster.


Figure 1. Simplified view of distributed system

An oversimplified view of a distributed system is shown in Figure 1. As the clients send requests to the system, at handles the requests, and replicates state and log changes across the cluster to make sure each node has the correct information. It is the job of a consensus algorithm to keep replicated state machines in agreement with each other.


Raft is a fairly new consensus algorithm, and as have been mentioned earlier one of the requirement was the ease of understanding. I will not go into much details describing the algorithm, and will provide only the basics of the approach.

Each server in the Raft system can take one of the three possible roles: leader, follower and candidate. The role each server takes determines how it reacts to the messages and requests received. For example, on leader can receive requests form the client, while the follower nodes must relay any client communication to the leader. Follower servers listen and respond to the leader server. A follower node may become a candidate node when it believes that no leader is available.

Leader Election

The first step in the Raft algorithm is a leader election. Once a server starts up, it enters the follower mode, if no leader is present, the node will timeout and change its role to the candidate, thus starting the election process. In the election process, a candidate sends out requests to other nodes, and if other nodes acknowledge such requests, a candidate becomes a leader. A leader failure can occur while the system is operating, but such case is no different than the startup of the cluster, since the followers will timeout and start the election process.

The concept of a term is used as a logical time for the system. Term is used to count how many leaders has been observed by the system. Term information is used to determine stale server with no up-to-date information about the current state of the system.  For instance, is a node believes it is in term x, while other nodes are in term x+1, then such stale node needs to be updated with correct log and state machine. Term is also important for leader election, as the system needs to make sure no stale node can become the leader.

Log Replication

Consensus algorithms needs to enforce the replication of state change log. In Raft, only a leader can serve client requests, reducing possible requests entry points down to one machine. Once a leader receives a request, it communicates the change to the follower servers and after receiving acknowledgement of such communication from the majority of the followers, the leader applies or commits the request to its state machine and serves the result to the client.  In case some followers do not respond with an acknowledgement, the leader will be trying to update the log of such follower until it receives a successful ack.

Figure 2 show the log entries for a hypothetical cluster of 5 machines. In this scenario, we have some followers who significantly lack behind, but it does not stop the system to advance as long as the majority of servers acknowledge that they received the log entry from the leader. As can be seen, log entries up to log #7 are committed, because the majority of machines accepted that entry.


Figure 2. Log structure in Raft. (Source: “In Search of an understandable Consensus Algorithm

There are a few key safety properties Raft algorithm enforces:

  • Only one leader can be elected at a time.
  • A leader can only add new entries to the log, but not modify the existing ones.
  • When logs have an entry with the same ID and term, such entry and all entries prior to it must contain the same data.
  • No two distinct entries can exist with the same id.

More to Read and Think

I am leaving out a lot of details about the algorithm, but I think the basic gist of it can be understood. Of course the paper provides more information and mentions a few optimizations to the algorithm.

One thing that caught my attention when looking at the algorithm is the need to have a single entry point for all client requests. I may be wrong since it is probable the case for all other algorithms with a strong leader, but it seems like this can be a limited factor, if a leader, who serves as such entry point runs out of compute resources or network bandwidth. Imagine a scenario when a leader is so overloaded with serving clients and communicating with all the follower that it causes network latency to go up and eventually severs the connection entirely. A new leader is then elected and it has to deal with even more client requests, more log replications and eventually fails too…

About Google’s Dataflow Model

In this post I am trying to understand the Google’s Dataflow Model, a data management and manipulation framework used for dealing with unbounded and unordered datasets. A lot of the data is being constantly produced today and has no “maximum size”, in other words the amount of such data is constantly increasing, and therefore modern computing system must have a way of dealing with constant influx of information in an efficient way. In addition, many applications may misalign chronological data out of sequence due to the various reasons, leading to sequential data coming into the information processing system out of sequence.  A data processing model, described in “The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive Scale, Unbounded, Out-of-Order Data Processing” paper, attempts to provide an efficient framework for handling such ever-growing and potentially permuted data.

As has been previously stated, the Dataflow model operates on unbounded data, in other words the dataset used in the system build with such framework is infinite and grows over time. The event occurrence and processing time becomes an important factor for system like that, since individual datapoints tend to have a chronological order to them. In addition, a lot of real life data can originate at a particular time, but be delayed for processing by some variable time, causing not just different event occurrence and processing time, but also the possibility of different chronology of events as they occur and being processed.  Below I briefly describe the model and talk about its usage based on the examples in the paper.

The Model

The model operates on two basic primitive actions:

  • ParDo is responsible for parallel processing of the events
  • GroupByKey is doing grouping of the intermediate results of the ParDo function.

These actions are nothing new; they operate on the key-value pairs and allow for distributing the computation to multiple machines and grouping and aggregating the results after the computation is done.

Window based data processing deserves more attention as it allows to work with data in slices instead of bulk processing of everything at once. Once again, handling input data in blocks using windowing approach is very common. Because the Dataflow model assumes that the incoming data can be unordered, the window based approach needs to be able to update the computations performed while processing preceding data slices.  Additionally, the model supports unaligned windows, or windows that may differ in terms of size and start or end time relative to other her windows. Such unaligned windows can be used to represent user sessions, or a cluster of user actions, in many of google applications.

The model operates on the data with two distinct timestamps, event time and processing time, and it is possible to partition the dataset into windows based on any of these time metrics. Unfortunately, when data is windowed, the system cannot ensure that it has collected and processed all events belonging to the window. Instead dataflow model provides a guarantee that eventually, if we process all data (which is impossible, since it is assumed the data is unbounded), the window will have all correct events associated with that window. A heuristic time watermark is used to estimate when the system expects to see events arriving to the system for processing depending on the event time. Once the watermark time is reached, window is processed. Obviously, such heuristic is not enough to guarantee the correctness of each window, so a trigger mechanism is utilized to issue an update to the window upon the arrival of new data after the window has already been computed. Essentially, a trigger retracts old window state, calculates a new state and signals that the updated window is now available. For example, event #9 in Figure 1 is arriving very late for processing, way past the watermark time (dark dashed line), so in many use configurations, event #9 will invoke a trigger and cause a corresponding window state to update.

The system also supports streaming data, which can be dynamically placed into the unaligned windows as more data comes in, with a window becoming available when a systems watermark suggests no more events for the window are expected to come in. The trigger mechanism is used to update the window state incase more data become available in each window.


Bellow, I will illustrate some of the examples provided in the paper. For all examples, the system computes the sum of the values of the events. Figure 1 shows the input events in relation to their event time and processing time.


Figure 1. Sample events

The most basic example is batch processing the entire dataset. It is illustrated on the figure 2. As can be seen, the entire dataset is viewed as one window processed at once.


Figure 2. Bulk batch processing

It becomes more interesting when the data is partitioned into windows as it arrives to the system for processing. In the example on Figure 3, incoming data is broken into non-overlapping windows of 1 minutes, with value at each window accumulating from the previous one. Note that the watermarks are not used in this example, as the window is emitted for processing at fixed interval.


Figure 3. Processing in non-overlapping accumulating windows

Next example makes use of fixed windows with data partitioned by the event time. At first the system does not see the event #9, so the first window is bounded by the maximum watermark time estimated for the window (shown in darker shade of gray). As soon as the processing time reaches the watermark for the window, the data is processed and the windows state is computed. Later when the event #9 comes to the system, the trigger is invoked and a window state is recalculated, as seen in the light shade of gray for the 12:01 window.


Figure 4. Fixed windows with streaming data

More sophisticated examples deal with unaligned windows to computed users sessions, but the basic principle stays the same with the exception of the fact that unaligned windows can change their size both in event time and processing time dimensions as more data becomes available.

Some Thoughts

According to the authors, the session use case is the main driving force behind developing the dataflow model. All other examples discussed in the paper are byproducts of the system developed to satisfy the session computation by google, which may not be a very good sign. Clearly, authors thought about other potential uses, but it is not clear how well their system performs under other usage scenarios compared to the existing alternatives.

Triggers provide a mechanism to keep the system in the most up-to-date state as new out of order events arrive for processing. Unfortunately, triggers seem like a fairly expensive operation, especially in some usage scenarios where previous window values must be retracted, new data added and window recomputed for every late event incoming to the system.  Authors do not provide a performance measure for the system as a whole and triggers in particular, but invoking a trigger for every late event may be a potential speed issue for highly out-of-sequence data.

Authors also claim that the model is a highly customizable solution in terms of performance, because of the ability to batch process or stream process incoming data. No real performance measure is provided in the paper, so the reader is left to guess about the degree of such flexibility.

Understanding ZooKeeper

ZooKeeper is not new, it has been around for quite some time now, yet I feel like not many people who use it in one way or another do understand what it is. ZooKeeper is used by so many distributed systems at the moment that it became a crucial part of the distributed computing, and I feel like it is time for me to learn what it really is and how it works. The service is described in “ZooKeeper: Wait-free coordination for Internet-scale systems” paper.

ZooKeeper is a system used to coordinate processes in distributed applications. According to the authors of the paper, one of the simplest types of coordination is configuration, so it makes absolute sense why ZooKeeper is used to configure so many different distributed applications, but of course, the true strength of ZooKeeper is in much more complicated coordination scenarios. ZooKeeper service itself is running on multiple servers and uses replication to achieve high availability and improve performance.

ZooKeeper does not use locks or any other blocking constructs to achieve coordination, because such blocking primitives can significantly reduce the performance. Instead, it utilizes a non-blocking pipe line architecture with a hierarchical data structure to which clients, or users of ZooKeeper service, can read and write data. Such data structure needs to provide certain guarantees in order to provide coordination capabilities:

  • FIFO client ordering
  • Linearizable write operations.

Data Structure

ZooKeeper’s hierarchical data structure is similar to the file system as each data object, called znode, can be accessed for read or write using a hierarchical path name. The figure below, taken form the paper, illustrates the concept of file-system like hierarchical data model.


Figure 1. Hierarchical data model

There are two types of znodes: Regular and Ephemeral.  Both types store data, but only regular znodes can have children. Another difference between regular and ephemeral znodes is who gets to delete it. Regular znodes can only be deleted by the client, while ephemeral have a shorter lifespan and can be removed by ZooKeeper server when client session terminates. Znodes are not meant for general data storage, although depending on the application some information can be preserved for prolonged time.

A client can manipulate znodes using the following set of commands:

  • create – creates a new znode
  • delete – removes an existing znode
  • exists – checks whether a znode exists at specified path
  • getData – gets data of a
  • setData – writes data to a znode
  • getChildren – lists the children of a znode
  • sync – waits for updates propagation to the server

Each of the operation above can execute in a synchronous or asynchronous mode.  If client executes a command synchronously, it will block and wait for ZooKeeper to respond to the operation. More interesting is the asynchronous execution. In this mode client can issue multiple requests to the ZooKeeper service and perform some other operation while waiting on the response. The responses are guaranteed to arrive to the client in the order of the requests issues.

Actions of checking the existence of a znode or getting data and getting children can have watches associated with them. If a watch is created for an action, in addition to normally completing the requests, ZooKeeper will issue a notification to the client when the data is changed on a znode with a watch.

ZooKeeper API allows client programmer to implement various kinds of more complex coordination primitives of their choice including the blocking constructs, such as locks and barriers.


Zookeeper has two event ordering guarantees that allow it to be used for coordination. First-in-First-out client order guarantee enforces the order in which a client receives the responses to multiple a synchronous calls issued to the service. This guarantee establishes that all requests from a client are executed in the same order as a client issued the requests. ZooKeeper also enforces the Linearizability of write operation, meaning that all state changes are serializable and respect precedence. In other words, all write operations get replicated to all ZooKeeper servers in the same order they have been received from the client.


ZooKeeper is built in a distributed manner and runs on multiple servers to provide high availability. ZooKeeper’s data is replicated across all the servers used by the service.


Figure 2. ZooKeeper Service (Source: ZooKeeper: Wait-free coordination for Internet-scale systems)

The figure above is a high level illustration of a ZooKeeper service. Read and Write requests scenarios are depicted in the figure. When a write request comes into the system, it is being processed, forwarded to a single “leader” server, state changes are coordinated among the servers and finally the request data is saved to the replicated database. Read requests need no coordination and since each ZooKeeper server has entire replica of all the data, the information is simply read from the local copy and sent back to the client.

Coordination among ZooKeeper servers is a very important as it ensures each copy has correct state and data read from a database copy on one server is the same as on other servers in the system. The coordination protocol also provides fault tolerance for the system, and in theory the system can continue correct operation as long as the majority of ZooKeeper servers are not faulty. The coordination step between ZooKeeper machines is depicted in the “Atomic broadcast” step in the Figure 2.

Replicated database is another important pieces of ZooKeeper service. Database is in used to store all znodes. It is in-memory and fully replicated, meaning that each properly operating server has a full replica of all ZooKeeper data. Snapshots of the database are periodically written to the disk to facilitate database recover if the server crashes for any reason and is restored later.

With an easy and intuitive API ZooKeepers allows to coordinate large scale distributed system, all while maintaining high availability and fault tolerance. In this little summary I tried to depict the basics of the service, without going into details on API or how coordination between ZooKeeper servers happen, as these are immense topics on their own.

Graph Processing at Facebook Scale

I will start with a little note on large scale graph processing, as described in the paper “One Trillion Edges: Graph Processing at Facebook Scale”.

Graph processing tasks are very common in analyzing various kinds of data, such as network topology of interconnection of people. Social media imposes challenges for such systems and algorithms due to the sheer amount of data. For instance, Facebook has more than 1.39 billion registered users and each users can represented by a vertex in a graph, making the graph consist of billions or even a trillion edges. This explains Facebook’s high interest in large scale graph processing systems.

Researchers at Facebook investigated a number of existing frameworks and systems for graph processing on the large scale and realized that none of the existing approaches were capable of handling the graphs of the Facebook scale. Thus, they decided to take action and develop a system adequate for the needs of the company. The result of their work was a significant contribution to the Apache Giraph project, greatly improving its performance and scalability.

Apache Giraph was chosen as the basis of Facebook’s graph processing system for a number of reasons:

  • Good initial performance compared to the competing products
  • Giraph is written in Java and facilitates the use of existing MapReduce infrastructure
  • Ease of debugging due to the underlying graph processing mode used

Facebook introduced many significant improvements to the Apache Giraph project both in terms of functionality and performance. Among the changes are the ability to use multiple sources for vertex and edge data, more fine-grained parallelism, memory utilization optimizations, addition of a more refined worker phases and introduction of centralized master computation.

Ability to use multiple sources for the input data is important for many large scale users, as the data required for a graph may originate from different sources or stored on various hardware and software stacks. Such feature, although not directly affecting the graph computation, can significantly reduce the time needed to load the graph structure by eliminating the need of data aggregation jobs that bring data from multiple sources into one storage infrastructure.

One of the performance improvements was in how the system handles parallelism. Since Apache Giraph is built on top of the MapReduce framework, the only way to provide more parallelism to the computation in early versions of the system was by increasing the number of worker nodes. Facebook changed Giraph  to allow more fine grained parallelism on the work node level by allowing each worker execute tasks with  multiple threads thus better utilizing multiple CPUs available on the worker machines. Such model introduces a second level of parallelism, so in addition to running multiple workers, each workers can handle its own parallel execution. Although such model allows for more fine-grained tune up of the system and improvements in hardware utilization, it reduces the overall maintainability of the system and makes development and debugging more complicated due to the existence of two distinct parallel execution mechanisms.

Apache Giraph is based on the Bulk Synchronous Processing (BSP) model of parallel computations. Although the model is widely used in many applications, Facebook introduced many extensions to it in the Apache Giraph Project. In particular, the addition of additional compute stages allows for a better code management and overall structure and allows for better optimization of the tasks executing in the system. Master computation enables users to perform certain centralized computation in predefined time intervals. Such master computation is useful when a certain piece of code needs to execute only once and does not need to run on the worker nodes, since master can communicate the results to the workers. In addition, master node computation allows for some synchronization between the nodes, because it relies on the clock of only the master machine to run. As such it allows to partially address, the clock skew problem in worker machines, although such synchronization is still very coarse.

Performance of the Apache Giraph was tested against Apache Hive tasks in the PageRank, label propagation and computing friends of friends score running on the same infrastructure. All of these algorithms showed significant improvements in CPU time and elapsed time as perceived by user when executed using on the Apache Giraph stack compared to Hive.  Unfortunately, no comparison with other specialized graph processing systems was provided, so it is not clear how it fares against the major competition. But the fact that a PageRank algorithm was able to run over one trillion edges in less than 3 minutes per iteration on 200 machines sounds impressive.