
Do not Blame (only) Network for Your Paxos Scalability Issues.

In the past few months our lab has been doing a lot of work with different flavors of the Paxos consensus algorithm. Paxos and its numerous flavors are widely used in today’s cloud infrastructure. Distributed systems rely on it for many different tasks to ensure safe operation. For instance, coordination services use some flavor of a consensus protocol to provide services like leader election, cluster membership, service discovery and metadata management. Databases, such as Spanner or CockroachDB, use paxos to provide fault-tolerant replication of data across nodes or even datacenters.

If you work with Paxos, or had to deal with it at some point, you have probably heard that it does not scale well to a large number of nodes. The “five nodes is ideal, do not try more” rhetoric has been repeated many times by many people, and it gets engraved into one’s mind. ZooKeeper’s Administrator’s Guide mentions 3 and 5 server deployments. The fairly recent EPaxos paper provides an evaluation on 3 and 5 node deployments. The Paxos Made Live paper also mentions five nodes as a typical Chubby deployment.

But why is five servers such a magical number in the Paxos world? The most common answer is along the lines of “increasing the number of servers increases the quorum size, making the paxos round leader wait for more messages to come to reach consensus.” This explanation is straightforward on the surface; after all, waiting for n nodes to reply should take longer than waiting for n − 1. The question then becomes: why is waiting for more replies expensive? I thought the answer would be largely network related, but it appears to be more complex.

Modeling Paxos Performance: First Attempt

To answer this, I tried to model the non-faulty paxos execution time by looking at the message exchange between nodes in the cluster. In a nutshell, running a phase of paxos requires one round-trip-time (RTT) between a node and its peers: the node broadcasts the message and waits for the quorum to reply. With the network arguably being the slowest part, I can try to express the paxos phase performance through the communication delays.

First, let me define some variables to model a phase of paxos:

  • \(r_l\) – message RTT in local area network
  • \(\mu_l\) – average message RTT in local area network
  • \(\sigma_l\)  – standard deviation of message RTT in local area network
  • \(N\) – number of nodes participating in a paxos phase
  • \(q\) – quorum size. For a majority quorum \(q=\left \lfloor{\frac{N}{2}}\right \rfloor  +1\)
  • \(m_s\) – time to serialize a message
  • \(m_d\) – time to deserialize and process a message. This involves various message-related round tasks, such as ballot comparisons, log maintenance/updates, etc.
Figure 1: Amazon AWS EC2 local ping latency

I assume that the network performance, at least in the local area, is normally distributed. Figure 1 shows a normalized histogram (shaded area is equal to 1) of latencies of approximately 2,000 ping requests within an AWS region.

RTT \(r_l\) for every message is drawn from a normal distribution \(\mathcal{N}(\mu_l+m_s+m_d, \sigma_l^2)\). This distribution simulates RTTs with an additional static, non-variable delay for serialization and deserialization. As such, the only variability so far is due to the network behavior.

In order to run a round of paxos, a node needs to send \(N − 1\) messages. That is, a node sends a message to every other node except for itself. I assume the “leader” is also an acceptor with short-circuit behavior, where it does not need to send a vote to itself over the network.

Figure 2: Message RTT for a 7 node cluster

Out of the \(N − 1\) messages sent, only \(q − 1\) messages actually matter. Upon receiving \(q − 1\) successful (once again, assuming non-faulty execution) replies, a node has achieved a quorum and can finish the round (Figure 2). We can simulate this by drawing \(N − 1\) random RTTs \(r_{l_1}, r_{l_2}, \dots, r_{l_{N-1}}\) from \(\mathcal{N}(\mu_l+m_s+m_d, \sigma_l^2)\) and sorting them. The \((q − 1)\)-th fastest message \(r_{l_{q-1}}\) is the one carrying the last vote needed to make up a quorum, so after processing this message the node no longer needs to wait for other messages to come.

Assuming that a node broadcasts all messages at the same time, the message RTT \(r_{l_{q-1}}\) can then be used to express the time \(T_r\) for the entire round: \(T_r = m_s + r_{l_{q-1}} + m_d\). Figure 3 visually represents the paxos round expressed in the formula.

Figure 3: Visual representation of time needed to execute a round

Does the Model Make Sense?

With this simple model, I can simulate a paxos execution. In the multi-paxos optimization, phase-1 is primarily used to pick a stable leader while phase-2 repeats many times, so the performance of paxos can be approximated by looking at phase-2 alone. If \(m_s\) and \(m_d\) are constant, the variability in performance is only due to the network fluctuations.

I created a small python script to run such a simulation, as if a single client was interacting with paxos synchronously, one command at a time. Figure 4 illustrates the results with \(m_s = m_d = 0.01\,ms\) and network parameters taken from the AWS ping latency figure.
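For reference, here is a minimal sketch of what such a simulation could look like. This is my illustration rather than the original script; the \(\mu_l\) and \(\sigma_l\) values are placeholders rather than the measured AWS numbers, while \(m_s = m_d = 0.01\,ms\) matches the value above.

```python
import numpy as np

def simulate_round(n, mu_l, sigma_l, m_s, m_d):
    """One phase-2 round at the leader: broadcast N-1 messages and wait
    for the (q-1)-th fastest reply to complete a majority quorum."""
    q = n // 2 + 1
    # Peer-side serialization/processing is folded into the RTT as a constant shift.
    rtts = np.random.normal(mu_l + m_s + m_d, sigma_l, n - 1)
    quorum_rtt = np.sort(rtts)[q - 2]      # RTT of the (q-1)-th fastest reply
    return m_s + quorum_rtt + m_d          # T_r = m_s + r_{l_{q-1}} + m_d

# One synchronous client issuing rounds back to back.
mu_l, sigma_l, m_s, m_d = 0.4, 0.05, 0.01, 0.01   # milliseconds (placeholder network values)
for n in (3, 5, 7, 9, 11):
    rounds = [simulate_round(n, mu_l, sigma_l, m_s, m_d) for _ in range(10_000)]
    latency = np.mean(rounds)                      # average ms per round
    print(f"N={n}: latency={latency:.3f} ms, throughput={1000 / latency:.0f} rounds/s")
```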

Figure 4: Simulated Throughput and Latency from one client

At first glance, we observe degradation in throughput and an increase in latency. However, the performance decreases very slowly, and once I reach 9 nodes, the performance stays almost flat. This is very different from the data we have observed with our actual paxos implementation. Figure 5 shows the throughput and latency from a few concurrent clients interacting with the protocol.

Figure 5: Paxos Throughput and Latency from a small number of concurrent clients

The first thing that catches my attention is the performance degradation between the 3 and 5 node deployments. In the simulation, the difference in throughput was only 2.8%, while the real paxos degraded by an astonishing 30%. As the cluster size increases beyond 5 nodes, the real paxos also appears to degrade more quickly.

Clearly, the network variability alone cannot explain the performance hit from increasing the number of nodes. So what is missing from the simple model that greatly impacts the performance as the cluster grows?

Towards an Improved Model

Starting with the obvious, the per-message serialization \(m_s\) and processing \(m_d\) overheads are not static, constant parameters; they introduce some variance of their own. However, \(m_s\) and \(m_d\) are small to start with, so letting them add variance will make the model slightly better, but it will not change the overall simulation results much.

Assuming that \(m_s\) is drawn from a normal distribution \(\mathcal{N}(\mu_{ms}, \sigma_{ms}^2)\) and \(m_d\) is similarly drawn from \(\mathcal{N}(\mu_{md}, \sigma_{md}^2)\), the message RTT \(r_{l}\) must then be drawn from \(\mathcal{N}(\mu_l+\mu_{ms}+\mu_{md}, \sigma_l^2 + \sigma_{ms}^2 + \sigma_{md}^2)\). Making these changes to the model caused the difference between the 3 and 5 node simulations to grow from 2.8% to 2.9% with \(\mu_{ms} = \mu_{md} = 0.01\) and \(\sigma_{ms} = \sigma_{md} = 0.002\) (arbitrary values that have little impact unless they are of the same magnitude as the network mean and standard deviation).
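In terms of the simulation sketch above, this change simply widens the distribution the RTTs are drawn from (again, the network values are placeholders):

```python
import numpy as np

mu_l, sigma_l, n = 0.4, 0.05, 7          # same placeholder network values as before
mu_ms = mu_md = 0.01                     # mean serialization/processing overheads
sigma_ms = sigma_md = 0.002              # their standard deviations
# Drawing m_s and m_d from their own normal distributions is equivalent to drawing
# the whole RTT from a normal with summed means and summed variances.
rtts = np.random.normal(mu_l + mu_ms + mu_md,
                        np.sqrt(sigma_l ** 2 + sigma_ms ** 2 + sigma_md ** 2),
                        size=n - 1)
```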

Introducing variability to the message overheads \(m_s\) and \(m_d\) still does not account for these overheads depending on the number of nodes in the cluster. The dependence on \(N\) is not direct; instead, the per-message costs increase as the number of messages that a server needs to handle grows. This puts a “leader” node repeating phase-2 of paxos in a more vulnerable position, as it experiences the increased traffic.

To understand how message serialization and processing depend on the number of messages exchanged, we need to consider the implementation of the application’s network layer. Oftentimes, as an application receives a message, it will use one or more processing queues or pipelines to deserialize the message and dispatch it to the appropriate handler for processing.

Let’s consider an example with just one such pipeline. If the queue is empty upon message arrival, the message gets deserialized with no further delay. However, when there are other messages in the queue, the new message has to wait for its turn to be processed.

A single Paxos round has bursty network utilization, especially at the leader. This is because the leader node first broadcasts messages and then receives multiple replies at roughly the same time. If the deserialization of one message has not finished before the next message arrives, the pipeline gets clogged. Further messages potentially make the issue worse by growing the queue, as shown in Figure 6.

Figure 6: Example of deserialization pipeline stalling paxos round
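A toy model of a single deserialization pipeline illustrates the effect: when \(m_d\) is comparable to the spread of reply arrival times, later replies queue up behind earlier ones and the quorum completes later than the raw RTTs alone would suggest. This is only a sketch of the idea, not the eventual model:

```python
def quorum_completion_time(reply_arrivals, m_d, q):
    """Replies are deserialized FIFO by a single pipeline, each taking m_d.
    Returns the time when the (q-1)-th reply has been fully processed."""
    finish = 0.0
    done = []
    for arrival in sorted(reply_arrivals):
        start = max(arrival, finish)   # wait if the pipeline is still busy
        finish = start + m_d
        done.append(finish)
    return done[q - 2]                 # q-1 replies complete the quorum

# Six replies arriving in a burst; with m_d = 0.05 the third needed reply
# finishes at 0.55 instead of 0.47 (0.42 arrival + 0.05 processing).
print(quorum_completion_time([0.40, 0.41, 0.42, 0.43, 0.44, 0.45], 0.05, 4))
```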

Obviously, having multiple parallel pipelines can help, provided they are balanced and there are enough compute resources to run them. However, the problem can also get worse when we start running rounds concurrently (i.e., multiple clients interact with paxos). In such a scenario, messages from different rounds will compete for the fixed number of processing queues.

Where does this leave me with trying to model paxos round performance? I need to account for a number of additional factors, such as the number of message-processing queues/pipelines/threads, the rate of message deserialization at each queue, and the number of concurrent paxos rounds. Oh, and since the messages arriving after the quorum has been reached still need to get deserialized, they will contribute to how clogged the pipelines are.

I will pick up from this point onwards in some future post. I will also try to look at flexible quorums and our WPaxos protocol.

Trace Synchronization with HLC

Event logging or tracing is one of the most common techniques for collecting data about a software execution. For a simple application running on a single machine, a trace of events timestamped with the machine’s hardware clock is typically sufficient. When the system grows and becomes distributed over multiple nodes, each node produces its own independent logs or traces. Unfortunately, nodes running on different physical machines do not have access to a single global master clock to timestamp and order the events, making these individual logs unaligned in time.

For some purposes, such as debugging, engineers often need to look at these independent logs as one whole. For instance, if Alice the engineer needs to see how some event on one node influenced the rest of the system, she will need to examine all logs and pay attention to the causal relationships between events. The time skew between nodes and the trace misalignment caused by such skew prevent Alice from safely relying on such independently produced traces, as shown in the figure below. Instead, she needs to collate the logs together and reduce the misalignment as much as possible to produce a more coherent picture of the distributed system execution.

Physical time makes the logs misaligned and misses the causality between events.

Of course, if only causality were important, Alice could have used Lamport’s logical clocks (LC) to help her identify some causal relationships between the events on different nodes. Alternatively, the logging system could use vector clocks (VC) to capture all of the causal relationships in the system. However, both LC and VC are disjoint from the physical time of the events in the trace, making it hard for Alice to navigate such a logging system.

Using time synchronization protocols, such as NTP and PTP, will help make the traces better aligned. These protocols are not perfect, however, and still leave some synchronization error or uncertainty, introducing the possibility of breaking causality when collating logs with them alone.

HLC sync

Instead of using NTP or logical time alone for synchronizing the event logs, I wondered whether it is possible to use both at the same time with the help of Hybrid Logical Clocks (HLC). HLC combines physical time, such as NTP time, with a logical clock to keep track of causality during periods of synchronization uncertainty. Since HLC acts as a single always-increasing time in the entire distributed system, it can be used to timestamp the events in the log traces of every node. You can learn more about HLC here.
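For illustration, here is a simplified sketch of the HLC update rules (following the algorithm by Kulkarni et al.); the actual implementation used for the traces in this post may differ in details:

```python
import time

class HLC:
    """Hybrid Logical Clock: a logical part l that tracks the largest physical
    time seen so far, plus a counter c to order events sharing the same l."""
    def __init__(self, physical_clock=lambda: int(time.time() * 1000)):
        self.pt = physical_clock
        self.l = 0
        self.c = 0

    def tick(self):
        """Advance the clock for a local or send event."""
        prev = self.l
        self.l = max(prev, self.pt())
        self.c = self.c + 1 if self.l == prev else 0
        return (self.l, self.c)

    def recv(self, m_l, m_c):
        """Advance the clock on receiving a message timestamped (m_l, m_c)."""
        prev = self.l
        self.l = max(prev, m_l, self.pt())
        if self.l == prev and self.l == m_l:
            self.c = max(self.c, m_c) + 1
        elif self.l == prev:
            self.c += 1
        elif self.l == m_l:
            self.c = m_c + 1
        else:
            self.c = 0
        return (self.l, self.c)
```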

Similar to logical time, HLC does not capture the full causality between events. However, HLC conforms to the LC implication: if event A happened before event B, then the HLC timestamp of A is smaller than the HLC timestamp of B. This can be written as A hb B ⇒ hlc.A < hlc.B. Obviously, we cannot use HLC timestamps alone to determine the causal order of any two events. Despite this limitation, we can still use the LC implication to give some partial information about the order of events. If an event A has an HLC timestamp smaller than that of event B, we can at least say that event B did not happen before A, thus either A happened before B or A is concurrent with B: hlc.A < hlc.B ⇒ A hb B ∨ A co B.

We can use this property to adjust the synchronization between the traces produced at different nodes. Let’s assume we have two nodes with some clock skew. These nodes produce logs that are not fully synchronized in time (we also assume the knowledge of a global, “ideal” time for now). The events in the log happen instantaneously, however we can rely on the machine’s clock to measure the time between events on the same node to give the “rigidity” to the logs. Each node timestamps the log events with its machine’s physical time (PT).

Logs aligned based on PT time. Ideal global time is shown in red.

In the figure above, the two logs are not synchronized in the “ideal” time, even though they appear to be in sync based on the PT of each node. Without any additional information we cannot improve on the synchrony of these logs. However, if we replace PT time with HLC, we can achieve better trace synchronization.

HLC can be used instead of PT to synchronize the traces.

With the addition of HLC time, we may see that when the logs are aligned by PT only, some HLC timestamps (highlighted in yellow) appear to be out of place. In particular, this alignment does not satisfy the LC condition (A hb B ⇒ hlc.A < hlc.B), since the alignment makes event c2 appear to happen before a1, yet hlc.c2 > hlc.a1, meaning c2 could not have happened before a1. In order to satisfy the condition, we need to re-sync the logs so that c2 appears concurrent with a1.

HLC is used to improve log synchronization.

After the adjustment, the synchronization error between the two nodes is reduced. Note that we cannot synchronize the logs any better and put a1 strictly before c2, since the LC implication alone simply does not allow us to conclude that a1 happened before c2.

The two-node synchronization works nicely because the LC/HLC implication provides some guarantees about two events picked from two separate nodes. Aligning more than two logs is more challenging, as we need to perform many comparisons, each involving just two events from some pair of nodes. The number of possible comparisons we need to make grows drastically as we increase the number of traces to sync.

However, with HLC we can reduce the problem of syncing logs from n nodes to just performing n − 1 two-node log alignments. HLC learns of higher timestamps through communication, so HLC time at all nodes of the cluster tends to follow the PT of the node with the highest physical clock. Once again, to see why, you need to understand how HLC ticks, which is explained here. Having one node that drives the entire HLC in the cluster allows us to synchronize the log of every other node independently against the log of that HLC “driver” node.
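Below is a minimal sketch of such a pairwise alignment, under the simplifying assumptions that each log is a list of (PT, HLC) pairs and that HLC timestamps can be compared as scalars. The adjustment shifts the follower’s log forward just enough that no event that could not have happened before a driver event appears ahead of it:

```python
def alignment_shift(driver, follower):
    """Return how far (in PT units) the follower's log should be shifted
    forward so that no follower event with a larger HLC appears earlier
    than a driver event with a smaller HLC."""
    shift = 0
    for pt_d, hlc_d in driver:
        for pt_f, hlc_f in follower:
            # hlc_f > hlc_d means the follower event could not have happened
            # before the driver event, so it must not be placed earlier.
            if hlc_f > hlc_d and pt_f < pt_d:
                shift = max(shift, pt_d - pt_f)
    return shift

# Toy example: the follower's clock is roughly 40 ticks behind the driver.
driver   = [(100, 100), (110, 110), (120, 120)]
follower = [(65, 101), (75, 111), (85, 121)]
print(alignment_shift(driver, follower))   # 35: most, but not all, of the skew
```

Note that the recovered shift falls short of the true skew by roughly the communication delay, which is exactly the limitation discussed in the next section.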

Some Testing and Limitations

I set up some synthetic tests to see if HLC can help us achieve any improvement in log synchronization on an already loosely synchronized system. The benchmark generates the logs with a number of parameters I can control: time skew, communication latency, and event probabilities. The maximum time skew controls the time desynchronization in the simulated cluster, measured in time ticks; there are two nodes in the simulation, with the maximum time-skew difference between their clocks. The communication latency parameters control the latency of simulated communications in time ticks. The probability-of-event parameter controls the chance of an event happening at every time tick; similarly, the probability of communication determines the chance of an outgoing message happening at a time tick.

Since the logs are generated synthetically, I have access to the ideal time synchronization between these logs, allowing me to quantify the alignment error. I calculated the alignment error as 1 – adjustment / skew for every pair of logs.

The figure below shows the error in synchronizing 5 logs as a function of the log length. The idea is that a longer trace allows more opportunities to find violations of the LC/HLC condition and adjust the logs accordingly. All other parameters were kept constant: a skew of 50 ticks, a communication delay of 3 to 10 ticks, and a 40% chance of an event per tick. I made different plots for different probabilities of inter-node communication. I repeated the simulation 100,000 times and computed the average error.

Log alignment error as a function of log length

We can see that in this setup the synchronization between logs was significantly improved, and the improvement happens faster when communication is more frequent. This is because communication introduces causality between nodes, allowing me to exploit it for synchronization. As the logs grow longer, the improvements diminish. This is likely because the residual misalignment approaches the communication delay, at which point HLC synchronization can no longer make improvements.

The inability to achieve any synchronization improvement when the communication latency is equal to or greater than the maximum time skew in the system is the most significant limitation of HLC synchronization. The following graph illustrates this:

HLC synchronization limitations

Here I ran the simulation with 3 different skew levels: 10, 20, and 50 ticks. As I increase the communication latency, the error grows as well, reaching the level of no improvement as the latency approaches the time skew.

Some Concluding Words

I think this is a rather naïve and simple way to achieve better synchronization for distributed logging software. One can argue that NTP and PTP time synchronization methods are rather good nowadays and should be enough for most cases. However, computers are fast, and many computations can happen even within 1 ms of desynchronization. Even a few full round trips of network message exchange can happen in that 1 ms in a good local area network.

HLC synchronization’s simplicity allows it to be implemented purely in user-level application code. There is no need to constantly run the NTP protocol to keep tight time synchrony, no need to access or modify any of the underlying system-level code beyond just reading the clock, and no need to access a high-precision clock that may slow the application down.

HLC can also be used within a single machine to synchronize traces from different threads. Even though all threads and processes have access to the same clock, the clock granularity may still be coarse enough for many computations to happen within a single tick.

HLC sync is not without its limits. Its usefulness degrades as the remaining desynchronization approaches the communication latency, but it can still be used as a fail-safe mechanism in case NTP fails.

As an alternative to synchronizing logs, HLC can be used to find consistent global states and search through the distributed application’s past execution.

One Page Summary: Flease – Lease Coordination without a Lock Server

This paper talks about a decentralized lease management solution. In the past, many lock/lease services have been centralized, placing a single authority in charge of all locks in the system. Google’s Chubby, Apache ZooKeeper, etcd, and others rely on a centralized approach backed by some flavor of consensus algorithm for fault tolerance. According to the Flease authors, such a centralized approach may not be ideal at all times and can create a bottleneck when coordination is required only within small groups of nodes in the system. Distributed filesystems seem to be good candidates for this sort of lock management, as a group of nodes tends to be responsible only for the resources sharded to that group. Each such group acts independently and maintains locks for the resources assigned to it, making a global lock service unnecessary.

The implementation of a decentralized, sharded lock management system is rather straightforward. Since getting a lock/lease requires nodes to reach consensus about the lease ownership and duration, a Paxos algorithm will suffice. In fact, Flease uses a flavor of Paxos built with a distributed register. Right away we can see why Flease targets systems that are sharded into small, non-overlapping groups: running Paxos over too many nodes will degrade the performance.

Just like other lease systems, Flease needs synchronized clocks with a known maximum time-skew uncertainty ε to control lease expiration. It also places some restrictions on the minimum lease duration due to the network characteristics, i.e., the lease duration has to be greater than two RTTs and greater than ε. Choosing the maximum lease duration is important for performance reasons.

Flease was evaluated against ZooKeeper, as both are implemented in Java and use the same network IO libraries. The figure shows throughput per (client) machine with Flease (straight line) and ZooKeeper. Flease uses small groups of 3 nodes, each running Paxos, and ZooKeeper runs on 3 nodes as well, with all clients connecting to it for lock management. As expected from this experiment, Flease has greater parallelism. When running 30 clients, Flease runs 10 Paxos machines, while ZooKeeper still operates a single one (3 nodes) with 30 clients connected. It is unclear how quickly the performance of Flease will degrade as the group size increases. In essence, similar if not better results could have been achieved by deploying a separate ZooKeeper for each group to allow for the same level of parallelism.

The problem of lock management is important, and Flease gives a good example of an application that can benefit from a less centralized solution to locks. However, the approach in this paper is as trivial as deploying a separate lock management system for each of the independent, non-overlapping groups of nodes in the system. Flease performance will degrade if the group size grows above 3 to 5 nodes. In addition, the algorithm is not suitable when group boundaries must be crossed on occasion.

One Page Summary: “milliScope: a Fine-Grained Monitoring Framework for Performance Debugging of n-Tier Web Services”

The authors of the ICDCS 2017 milliScope paper attack an interesting monitoring problem for distributed systems: detecting and determining the cause of short-lived events in the system. In particular, they address the issue of identifying very short bottlenecks (VSBs) in distributed web services. VSBs manifest themselves as performance degradation of a small number of requests; however, they are intermittent, short-lived, and hard to detect with tools that aggregate or sample performance data.

MilliScope is a monitoring solution aimed exactly at detecting such short bottlenecks and finding their root cause. For the detection part, milliScope relies on the Event mScopeMonitors. These monitors are present at every component of the web service and record the timestamps associated with each request entering and leaving a particular component. In total, 4 local timestamps are recorded per component, 2 on the forward pass of the request and 2 on the return. This classical request tracing approach captures causal information about request propagation through the components, and it is sufficient to pinpoint the exact request and the exact component experiencing a slowdown.

In order to provide more information about the cause of the bottleneck, milliScope uses Resource mScopeMonitors. Resource monitors use existing performance monitoring tools to log metrics such as CPU utilization, memory usage, I/O usage, etc. MilliScope then transforms the performance logs into a common, structured format, adding some extra supporting information. The data from both the event and resource mScopeMonitors eventually ends up in a warehouse and can be queried by the users. Users can overlay the data from different monitors onto each other, making it easier to identify what component or components were causing the bottleneck and what was the root cause behind the slowdown. An example of such overlaying is shown in the figure on the left, where the queue lengths of different components are shown on the same graph.

Some Critical Questions

The paper claims that existing monitors have too much overhead and thus cannot capture all requests and find the bottlenecks; however, the event mScopeMonitor is a simple request tracer that is no different from many others, so it is not clear why it should perform better.

MilliScope collects lots of performance monitoring data in different logs and then brings the data to a centralized location for storage and processing. The authors never mention in the paper how they address clock uncertainty between all these different logs. Do they require clock sync, such as NTP? Even though the event monitors capture the causal relationships within the same request, the resource monitors seem to rely on physical time. The authors claim that milliScope can detect and help explain bottlenecks as short as 10 ms, but what will happen if the resource monitors are skewed by more than 10 ms? How accurate is milliScope going to be? In fact, both case studies in the paper worked with bottlenecks of hundreds of milliseconds or more.

And finally, what can we do once we have detected a very short bottleneck? Since VSBs are so short-lived, there may not be any time to react to their presence. Maybe a next step would be to look for precursors to the bottlenecks, so we can rebalance the system or prepare it in some other way for an incoming performance hiccup?

Sonification of Distributed Systems with RQL

In the past, I have discussed sonification as a means of representing monitoring data. Aside from some silly and toy examples, sonification can be used for serious applications. In many monitoring cases, the presence of some phenomenon is more important than the details about it. In such situations, simple sonification is a perfect way to alert users about the occurrence of such phenomena. For example, a Geiger counter alerts users of radioactive decay without providing any further details.

Our recent work on Retroscope and RQL allows us to bring sonification to the same “phenomena-awareness” plane for distributed systems. With RQL we can search for interesting conditions happening globally in the system, and we can use sounds to alert the engineers when certain predicates occur in the global state. Of course, for such a system to be useful, it needs to be real-time, as the recency of events is the most important attribute of this type of sonification. For instance, users of a Geiger counter are not interested in hearing ticks for decay events that happened a minute ago, as that rather defeats the purpose of the counter. Similar requirements apply to the “phenomena-awareness” tools in the distributed systems domain, as engineers need to know what happens in the system instantaneously.

RQL easily allows inspecting past states, however it falls short when it comes to inspecting the current state. What we need is a streaming query that can continuously receive individual logs from Retroscope log-servers and perform the search as soon as sufficient data becomes available to form a consistent cut. Of course, streaming queries will still lag behind the current time, however we can make this lag small enough to be virtually unobservable by a person.

With streaming queries we can not only receive the cuts meeting some search criteria in near real time, but we can also sonify the mere fact that such cuts have been found. If we look at the previous example of ZooKeeper staleness, we can run sonified streaming queries to have the system alert us when the data staleness reaches some threshold, such as two or more versions stale. In the audio clip below, we have used MIDI to sonify the stream of staleness events occurring in a ZooKeeper cluster. Multiple queries were sonified to produce different sounds for different degrees of staleness of ZooKeeper data.

We can hear periods of silence when the staleness is below the threshold of 2 versions, but we can also notice some variations in cluster performance. For instance, it is very easy to identify when the cluster was experiencing more problems than normal. It is also easy to hear it recovering from the spike in staleness soon after.

Retroscoping Zookeeper Staleness

ZooKeeper is a popular coordination service used as part of many large-scale distributed systems. ZooKeeper provides a file-system-inspired abstraction to the users on top of its replicated key-value store. Like other Paxos-inspired protocols, ZooKeeper is typically deployed on at least 3 nodes, and it can tolerate F node failures for a cluster of size 2F+1. One characteristic of ZooKeeper is that it runs the consensus algorithm for update operations; however, to speed things up, reads may be served locally by the replica a client connects to. This means that a value read from a replica may be stale or outdated compared to the leader node or even other followers. It becomes the user’s problem to tolerate the stale data read from ZooKeeper, or to perform a sync operation to get up to date with the leader before reading.

But how stale can data become in a ZooKeeper cluster? This is a rather tricky question to answer, since each replica runs on a separate machine with its own clock. We cannot just observe the time a value becomes available at each node, because these timestamps are not comparable due to clock skew. Instead of trying to figure out the staleness time, I decided to look at how many versions behind a replica can be. I used my Retroscope tool to keep track of when the data becomes available to the client at each replica. I used the Retroscope Query Language (RQL) to collect the data from the nodes and look at the consistent cuts progressing through the states of ZooKeeper. Retroscoping ZooKeeper took only about 30 lines of code added to the project.

Initial run. 1 version staleness is expected.

I deployed a small ZooKeeper cluster on 3 AWS t2.micro instances (I know, it is far from a production setup, but it works well for a quick test). On a separate instance I deployed the RQL server. To start, I simply created one znode and updated its value. I then proceeded with running the simplest possible RQL query: SELECT retro FROM setd; in this query, retro corresponds to the /retro znode I created, and setd is the name of the Retroscope log I put my monitoring data into. The result of the query was exactly what I expected: at one of the consistent cuts, one of the nodes had a value one version behind. This is entirely normal behavior, as the value needs to propagate from the leader to the followers once decided.

My next move was to give ZooKeeper a bit more work in a short burst, so I quickly wrote a small program that puts incremental values into n znodes for a total of r writes. It starts with a value tst1, then goes to tst2, and so on for every znode. At first I restricted n=1, as I felt that writing to just one znode to create a “hot spot” was going to give me the best chance of getting stale values. But ZooKeeper handled the burst of 100 writes easily, with the results being identical to a single write: the stale value was at most 1 version behind the current data.
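The burst writer itself is tiny. Here is a hedged sketch of what it could look like with the kazoo Python client; the znode paths, host address, and structure are illustrative, and the original program may have been written differently:

```python
from kazoo.client import KazooClient

def burst_writes(hosts, n_znodes=1, total_writes=100):
    """Write incremental values tst1, tst2, ... across n znodes as quickly
    as possible to create a short burst of updates."""
    zk = KazooClient(hosts=hosts)
    zk.start()
    for i in range(n_znodes):
        zk.ensure_path(f"/retro{i}")          # create the znodes if missing
    for r in range(total_writes):
        zk.set(f"/retro{r % n_znodes}", f"tst{r + 1}".encode())
    zk.stop()

burst_writes("10.0.0.1:2181", n_znodes=1, total_writes=100)
```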

Two versions behind.

Seeing things work well was not interesting to me, so I decided to no longer play fair. I artificially made one replica work slower, turning it into a straggler node. The ZooKeeper protocol tolerates this just fine, as it needs 2 out of 3 nodes in my cluster to form the majority quorum and make progress. For this crippled ZooKeeper setup I re-ran the workload and my simple query. Needless to say, I was able to spot a moment when the system had a node with a znode 2 versions behind, and it was my straggler machine. In the next step I increased the load on the cluster and made the workload write 1000 values to the same znode. I also changed the query so that I do not have to manually look through thousands of consistent cuts trying to spot the stale data. My new query emits consistent cuts with staleness of 2 or more versions:

SELECT r1 FROM setd WHEN Int(StrReplace(r1, "tst", "")) - Int(StrReplace(r1, "tst", "")) > 1;

The interesting thing about the query above is that it uses the same variable name twice in the expression, essentially telling RQL to output a cut when r1 – r1 > 1. However, there are many r1‘s in the system (3 in fact, one at each node), so when a pair of r1‘s that satisfies the WHEN condition is found (at different nodes), RQL will output the consistent cut.

Very stale.

I was a bit surprised by the results. At first the straggler managed to keep up with the rest of the cluster quite well, slipping 2 versions behind on occasion, but after about 200 requests things went out of control for the crippled node, with the staleness growing to 158 versions behind towards the end of the run. Of course a straggler node will make things look worse, but it is not an unrealistic scenario to have underperforming machines. My test, however, is not fair either, as I was using a 100% write workload targeting just one znode. So in the next try I changed the workload to target 100 different znodes, while still measuring the staleness on just one znode. In that experiment, the staleness was not as high, but the number of updates to a single znode was only a small fraction of the previous test. Nevertheless, the straggler replica was as much as 6 versions stale, putting it roughly 1/3rd of the updates behind the rest of the cluster.

For the last quick test, I tried doing a large burst of 2000 writes to the same znode on a healthy cluster with no straggler nodes. Despite all replicas working at their proper speed, I was able to observe staleness of 3 versions on some occasions.

I am not sure about the lessons learned from this quick experiment. I was amazed by how easy it is to get ZooKeeper to have data 2 or more versions behind, although the system seems fast to catch up. The straggler scenario, however, illustrated how quickly things can get out of control with just some performance degradation at one of the replicas. Engineers using ZooKeeper must build their applications in such a way as to tolerate stale ZooKeeper values gracefully.

Why Government IT is Expensive and Archaic

Disclaimer: I do not work for the government, and my rant below is based on my very limited exposure to how IT works in the US government setting.

Why is government IT expensive and archaic? I think this can be a very long discussion, but I do have a quick answer: standards imposed by the government and used by government-regulated industries. I have very little experience with these types of standards, but they make me cringe every time I have to deal with them. Below, I briefly describe my encounter with them.

When I was just a college student, I joined a (very) small IT company sitting next to the university campus. I started as an intern during my 3rd year of college and was working full-time a year later. At the same time, our team was tasked with making a piece of software for a private company to be used for tracking medical services provided to students at public schools throughout the state. This piece of software was to replace an older one, and we had a very strict set of requirements: “Make it work and look the same, but better.” Such requirements, along with many decisions a college-student-turned-software-engineer had to make, shaped how the system works right now. Of course, it was not just me making the product, but nevertheless, my “brilliant” ideas slipped in and became what the system is today.

One part of the system is responsible for billing the medical services tracked in the system to Medicaid. And this is where the interaction with government IT started. A government agency cannot just provide a secure API for software developers like myself to use. No, it has to hire a major business to implement a standard commonly used for medical transactions. And here we are, in the 21st century, having to adhere to Electronic Data Interchange (EDI) X12, a standard designed in the 1970s to transfer medical (among other EDI uses) data between computer systems. Don’t get me wrong, standards are good, they make different systems work together flawlessly… But that is until you start looking at the standard. For our system, we had to implement only one transaction type at that moment, so what could be difficult about it? The difficulty started with a 700-page manual just for constructing the transaction request. The manual is accompanied by an errata and an errata of the errata. In addition, there is a 100-page companion guide that specifies procedures specific to the state Medicaid.

So how easy is the standard itself? Maybe it is not that complicated to work with and the manuals are just full of fluff? Well, EDI is a textual format, so in theory a person can read the file to see all the data. The manuals even provide examples of what pieces of it should look like: SV1✽HC:99211:25✽12.25✽UN✽1✽11✽✽1:2:3✽✽Y~

Easy enough to read? Nope, so you are back at the manual studying what everything means. For example, SV1 is the header for the section describing the professional (medical) service provided, the HC code describes what type of codes follow next, and 99211 is the service code, followed by code modifiers and modifiers of modifiers and so on. Somewhere in there is how much to charge Medicaid and how many units of service have been provided. But on the bright side, it has cool delimiters: stars, colons, and tildes. To top it off, each field can be flexible in size or can be restricted to some number of characters or to a certain set of values, and to find out which, you consult the manual once again.
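To make the delimiter counting a bit more concrete, here is a toy split of the example segment above into its positional elements and sub-elements; the meaning of each position still has to come from the manual:

```python
def parse_segment(segment, elem_sep="✽", sub_sep=":", terminator="~"):
    """Split one EDI segment into positional elements; composite elements
    are further split into their sub-elements."""
    elements = segment.rstrip(terminator).split(elem_sep)
    return [e.split(sub_sep) if sub_sep in e else e for e in elements]

print(parse_segment("SV1✽HC:99211:25✽12.25✽UN✽1✽11✽✽1:2:3✽✽Y~"))
# ['SV1', ['HC', '99211', '25'], '12.25', 'UN', '1', '11', '', ['1', '2', '3'], '', 'Y']
```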

What if you make an error? Not a problem: the response to the request comes back as an EDI file, equally cryptic, that describes what went wrong. And we are back to the manuals, counting stars and tildes to check whether you sent all the data and whether it was in the right format and the right order.

It is also worth mentioning that the EDI format is used for HIPAA-protected medical data, but it has no security built in: everything is plaintext, and with the help of the manual anyone can read it. The transmission of the requests, however, is carried out over a secure channel. In my case, I am sending batched requests for processing, and the only way to transmit those is over SFTP. Needless to say, it also becomes my responsibility to pull the response files from the SFTP server once the requests are processed, and since there are no strict guarantees on when the processing is completed for each batch, I just do it periodically and eventually collect the responses. How the EDI files are stored on the other side is all up to the engineers designing that system, and as users we can only hope it has passed HIPAA compliance checks and secures the data at rest.

Obviously, archaic standards like EDI work well enough in practice. After all, despite my “brilliant” ideas implemented in other parts of the system, the EDI layer was relatively problem-free. But the standards can definitely be better: new standards, if developed, could provide improved security, be easier to work with, and reduce the costs of developing new software. However, I suspect the costs of switching to new standards are just too high for the existing infrastructure, forcing the industry to create a bigger and bigger gap between what government agencies require and what is modern, efficient, and secure.

I no longer work for the company, but I still maintain the software, and I have been dreading the time a client asks for changes to how the system interacts with Medicaid. It seems that time is upon me now, and in the summer I will be looking at more manuals for the EDI X12 requests needed to implement new features.

The First Datastore-driven Vehicle

It is not a secret that procrastination is the favorite activity of most PhD students. I have been procrastinating today, even though my advisor probably wants me to keep writing. In the midst of my procrastination, I thought: “Why are there self-driving vehicles, but no database-driven vehicles?” As absurd as it sounds, I gave it a try. And now I not so proudly present the prototype of the world’s first(?) datastore-driven vehicle. The implementation was quick and simple: I took the Voldemort code I have been tinkering with in my Voldemort traffic light post and changed it to send signals over a Bluetooth connection to a Frankenstein-of-a-car I built very quickly out of a few motors, a gearbox, an Arduino, a bunch of wires, some tracks, and lots of thermal glue.

The car changes the direction of travel when the workload characteristics change: it goes straight when the number of read operations is the same as the number of write operations. By changing the read-to-write ratio of the workload, the car can be made to turn either left or right. The prototype vehicle is not very responsive (weak motors/dead batteries) and sometimes runs into things despite my best attempts to control it.

And now, once the world has a database-driven car, the procrastination can continue… Or should I start writing?

One Page Summary: “Musketeer: all for one, one for all in data processing systems”.

Many distributed computation platforms and programming frameworks exist today, and new ones constantly pop up from industry and academia. Some platforms are domain specific, such as TensorFlow for machine learning. Others, like Hadoop and Naiad, are more general, and this generality allows sophisticated and specialized programming abstractions to be built on top.

So we naturally ask a question: which distributed computation platform is faster and more scalable? And what programming model or framework is better? The authors of Musketeer tried to find the answer and concluded that there is no such thing as the perfect computation platform or perfect programming front-end, as they all perform better under different circumstances and workloads. This discovery led to the Musketeer prototype, which decouples the programming front-ends from their target platforms and connects all the front-ends to all the computation back-ends.

Musketeer accomplishes this task by translating the code created in every supported programming front-end into an intermediate representation (IR). This intermediate representation can later be translated into the commands of the computation platforms. Think of Java or Scala code translated to Java bytecode before being JIT compiled to native code as the program runs. Each IR is a data-flow DAG representing the progression of operations in the computation.

back-end detection
♣ indicates Musketeer’s pick

Musketeer runs the IR on the platform it deems most suitable for the job (figure on the right). For example, some platforms may be better for certain operations, and not every platform supports all types of computations. Musketeer detects the patterns of operations in the IR through idiom recognition and will not allow an IR containing certain patterns to run on a platform that does not support these patterns or performs badly on them.

Musketeer can also split the entire computation into smaller jobs, each running on a different platform. The partitioning is done by picking a lower-cost back-end to run each partition. Of course, finding the partitions in the DAG is a complicated (NP-hard) problem. Musketeer uses exhaustive search to find the best partitions for smaller IRs, and a heuristic-based dynamic programming approach for larger DAGs when the cost of the brute-force approach becomes prohibitive; the heuristic approach does not examine all possible partitions. Such partitioning allows achieving better performance than any single homogeneous platform.

The IR is not optimized when it is first generated; instead, Musketeer performs the optimizations before starting the computation. In particular, certain operators in the IR DAG are merged together to reduce the number of jobs executed. The merging affects the platform choice, as some platforms support more complex jobs, while others need more steps to achieve the same result. Musketeer also optimizes for I/O by trying to eliminate duplicate scans of a dataset. All these optimizations allow the code generated from the IR to have performance similar to manually optimized code.

One Page Summary: “Slicer: Auto-Sharding for Datacenter Applications”

One of the questions engineers of a large distributed system must answer is “where to compute”. This is a big and important question, as we do not want to send a request originating in the US to some server in Australia; it simply makes no sense to incur the communication overhead if there are resources available closer. But physical affinity is not the only consideration. Just as we would not hold an Olympic swimming competition in a desert simply because the desert is close, we would not send a Russian-language speech recognition request to servers with an English language model. In other words, when deciding on the placement of a computation, we need to find the closest place with enough resources of the right type.

Slicer is Google’s answer to this “where” problem. It continuously tries to find the best possible place to perform the computations in the presence of a dynamically changing workload. Many applications within Google, such as speech recognition, various caches, and the DNS service, use Slicer to partition and balance their workload among a set of tasks, or application servers (containers). Computation placement is not on the application critical path, and it is somewhat static: once a placement has been made, we can use it for some time until the conditions change. Think of finding a swimming pool for our Olympic competition ahead of the actual races and sticking with the same pool until the games are over.

Slicer sticks with the same pool of resources as well, as it directs similar requests to the same set of tasks. Each application using Slicer needs to produce a key associated with the request and Slicer hashes it into a 63-bit internal key, which is then used to decide on the placement of the computation. This makes it possible for requests with the same key to be processed by the same set of tasks, catering to locality of reference.

The Clerk library provides a client interface to pull and learn which keys are assigned to which tasks. With Clerk, clients can learn the assignment and communicate directly with tasks. Google’s RPC service Stubby is also integrated with Slicer and can provide Clerk’s functionality, so many applications do not even need to use Clerk. Application servers use a similar library, called Slicelet, to learn which keys that particular task is expected to process, allowing tasks to adapt to changes in the requests they serve.

The Slicer service is divided into two major parts: Assigners and Distributors. Assigners map keys to tasks, while Distributors retrieve these maps from Assigners and deliver them to the Clerks and Slicelets upon request. More than one Assigner exists globally, and any Assigner can generate the key assignment for any application; however, in most cases only one Assigner is considered preferred. Sometimes more than one Assigner can act as preferred for a short duration, but Assigners eventually converge to the same assignment through optimistically-consistent storage. Slicer also has a mode with strongly consistent assignment, however it had not been tested in production at the time of the paper’s publication. Slicer also packs a variety of backup mechanisms to make sure clients can still make requests despite Assigner or Distributor failures. In case of mis-assignment (~0.02% of requests), for example due to failed tasks, the client will need to retry the request with the new assignment obtained from Slicer.

Assigners perform load balancing for the requests by changing how many tasks are assigned to each key. Balancing is done taking into account the usage rate of different keys. As the load changes, Slicer updates its mapping by taking some tasks off the lesser-used keys and assigning them to the “heavy” keys. Load balancing results in a lower load on the “hottest” tasks of an application (the median job’s hottest task is 63% less loaded) compared to static request distribution. At the same time, Slicer reacts to shifting workload patterns in a matter of minutes.

Hottest task load under Static sharding and Slicer. Slicer gets 63% lower hottest task load for a median application.

Slicer seems to play a big role in deciding where to process requests at Google, at least for certain applications. It provides load balancing and ensures that requests get to servers or tasks that are both prepared to accept them and have the capacity to do so.