# Ocean Vista

On Wednesday we had a presentation and discussion of the Ocean Vista (OV) replication and distributed transaction protocol. OV targets WAN deployments in which each region holds all data partitions and transactions can originate in any region. OV separates replication from transaction execution by making replication conflict-free with a FastPaxos-inspired protocol. For transaction execution, OV maintains visibility watermarks, such that any transaction still undergoing replication is not yet visible, and all transactions below the watermark are visible. The protocol computes the watermark via a hierarchical gossip protocol by taking the minimum watermark across all regions. A regional watermark, in turn, is the minimum of the server watermarks in that region, and a server watermark is the minimal timestamp among the transactions still ongoing on that server. A few additional optimizations allow reading from a single server, but these require an additional watermark to designate the full replication of a transaction.
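The watermark hierarchy described above can be sketched in a few lines of Python. This is only an illustration of the min-of-mins idea, not OV's actual code: the function names, the region names, and the fallback to the server clock when nothing is in flight are all my assumptions.

```python
from typing import Dict, List


def server_watermark(ongoing_timestamps: List[int], clock_now: int) -> int:
    """Smallest timestamp among transactions still replicating on one server.

    Assumption: with nothing in flight, the server reports its current clock.
    """
    return min(ongoing_timestamps, default=clock_now)


def region_watermark(server_watermarks: List[int]) -> int:
    """A regional watermark is the minimum over the servers in the region."""
    return min(server_watermarks)


def global_watermark(region_watermarks: Dict[str, int]) -> int:
    """The visibility watermark is the minimum over all regions."""
    return min(region_watermarks.values())


def is_visible(txn_timestamp: int, watermark: int) -> bool:
    """Transactions strictly below the watermark are visible."""
    return txn_timestamp < watermark


# Hypothetical two-region deployment with two servers per region.
regions = {
    "us-east": region_watermark(
        [server_watermark([105, 120], 200), server_watermark([], 200)]
    ),
    "eu-west": region_watermark(
        [server_watermark([98], 190), server_watermark([130], 190)]
    ),
}
watermark = global_watermark(regions)
```

In this example the slowest in-flight transaction (timestamp 98 in the hypothetical `eu-west` region) holds back visibility everywhere, which is the same effect discussed in point (4) of the discussion.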

Below you can find a presentation from our reading group by Balaji Arun.

# Discussion

Our discussion focused on a few points/questions:

(1) Why does the OV protocol use a FastPaxos-like algorithm for replication? FastPaxos requires larger “super quorums”; however, OV replication is not client-driven, and a server specifically picks a unique timestamp for the replication. In FastPaxos, multiple commands may be tried on the same instance (i.e. timestamp), so the fast quorum must be larger for recovery in phase-1 of Paxos to work with a smaller majority quorum. In OV, however, we do not see such conflicts: the timestamps for transactions are unique and are assigned by the one node that coordinates replication. The only possible conflict that we saw is when a transaction first tries to replicate a command and then issues an abort on the same timestamp. This arguably creates a write-write conflict on the same instance (timestamp), but we think it can be resolved by establishing a fixed precedence in which aborts always win over regular writes. With the precedence order established, writing the abort to a majority of nodes should be sufficient to make the abort persistent and recoverable. We have not reached a definite conclusion on why OV uses larger fast quorums, so we may still be missing something in our understanding of the protocol.

(2) Another discussion point concerned the evaluation and comparison. The recent SLOG paper solves a similar problem, so we were wondering how it may behave compared to OV. The group’s conclusion was a definite “it depends.” From one perspective, OV is more decentralized, so it may be able to achieve higher throughput when there are many multi-partition transactions. SLOG is more centralized, having both a dedicated master per partition and a dedicated ordering layer for the transactions involving many partitions. Neither system, however, aborts transactions in most cases (unless there are significant failures), so their performance may be close to each other.

(3) Related to (2): is the comparison with TAPIR a fair one? The two protocols operate quite differently. We thought that it would have been nice to see a comparison with Calvin and SLOG.

(4) Performance in a geo-sharded setup could suffer greatly. If we do not have all partitions/shards in every region, we will need to do cross-region replication. In the protocol, the visibility watermark gets updated after the replication is complete, and WAN replication will delay that process. Moreover, this may delay the visibility of other transactions that are not geo-replicated but have a timestamp after the geo-replicated transaction.

# One Page Summary. Gryff: Unifying Consensus and Shared Registers

This paper by Matthew Burke, Audrey Cheng, and Wyatt Lloyd appeared in NSDI 2020 and explores an interesting idea of a hybrid replication protocol. The premise is very simple – we can take one protocol that solves a part of the problem well, and marry it with another protocol that excels at the second half of the problem. This paper tackles replication in geo-distributed strongly consistent storage systems. The authors argue that consensus, when used in storage systems with predominantly read and write operations, is inefficient and causes high tail latency.

A system presented in the paper, called Gryff, takes advantage of predominantly read/write workloads in storage systems and exposes these two APIs via the multi-writer atomic storage protocol ABD. ABD operates in two phases for both reads and writes. On writes, ABD’s coordinator retrieves the latest version of the register from a quorum of nodes and writes the new value back with a version higher than any it has seen. On reads, ABD’s coordinator again retrieves the register, writes the highest version back to the cluster to ensure future reads do not see any previous versions, and only then returns to the client. The write-back stage, however, can be skipped if a quorum of nodes agrees on the same version/value of the register, allowing for single-RTT reads in the happy case.
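To make the two phases concrete, here is a minimal in-memory sketch of multi-writer ABD. It is a toy under my own assumptions, not Gryff's implementation: replicas are plain objects, a "quorum" is simply the list of replicas the coordinator happened to reach, and versions are `(counter, writer_id)` pairs.

```python
from dataclasses import dataclass
from typing import Any, List, Tuple

Version = Tuple[int, int]  # (logical counter, writer id), compared lexicographically


@dataclass
class Replica:
    version: Version = (0, 0)
    value: Any = None

    def get(self) -> Tuple[Version, Any]:
        return self.version, self.value

    def put(self, version: Version, value: Any) -> None:
        # A replica only ever moves forward to a higher version.
        if version > self.version:
            self.version, self.value = version, value


def abd_write(quorum: List[Replica], writer_id: int, value: Any) -> Version:
    # Phase 1: learn the highest version stored on a quorum.
    highest = max(replica.get()[0] for replica in quorum)
    # Phase 2: write back with a strictly higher version.
    new_version = (highest[0] + 1, writer_id)
    for replica in quorum:
        replica.put(new_version, value)
    return new_version


def abd_read(quorum: List[Replica]) -> Tuple[Any, bool]:
    """Return (value, wrote_back). The write-back is skipped when the quorum agrees."""
    replies = [replica.get() for replica in quorum]
    version, value = max(replies, key=lambda reply: reply[0])
    needs_write_back = any(seen != version for seen, _ in replies)
    if needs_write_back:
        for replica in quorum:
            replica.put(version, value)
    return value, needs_write_back


replicas = [Replica() for _ in range(3)]
abd_write(replicas[:2], writer_id=1, value="a")  # reaches replicas 0 and 1
first_read = abd_read(replicas[1:])              # replica 2 is stale: 2 phases
second_read = abd_read(replicas[1:])             # quorum agrees: 1 phase
```

The first read has to repair the stale replica before returning, while the second read sees an agreeing quorum and completes in a single round.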

Unfortunately, ABD, while providing linearizability, is not capable of supporting more sophisticated APIs. Read-modify-write (RMW) is a common pattern in many storage systems to implement transaction-like conditional updates of data. To support RMW, Gryff falls back to consensus, in particular the Egalitarian Paxos (EPaxos) protocol. The choice of EPaxos allows any node in the cluster to act as the coordinator, so it does not restrict writes to a single node as many other protocols do. The problem with this hybrid approach is then the ordering of operations completed with the ABD protocol relative to RMW operations running under EPaxos. Since the EPaxos side of Gryff works only with RMWs, it can only order these operations with respect to other RMW operations, but what we need is a linearizable ordering of RMWs with normal writes and/or reads. To keep the ordering, Gryff uses tuples of ABD’s logical timestamp, the process ID and an RMW logical counter, called carstamps. Carstamps connect the ABD part of the system with EPaxos: only ABD can update ABD’s logical clock, and only EPaxos updates the RMW counter.

When we consider the interleaving of writes and RMWs, a write with a higher ABD logical time supersedes any other write or RMW. This means that we actually do not need to order all RMWs with respect to each other, but only order RMWs that have the same base, or ABD logical time. EPaxos was modified to allow such partial ordering of commands belonging to different bases, essentially giving the protocol separate dependency graphs for RMWs applied to different ABD states. Another change to EPaxos is the cluster-execute requirement: a quorum of nodes needs to apply the change before it can be returned to the client, to make the change visible for subsequent ABD read operations.
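A small sketch may help picture how carstamps order writes and RMWs. The field order, the lexicographic comparison, and the two helper functions are my assumptions for illustration; the text above only says that a carstamp combines ABD's logical timestamp, a process ID and an RMW counter.

```python
from typing import NamedTuple


class Carstamp(NamedTuple):
    ts: int    # ABD logical timestamp, advanced only by writes
    pid: int   # id of the process that issued the write
    rmwc: int  # RMW counter, advanced only by RMWs


def after_write(seen: Carstamp, writer_pid: int) -> Carstamp:
    """A write starts a new base: higher timestamp, RMW counter reset (assumption)."""
    return Carstamp(seen.ts + 1, writer_pid, 0)


def after_rmw(base: Carstamp) -> Carstamp:
    """An RMW stays on the same base and only bumps the RMW counter."""
    return base._replace(rmwc=base.rmwc + 1)


def same_base(a: Carstamp, b: Carstamp) -> bool:
    """Only RMWs sharing a base need to be ordered against each other."""
    return (a.ts, a.pid) == (b.ts, b.pid)


w1 = after_write(Carstamp(0, 0, 0), writer_pid=1)
r1 = after_rmw(w1)
r2 = after_rmw(r1)
w2 = after_write(r2, writer_pid=2)
```

Because named tuples compare field by field, `w2` ends up greater than both RMWs even though it was produced by ABD alone, while `r1` and `r2` share a base and are ordered only by their RMW counters.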

So, how does Gryff do with regard to performance? Based on the authors’ evaluation, it does very well in reducing the (tail) latency of reads. However, I have to point out that the comparison with Multi-Paxos was flawed, at least to some extent. The authors always consider running a full Paxos phase for reads, and do not consider the possibility of reading from a lease-protected leader, which eliminates 1 RTT from a Paxos read. This would make the minimum Paxos latency smaller than Gryff’s, while also dramatically reducing the tail latency. Gryff also struggles with write performance, because writes always take 2 RTTs in the ABD algorithm. As far as scalability goes, the authors admit that Gryff cannot push as many requests per second as EPaxos even in its most favorable configuration with just 3 nodes.

Can Paxos do better? We believe that our PQR optimization, when applied in a WAN, will cut most of the reads down to 1 quorum RTT, similar to Gryff. PQR may still occasionally retry reads if the keyspace is small, but this problem also applies to Gryff when the cluster is larger than 3 nodes.

What about Cassandra? Cassandra uses a protocol similar to ABD for its replication, and it also incorporates Paxos to perform compare-and-set transactions, which are one case of an RMW operation, so in a sense Gryff appears to be very similar to what Cassandra has been doing for years.

# PigPaxos: continue devouring communication bottlenecks in distributed consensus.

This is a short follow-up to Murat’s PigPaxos post. I strongly recommend reading it first as it provides full context for what is to follow. And yes, it also includes the explanation of what pigs have to do with Paxos.

## Short Recap of PigPaxos.

In our recent SIGMOD paper we looked at the bottleneck of consensus-based replication protocols. One of the more obvious observations was that in protocols relying on a single “strong” leader, that leader is overwhelmed with managing all the communication. The goal of PigPaxos is to give the leader a bit more breathing room to do the job of a leader and spend less time talking. To that end, we replaced the direct communication pattern between leader and followers with a two-hop pattern in which the leader talks to a small subset of randomly picked relay nodes, and the relays in turn communicate with the rest of the cluster. PigPaxos also uses relays to aggregate the replies together before returning them to the leader. On each communication step, PigPaxos uses a new set of randomly picked relay nodes to both spread the load evenly among the followers and to tolerate failures.
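The relay selection can be pictured with the toy sketch below. It is not our implementation: the static split of followers into groups, the node names, and the `make_groups`/`pick_relays` helpers are hypothetical, and aggregation of replies and timeouts are left out entirely.

```python
import random
from typing import Dict, List


def make_groups(followers: List[str], num_groups: int) -> List[List[str]]:
    """Statically split followers into relay groups (assumes num_groups <= len(followers))."""
    return [followers[i::num_groups] for i in range(num_groups)]


def pick_relays(groups: List[List[str]], rng: random.Random) -> Dict[str, List[str]]:
    """For one communication step, pick a random relay in every group.

    Returns {relay: [group members the relay forwards to]}.
    """
    plan = {}
    for group in groups:
        relay = rng.choice(group)
        plan[relay] = [node for node in group if node != relay]
    return plan


followers = [f"n{i}" for i in range(1, 9)]  # 8 followers, names are made up
groups = make_groups(followers, num_groups=2)
rng = random.Random(42)
round_plan = pick_relays(groups, rng)  # a fresh plan is drawn for every step
```

With 8 followers in 2 groups, the leader sends 2 messages per step instead of 8, and each relay fans out to the 3 remaining members of its group.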

By randomly rotating the relays, enforcing timeouts, and tuning how many nodes to wait for at each relay node, we can provide adequate performance even in the event of node crashes or network partitions. The fault tolerance limit of PigPaxos is similar to Paxos, and up to a minority of nodes may fail with the system still making some (limited if implemented naively) progress.

## Some More Results

In the original PigPaxos post, we did not talk about scaling to super large clusters. Well, I still do not have that data available, but following in the footsteps of our SIGMOD work, we have developed a performance model that, hopefully, is accurate enough to show some expected performance at the bigger scale.

On the fault tolerance front, relay nodes definitely introduce more ways for the protocol to stumble. A crash of a relay node makes the entire relay group unavailable for that communication attempt. A crash of a non-relay node causes a timeout, which may add to the operation latency. The core principle behind PigPaxos’ fault tolerance is to repeat failed communication in a new configuration of relay nodes. Eventually, the configuration will be favorable enough to make progress, given that a majority of nodes are up. However, this process can be slow when many nodes have crashed, so some orthogonal optimizations can help. For example, it is worth remembering which nodes are temporarily down and not using these nodes as relays or otherwise expecting them to reply on time. Another approach is to reduce the wait quorum of the relay group to tolerate stragglers, or even use overlapping groups for communication redundancy. However, even with all these ad-hoc optimizations turned off, PigPaxos can still mask failures originating in a minority of relay groups without much impact on performance. For example, in the experiment below we have one relay group experiencing a failure on every operation for 10 seconds without much detriment to overall performance.

## Why Scaling to This Many Nodes?

One of the most important questions about PigPaxos is “why?” Why do you need this many nodes in Paxos? Well, the answer is not simple and consists of multiple parts:

• Because we can!
• Because now we can tolerate more nodes crashing.
• Because now we can make services like ZooKeeper, or even databases, scale for reads just by adding more nodes. ZooKeeper reads are served from a single node, and so are reads in many databases that provide some relaxed consistency guarantees.
• Because it allows bigger apps with more parties that require consensus, all handled by a single protocol.

# One Page Summary. Aegean: Replication beyond the client-server model

This paper builds on a key observation about the operation of complex distributed applications. Namely, microservice-style applications rarely follow a simple client-server architecture, where a client makes a request and the server (or servers) responds to it. Instead, many applications use a nested approach, where clients communicate with some service, and the service itself acts as a client for one or more other nested services. This nesting presents some challenges for traditional replication protocols, like primary-backup or Paxos-based RSM replication. For instance, when a service is replicated for durability, it becomes more difficult to preserve the correctness of nested requests: in case of a service failure, the information on whether the nested request was issued or returned may be lost without additional safeguards, making it difficult to track whether nested requests need to be reissued or otherwise recovered. The authors also claim that existing approaches, like Paxos, suffer from a performance penalty when dealing with nested calls, since the replicated service needs to block and wait for nested calls to resolve. I personally do not buy the latter issue too much, as many existing replication solutions, even Paxos-based ones, try to take advantage of parallelism whenever possible, by either using a pipelined approach or concurrently operating on independent requests or data in different conflict domains.

To counter the problems with nested request calls and responses, Aegean proposes to use a shim layer sitting next to each replicated service. When one service creates a nested request to another service, it talks to the shim layer instead of the nested service directly. The shim layer runs at each replica of a replicated service and collects the requests coming from the caller service (it assumes each replica of the caller will send a request). The shim passes the request to the nested service replica only upon collecting requests from a majority of caller replicas, ensuring that the caller has sufficiently replicated the nested request. The replica can then process/replicate requests. Similarly, when the nested service generates the response, the shim layer broadcasts the response to all replicas of the caller service, keeping track of which caller replicas have received the responses and resending them as needed. Additionally, to ensure response durability, every replica of the caller service sends an ack to the other replicas, and only acts on the response from a nested call when it itself receives a majority of such acks (including its own). This ensures that the responses to a nested call have been logged by at least a majority of replicas in a service. All these shim layers and the response durability mechanism create a lot more message exchange in the system, which undoubtedly will impact the performance.
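The request-collection half of the shim can be sketched as a small counter. This is my own simplified illustration, not Aegean's code: the `Shim` class, its method name, and the integer replica ids are hypothetical, and the response broadcast and ack exchange are not modeled.

```python
from collections import defaultdict
from typing import Dict, Set


class Shim:
    """Runs next to one replica of the nested service (illustrative only)."""

    def __init__(self, num_caller_replicas: int):
        self.majority = num_caller_replicas // 2 + 1
        self.seen: Dict[str, Set[int]] = defaultdict(set)  # request id -> caller replicas
        self.forwarded: Set[str] = set()

    def on_request(self, request_id: str, caller_replica: int) -> bool:
        """Record one copy of a nested request.

        Returns True exactly once per request: at the moment a majority of
        caller replicas have sent it and it can be handed to the nested service.
        """
        self.seen[request_id].add(caller_replica)
        ready = len(self.seen[request_id]) >= self.majority
        if ready and request_id not in self.forwarded:
            self.forwarded.add(request_id)
            return True
        return False


shim = Shim(num_caller_replicas=3)
decisions = [
    shim.on_request("r1", 0),  # first copy: wait
    shim.on_request("r1", 0),  # duplicate from the same replica: still wait
    shim.on_request("r1", 1),  # second distinct replica: majority, forward
    shim.on_request("r1", 2),  # late copy: already forwarded
]
```

Counting distinct caller replicas rather than messages matters here, since a retransmission from a single replica should not be mistaken for a majority.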

Another aspect of the paper deals with speculative execution of some requests, which also introduces problems in the context of nested microservices, as speculative state may leak and get exposed to other services in the nesting chain. Aegean solves the problem of speculation by using barriers before the speculative state may become visible, and resolves the speculation by reset and replay if replicas arrived at different states.

To solve the performance issues with sequential Paxos, Aegean proposes to use a pipelined approach, which is definitely not new. For example, our Paxi from a few years back is a pipelined implementation of many consensus protocols. The authors claim that Aegean has decent performance, although I find the evaluation a bit lacking. The main comparison is against sequential (not pipelined) Paxos, and Aegean does well in this setting. However, even the authors admit that a large portion of the difference is due to the pipelining, raising the question of whether the performance comparison is fair in the first place.

Overall, I enjoyed reading about the problems caused by replication in a nested microservice architecture, but I am not sure I am too excited about the solution. The solution is a solid one for sure, but it appears very piecewise, with every piece specifically targeting a sub-problem, so it lacks a certain elegance (which is not a bad thing at all for a solid practical approach to a problem). The evaluation is the part that raises the most questions for me, ranging from claims that non-byzantine tolerant Paxos and PBFT have similar throughput, to picking inherently weak baselines for evaluation, like non-pipelined Paxos.

# One Page Summary: Ring Paxos

This paper (Ring Paxos: A high-throughput atomic broadcast protocol) has been out for quite some time, but it addresses a problem still relevant in many distributed consensus protocols. Ring Paxos aims to reduce the communication load in the Paxos cluster and provide better scalability. As we have shown in our SIGMOD 2019 paper, communication is a great limiting factor in scalability of Paxos-like protocols.

Ring Paxos reduces communication overheads with a twofold approach. First, it uses ip-multicast to substitute direct node-to-node communication with a broadcast type of communication wherever possible. Second, Ring Paxos overlays a ring topology over (parts of) the Paxos cluster to control the message flow and prevent communication bottlenecks from forming.

Ring Paxos operates very similarly to regular Paxos, with the differences being mainly in the communication part of the protocol. One node acts as a designated coordinator that receives proposals from the clients. However, the coordinator must first confirm itself as a valid coordinator using phase-1 of Paxos. To that end, the coordinator uses ip-multicast to send a message with some ballot/round number to all acceptors. This message also contains the ring configuration, including the node designated as the beginning of the ring. The acceptors receive the message, compare the ballot with their own knowledge, and only accept the node as the new coordinator if the received ballot is the highest an acceptor has seen so far. Each acceptor replies independently to the coordinator (not shown in the figure), and with these replies the coordinator learns whether it has succeeded. Additionally, the coordinator also learns of any unfinished/uncommitted values that must be recovered.

Upon successfully getting a quorum of confirmations, the coordinator moves on to phase-2 of Paxos, illustrated in the figure on the right. During this phase, the coordinator replicates the commands/log entries to the acceptors. Similarly to phase-1, the coordinator uses ip-multicast to send this message to the acceptors (message #2 in the figure). The acceptors (and learners) get the value/command to be replicated, but that value is not committed just yet. Along with the value, acceptors also receive the value-id (c-vid), a unique identifier for the value, and they set their working value-id (v-vid) to c-vid. The acceptors do not reply individually to the coordinator. Instead, the first acceptor in the ring sends a reply containing the c-vid to its successor (message #3 in the figure). The acceptor receiving such a reply compares its v-vid with the c-vid from the reply message. The two value-ids match when the acceptor is not aware of any other value or of another coordinator acting concurrently. In this case it forwards the message further down the chain. This chain forwarding in the ring continues until the reply reaches the coordinator (message #4), which sits at the tail of the chain. The protocol terminates the message propagation across the ring when the acceptor’s v-vid and the c-vid from the reply message are different, leaving the coordinator to wait for a timeout and retry the protocol from the beginning with a higher ballot. Node failures produce a similar outcome, since a failure completely hinders message propagation across the ring. As a result, the new phase-1 of Paxos must include a different ring configuration to try to avoid the failed node. When the coordinator receives the reply from the ring/chain, it sends the commit message to all acceptors and learners with ip-multicast (message #5). The protocol may be extended to run multiple slots in parallel by ensuring the c-vid and v-vid comparisons happen within the same slot.
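The ring part of phase-2 boils down to a chain of v-vid/c-vid comparisons, which the following toy simulation tries to capture. It is a sketch under my own simplifications: the `Acceptor` class and `ring_phase2` function are hypothetical, multicast is modeled as a direct method call, and the first acceptor is checked the same way as its successors.

```python
from typing import List, Optional


class Acceptor:
    def __init__(self, name: str):
        self.name = name
        self.v_vid: Optional[int] = None  # working value-id

    def on_multicast(self, c_vid: int) -> None:
        """Message #2: remember the id of the value being replicated."""
        self.v_vid = c_vid


def ring_phase2(ring: List[Acceptor], c_vid: int) -> bool:
    """Pass the reply carrying c_vid along the ring of acceptors.

    Returns True if the reply makes it through every acceptor and reaches
    the coordinator at the tail (which may then multicast the commit).
    Returns False if some acceptor's v-vid differs, in which case the
    coordinator would time out and retry with a higher ballot.
    """
    for acceptor in ring:
        if acceptor.v_vid != c_vid:
            return False  # propagation stops at this acceptor
    return True


ring = [Acceptor("a1"), Acceptor("a2"), Acceptor("a3")]
for acceptor in ring:
    acceptor.on_multicast(7)
clean_round = ring_phase2(ring, c_vid=7)

ring[1].on_multicast(9)  # a2 hears from a concurrent coordinator
conflicting_round = ring_phase2(ring, c_vid=7)
```

A crashed acceptor would look the same to the coordinator as the conflicting case: the reply never arrives at the tail, so the only signal is the timeout.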

The performance of Ring Paxos is better than that of standard Paxos implementations, such as Libpaxos (uses ip-multicast) and Paxos4sb (unicast). It appears that Ring Paxos is capable of saturating most of the network bandwidth, with only the LCR protocol pushing a bit more throughput.

# Paper Summary: Bolt-On Global Consistency for the Cloud

This paper appeared in SOCC 2018, but caught my Paxos attention only recently. The premise of the paper is to provide strong consistency in a heterogeneous storage system spanning multiple cloud providers and storage platforms. Going across cloud providers is challenging, since storage services at different clouds cannot directly talk to each other and replicate the data with strong consistency. The benefits of spanning multiple clouds, however, may be worth the hassle, since a heterogeneous system will be both better protected from cloud provider outages and provide better performance by placing the data closer to the users. The latter aspect is emphasized in the paper, and as seen in the figure, going multi-cloud can reduce latency by up to ~25%.

To solve the issue of consistent cross-cloud replication, the authors propose to use Cloud Paxos (CPaxos), a Paxos variant designed to work with followers supporting a very minimal and common set of operations: get and conditional put. In CPaxos, clients act as proposers, and storage systems serve the role of the followers. The followers are not really “smart” in this protocol, and most of the Paxos logic shifts to the client-proposers (Figure 2).

The prepare phase in CPaxos simply gathers the state from the followers, making the proposer decide for itself whether the followers would have accepted it with the current ballot or not. If the proposer thinks it would have been accepted, it will try updating the followers’ state. Doing this, however, requires some precautions from the followers, since their state may have changed after the proposer made the decision to proceed. For that reason, CPaxos uses a conditional put (or compare-and-set) operation, making the followers update their state only if it has not changed since it was read by the proposer. This ensures that at most one proposer can succeed in changing the state of the majority of followers.
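To illustrate just the conditional-put step, here is a toy model with "dumb" followers and two racing proposers. It is not CPaxos itself: the `(version, ballot, value)` state layout and the `Follower`, `read_all` and `try_accept` names are my assumptions, and ballot checks, value recovery and the commit bit are left out.

```python
from typing import List, Optional, Tuple

State = Tuple[int, int, Optional[str]]  # (version, ballot, value), hypothetical layout


class Follower:
    """A storage service that only supports get and conditional put."""

    def __init__(self) -> None:
        self.state: State = (0, 0, None)

    def get(self) -> State:
        return self.state

    def conditional_put(self, expected: State, new: State) -> bool:
        # Compare-and-set: update only if nothing changed since the read.
        if self.state != expected:
            return False
        self.state = new
        return True


def read_all(followers: List[Follower]) -> List[State]:
    """Prepare phase: the proposer just reads the followers' state."""
    return [follower.get() for follower in followers]


def try_accept(followers: List[Follower], observed: List[State], new: State) -> bool:
    """Accept phase: succeed only if a majority of conditional puts go through."""
    acks = sum(
        follower.conditional_put(seen, new)
        for follower, seen in zip(followers, observed)
    )
    return acks > len(followers) // 2


followers = [Follower() for _ in range(3)]
seen_by_a = read_all(followers)  # both proposers read the same initial state
seen_by_b = read_all(followers)
a_won = try_accept(followers, seen_by_a, (1, 1, "green"))
b_won = try_accept(followers, seen_by_b, (1, 2, "blue"))
```

Proposer B loses even though it uses a higher ballot, because its view of every follower is stale by the time its puts arrive; it would have to re-read and start over.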

I visualize this as a log representing changes in some object’s state. A new version of an object corresponds to a new slot in the log, while each slot can be tried with a different ballot by different proposers. The put operation succeeds at the follower only if the value at that slot and ballot has not been written by some other proposer. In case a proposer does not get a majority of successful updates, it needs to start from the beginning: increase its ballot, perform a read and make a decision on whether to proceed with the state update. Upon receiving a majority of acks on the state update, the proposer sends a message to flip the commit bit to make sure each follower knows the global state of the operation.

This basic protocol has quite a few problems with performance. Latency is large: at least 2 round-trips are required to reach consensus, because every proposer needs to run 2 phases (+ send a commit message). Additionally, increasing the number of proposers acting on the same objects will lead to growth in conflicts, requiring repeated restarts and further increasing latency. CPaxos mitigates these problems to a degree. For example, it tries to commit values on the fast path by avoiding the prepare phase entirely and starting an accept phase on what it believes will be the next version of an object with ballot #0. If the proposer’s knowledge of the object’s state (version, ballot) is outdated, the conditional put will fail and the proposer will try again, but this time with the full two phases to learn the correct state first. However, if the proposer is lucky, an update can go through in just one round-trip. This optimization, of course, works only when an object is rarely updated concurrently by multiple proposers; otherwise dueling leaders become a problem not only for progress, but for safety as well, since two proposers may write different values for the same version using the same ballot. This creates a bit of a conundrum on when the value becomes safely anchored and won’t ever get lost.

Consider an example in which two proposers write different values, green and blue, to the same version using ballot #0 (Figure 4 on the left). One of the proposers is able to write to the majority before it becomes unresponsive. At the same time, one green follower crashes as well, leading to a situation with two followers having the green value and two having the blue one (Figure 5 on the right). The remaining proposer has no knowledge of whether the green or blue value needs to be recovered (remember, they are both on the same ballot in the same slot/version!). To avoid this situation, CPaxos expands the fast path commit quorum from a majority to a supermajority, namely $$\left \lceil{\frac{3f}{2}}\right \rceil +1$$ followers, where $$2f+1$$ is the total number of followers and $$f$$ is the tolerated number of follower failures, allowing the anchored/committed value to be in a majority of any majority of followers. Having this creates an interesting imbalance in fault tolerance: while CPaxos still tolerates $$f$$ node failures and can make progress by degrading to the full 2 phases of the protocol, it can lose an uncommitted value even if it was accepted by the majority when up to $$f$$ followers fail.
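The "majority of any majority" property is easy to check numerically. The snippet below is just arithmetic on the formulas from the text; the helper names are mine.

```python
from typing import Tuple


def quorum_sizes(f: int) -> Tuple[int, int, int]:
    """Return (total followers, majority quorum, fast-path supermajority)."""
    n = 2 * f + 1
    majority = f + 1
    fast = (3 * f + 1) // 2 + 1  # integer form of ceil(3f/2) + 1
    return n, majority, fast


def min_overlap(n: int, a: int, b: int) -> int:
    """Smallest possible intersection of two sets of sizes a and b out of n nodes."""
    return max(0, a + b - n)


n, majority, fast = quorum_sizes(2)  # the 5-follower example from the text

# With a plain majority on the fast path, a later majority of 3 followers is
# only guaranteed to contain 1 copy of the value: not enough to tell green from blue.
weak = min_overlap(n, majority, majority)

# With the supermajority of 4, any majority of 3 contains at least 2 copies,
# which is itself a majority of that majority.
strong = min_overlap(n, fast, majority)
```

For `f = 2` this gives a fast quorum of 4 out of 5, so a fast-path value survives in at least 2 of any 3 followers a recovering proposer may reach.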

Proposer conflicts are a big problem for CPaxos, so naturally the protocol tries to mitigate them. The approach taken here reduces the duration in which possible conflicts may occur. As CPaxos is deployed over many datacenters, the latencies between datacenters are not likely to be uniform. This means that prepare or accept messages from some proposer reach different datacenters at different times, creating an inconsistent state. When two proposers operate concurrently, they are more likely to observe this inconsistency: as both proposers quickly update their neighboring datacenters, they run the risk of not reaching the required supermajority due to the conflicting state (Figure 6(a)) created by some messages being slower to reach the remaining datacenters. To avoid rejecting both proposers, CPaxos schedules sending messages in a way that delivers them to all datacenters at roughly the same time. This reduces the duration of the inconsistent state, allowing some concurrent operations to be ordered (Figure 6(b)).
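One simple way to picture the scheduling idea is to hold back messages to nearby datacenters by the difference to the slowest link. The sketch below assumes one-way latencies are known and stable, which is my simplification; the paper's actual scheduling may differ, and the datacenter names are made up.

```python
from typing import Dict


def send_delays(latency_ms: Dict[str, int]) -> Dict[str, int]:
    """How long to hold each message so that all arrive at about the same time."""
    slowest = max(latency_ms.values())
    return {dc: slowest - latency for dc, latency in latency_ms.items()}


# Hypothetical one-way latencies from a proposer to three datacenters.
latency_ms = {"local": 2, "nearby": 30, "far": 120}
delays = send_delays(latency_ms)
arrival_ms = {dc: delays[dc] + latency_ms[dc] for dc in latency_ms}
```

The cost is that every message now takes as long as the slowest one, so the scheme trades some latency on the nearby followers for a shorter window of inconsistent state.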

Despite the above mitigation strategy, conflicts still affect CPaxos greatly. The authors are rather open about this, and show their system CRIC with CPaxos degrading quicker than Paxos and Fast Paxos as the conflict rate increases. However, in the low conflict scenario, which the authors argue is more likely in real world applications, CRIC and CPaxos improve on performance compared to Paxos/Fast Paxos, especially for reading the data. This is because reads in CPaxos are carried out in one round-trip time (RTT) by the client-proposer contacting all followers and waiting for at least a majority of them to reply. If the client sees the latest version with a commit flag set in the majority, it can return the data. Otherwise, it will wait to hear from more followers and use their logs to determine the safe value to return. In some rare cases when the proposer cannot determine the latest safe value, it will perform the recovery by running the write path of CPaxos with the value to recover (the highest ballot value, or the highest frequency value if more than one value shares the ballot).

## Some Thoughts

• The motivation of the paper was to make a strongly consistent system spanning multiple cloud providers and storage systems for the benefit of improved latency through leveraging the locations of the datacenters of these different providers. However, CRIC and the CPaxos protocol require a lot of communication, even on the read path. During reads, a client-proposer contacts all CPaxos nodes, located at all datacenters, and in the best case still needs replies from a majority. As such, the latency benefit here comes from trying to get not just one node closer to the client, but a majority of nodes. This may be difficult to achieve in large systems spanning many datacenters. I think sharding the system and placing shards on a subset of nodes based on access locality can help here greatly. For instance, Facebook’s Akkio paper claims a significant reduction in traffic and storage from having fewer replicas and making data follow access patterns. In our recent paper, we have also illustrated a few very simple data migration policies and the possible latency improvement from implementing these policies.
• One-RTT reads in the “happy path” can be implemented on top of regular MultiPaxos without contacting all nodes in the system. Reading from a majority of followers is good enough for this most of the time, while in rare circumstances the reader may need to retry the read from any one node. More on this will be in our upcoming HotStorage ’19 paper.
• The optimization to delay message sending in order to deliver messages at roughly the same time to all nodes can help with conflict reduction in other protocols that suffer from this problem. EPaxos comes to mind right away, as it is affected by the “dueling leaders” problem as well. Actually, CPaxos and EPaxos are rather similar. Both assume a low conflict rate to have single round-trip “happy path” writes and reads. When the assumption breaks, and there is a conflict, both switch to two phases. EPaxos is better here in the sense that the first opportunistic phase is not totally wasted and can be used as phase-1 in the two-phase mode, whereas CPaxos has to start all the way from the beginning due to the API limitation on the follower side.

# Keep The Data Where You Use It

As trivial as it sounds, keeping the data close to where it is consumed can drastically improve the performance of large globe-spanning cloud applications, such as social networks, ecommerce and IoT. These applications rely on some database system to make sure that all the data can be accessed quickly. The de facto method of keeping the data close to the users is full replication. Many fully replicated systems, however, still have a single region responsible for orchestrating the writes, making the data available locally only for reads and not for updates.

Moreover, full replication becomes rather challenging when strong consistency is desired, since the cost of synchronizing all database replicas on the global scale is large. Many strongly consistent datastores resort to partial replication across a handful of nearby regions to keep the replication latency low. This creates situations when some clients close to the regions in which data is replicated may experience much better access latency than someone reaching out from the other side of the globe.

Despite the obvious benefits of adapting to locality changes, many databases offer only static partitioning. Of course, some data stores have a migration capability, but they still often lack the mechanisms to determine where the data must be moved. Quite a few orthogonal solutions provide capabilities to collocate related data close together or use days’ or weeks’ worth of logs to compute better data placement offline. Meanwhile, Facebook’s aggressive data migration helps them reduce access latency by 50% and save on both storage and networking.

We (@AlekseyCharapko, @AAilijiang and @MuratDemirbas) investigated the criteria for quick, live data migration in response to changes in access locality. We posit that effective data-migration policies should:

• Minimize access latency,
• Preserve load balancing with regard to data storage and processing capacity,
• Preserve collocation of related data, and
• Minimize the number of data migrations.

## Policies

We developed four simple migration policies aimed at optimizing the data placement. Our policies operate at an arbitrary data granularity, be it individual key-value pairs, micro-shards, or partitions. For simplicity, I say that policies work on objects that represent some data of an arbitrary granularity.

The main point we address with the policies is minimizing access latency, with each policy using a different heuristic to make a data-placement decision. Once the policy finds the best location for an object, it checks the load-balancing constraints to adjust the data migration decision as required.

Our simplest policy, the n-consecutive accesses policy, uses a threshold of consecutive accesses to the object to make the placement decision. Although simple, this policy works well for workloads with strong locality in a single region. The majority accesses policy keeps track of some request statistics and uses them to find the region with the most accesses to an object over some time interval. It then migrates the data over to that region.
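The two single-region heuristics above can be sketched in a few lines of Python. This is a minimal illustration and not the code used in the simulations: the class names, the `record` method, and the use of a fixed-size window of recent requests in place of a time interval are all assumptions made for the example.

```python
from collections import Counter, deque


class ConsecutiveAccessPolicy:
    """Migrate after n consecutive accesses from the same region."""

    def __init__(self, n=3):
        self.n = n
        self.last_region = None
        self.streak = 0

    def record(self, region, current_owner):
        # Extend the streak, or restart it when the requesting region changes.
        if region == self.last_region:
            self.streak += 1
        else:
            self.last_region = region
            self.streak = 1
        # Propose a move only once the streak reaches the threshold.
        if self.streak >= self.n and region != current_owner:
            return region
        return current_owner


class MajorityAccessPolicy:
    """Migrate to the region with the most accesses among recent requests."""

    def __init__(self, window=100):
        # Hypothetical stand-in for "some time interval": the last `window` requests.
        self.recent = deque(maxlen=window)

    def record(self, region, current_owner):
        self.recent.append(region)
        counts = Counter(self.recent)
        best, best_count = counts.most_common(1)[0]
        # Stay put on ties to avoid needless migrations.
        if best_count > counts[current_owner]:
            return best
        return current_owner
```

Both classes return the region that should own the object after the request, so a caller migrates whenever the returned region differs from the current owner.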

The exponential moving average (EMA) policy takes a different approach and computes the average region for all requests to the object. The average region is computed as an exponential moving average favoring the most recent requests. This policy can potentially find better placement for objects that have more than one high-access region. However, it requires the regions to have numerical IDs arranged in the order of the regions’ proximity to each other. This policy falters for deployments with complicated geography and may require multiple migrations to move data to the best location. Another disadvantage of EMA is that it takes longer to settle and requires many data migrations. Unlike other policies that can move the data directly to the desired region, EMA can only migrate objects to one of the neighboring regions, so an adjustment such as going from region (1) to (3) includes a temporary migration to region (2).
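A rough sketch of the EMA idea, assuming integer region IDs laid out in proximity order. The smoothing factor `alpha`, the rounding of the average to the nearest ID, and the names used here are illustrative choices, not details taken from the actual implementation.

```python
class EmaPolicy:
    """Track the 'average region' of requests as an exponential moving average."""

    def __init__(self, start_region, alpha=0.2):
        self.alpha = alpha  # weight of the newest request (assumed value)
        self.average = float(start_region)

    def record(self, region_id, current_owner):
        # Recent requests pull the average toward their region ID.
        self.average = self.alpha * region_id + (1 - self.alpha) * self.average
        target = round(self.average)
        # Move at most one hop toward the target region.
        if target > current_owner:
            return current_owner + 1
        if target < current_owner:
            return current_owner - 1
        return current_owner


# An object owned by region 1 that is now accessed only from region 3.
policy = EmaPolicy(start_region=1, alpha=0.5)
owner = 1
path = []
for _ in range(5):
    owner = policy.record(3, owner)
    path.append(owner)
```

In this run `path` passes through region 2 before settling on region 3, which is the multi-hop behavior described above.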

Finally, the center-of-gravity (CoG) policy calculates the optimal object placement by taking into account the distribution of all requests to an object and the distances between the datacenters. The CoG policy calculates the region closest to the central location for any access-locality workload. CoG can collect the request statistics similar to the majority accesses policy and make a placement decision only after some time has elapsed since the last decision. Alternatively, it can use a continuous metric to assign each region a score corresponding to its weight in the workload, adjust the score and recompute the best object placement with every request.
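One way to make the CoG idea concrete is to pick the region that minimizes the request-weighted latency to all requesters. That objective is my assumption for this sketch, and the latency numbers and region names below are made up for illustration.

```python
def center_of_gravity(access_counts, latency):
    """Return the region with the lowest request-weighted latency.

    access_counts: {region: number of requests from that region}
    latency: {candidate: {source: latency in ms}}, with latency[x][x] == 0
    """
    def cost(candidate):
        return sum(n * latency[candidate][src] for src, n in access_counts.items())

    return min(latency, key=cost)


# Hypothetical topology: three edge regions 100 ms apart, one hub 40 ms from each.
LATENCY = {
    "A": {"A": 0, "B": 100, "C": 100, "H": 40},
    "B": {"A": 100, "B": 0, "C": 100, "H": 40},
    "C": {"A": 100, "B": 100, "C": 0, "H": 40},
    "H": {"A": 40, "B": 40, "C": 40, "H": 0},
}

shared = center_of_gravity({"A": 10, "B": 10, "C": 10}, LATENCY)
skewed = center_of_gravity({"A": 30, "B": 1}, LATENCY)
```

With an evenly shared object the hub `H` wins even though it issues no requests itself, while a workload dominated by `A` keeps the object in `A`.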

## Some Evaluation

I’ve simulated the policies under different access locality scenarios and calculated the latency of inter-region access and the number of object movements each policy makes. In the simulations, I used 3000 distinct objects, initially assigned to a random region in a cluster of 15 regions. I used the AWS inter-region latencies to specify the distances between simulated regions. To my surprise, even the most basic policies showed good improvement over static random placement of data.

In the first experiment, the objects were accessed according to a normal distribution. Each object has an ID assigned to it, and a normal distribution dictates the probability of drawing that ID in each region. All regions have distributions with the same variance but different means, so each region predominantly accesses some of the objects, while some groups of objects are more or less shared across the regions with adjacent IDs.
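A workload of this shape could be generated roughly as follows. The evenly spaced means, the `sigma` value, and the rejection of out-of-range IDs are assumptions for this sketch, since the exact parameters of the simulation are not given here.

```python
import random


def make_workload(num_objects, num_regions, sigma, seed=0):
    """Return a function that yields (region, object_id) requests."""
    rng = random.Random(seed)
    spacing = num_objects / num_regions
    # Same variance everywhere, but a different mean per region (assumed evenly spaced).
    means = [spacing * (r + 0.5) for r in range(num_regions)]

    def next_request():
        region = rng.randrange(num_regions)
        while True:
            obj = round(rng.gauss(means[region], sigma))
            # Redraw IDs that fall outside the object range.
            if 0 <= obj < num_objects:
                return region, obj

    return next_request


next_request = make_workload(num_objects=3000, num_regions=15, sigma=100, seed=1)
sample = [next_request() for _ in range(1000)]
```

Objects near a region's mean are accessed almost exclusively by that region, while objects halfway between two means are shared by the two neighboring regions.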

In this experiment, both CoG and the majority accesses policy showed the best results in terms of latency and the number of object movements. This is because the workload almost always favors a single region, and in rarer cases shares the object between two regions. This makes the majority heuristic, which only considers one region, work well. Similarly, the 3-consecutive accesses policy shows good latency, but it generates a lot of jitter by constantly moving shared objects between neighboring regions.

When the workload is no longer dominated by a single region for every key, single-region heuristic policies perform worse. For instance, equally sharing an object between at most 3 regions out of 15 causes the majority and 3-consecutive accesses policies to lock in to one of the sharing regions instead of optimizing the latency for all sharing regions. The CoG policy can place the data in a region optimal for all 3 regions (and not even necessarily in one of the sharing regions) and optimize the latency better than the single-region, topology-unaware policies. The EMA policy is at a big disadvantage here, since it relies on ID assignments to dictate the proximity of regions. However, the complex geography of AWS datacenters makes a good ID assignment nearly impossible, causing EMA to sometimes overshoot the best region and settle in a less optimal one.

Previous experiments did not consider the effect of load balancing. However, a good data-migration policy should refrain from migrating data to overloaded regions. In the next experiment I applied a load-balancing filter to the CoG policy to make the migration procedure first compute the best region for the object, check if that region has the capacity, and if no capacity is available, move the data to the next best region with enough processing/storage capacity. Here I used 5 regions and 1000 objects, and limited each region to storing at most 25% of all data. I ran a heavily skewed workload with 80% of all requests coming from a single region. Under these conditions the CoG policy without the filter achieves very low average latency. However, as evidenced by the imbalance graph, all objects migrate over to a single region. If load balancing is enabled, no region becomes overloaded, but the latency improvement becomes more modest.
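The two-step procedure can be sketched as a filter over a list of regions already ranked from best to worst by the placement policy. The function name, the per-region object counter, and the rule that an object may always stay where it is are assumptions made for this example.

```python
def choose_region(ranked_regions, current, stored, capacity):
    """Pick the best-ranked region that still has room for the object.

    ranked_regions: regions ordered from best to worst for this object
    current: region that holds the object now
    stored: {region: number of objects currently stored there}
    capacity: maximum number of objects a region may store
    """
    for region in ranked_regions:
        # The object already counts toward its current region's load.
        if region == current or stored[region] < capacity:
            return region
    # Nothing better has room, so leave the object where it is.
    return current


# 1000 objects with a 25% cap per region means at most 250 objects per region.
stored = {"A": 250, "B": 100, "C": 0}
target = choose_region(["A", "B", "C"], current="C", stored=stored, capacity=250)
```

Here the preferred region `A` is full, so the object goes to the next best region `B`. Because each object is handled on its own, whichever objects ask first fill the best region.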

## Concluding Remarks

Having data close to the consumers can dramatically improve the access latency. For some databases, this means doing full replication; for others, this may involve moving data or the owner/write role from one region to another. It is important to make sure the data is moved to the right location. I have looked at four simple rules or policies for determining the data migration and ran some simulations on these.

There are a few lessons I have learned so far from this:

• Topology-aware rules/policies work better for a larger variety of situations
• Simple rules, such as just looking at the number of consecutive requests coming from a region or determining the majority accesses region, can also work surprisingly well, but not always. These tend to break when access locality is not concentrated in a single region, but shared across a few regions in the cluster
• EMA looked interesting on paper. It needs just a single number updated with every request to determine the optimal data placement, but it performed rather badly in most experiments. The main reason for this is the complicated geography of datacenters.
• Optimizing for latency and adjusting for load balancing constraints to prevent region overload can be done in two separate steps. My simple two-stage policy (presently) looks at load balancing for each object separately. This becomes a first-come-first-serve system, but I am not sure yet whether this can become a problem.
• EMA policy takes multiple hops to move data to better region, while n-consecutive accesses policy has constant jitter for objects shared by some regions

I have not studied data collocation much in my experiments, nor designed the policies to take it into consideration. One of the reasons is that I think related objects will have similar access locality, causing them to migrate to the same datacenters. However, this is just a guess, and I need to investigate this further.

# Looking at State and Operational Consistency

Recently I rediscovered “The many faces of consistency” paper by Marcos Aguilera and Doug Terry. When I first read the paper two years ago, I largely dismissed it as trivial, and, oh boy, now I realize how wrong I was at that time. It is easy to read for sure, and may appear as some summary of various consistency models at first, but it is thought-provoking and really makes you ask more questions and draw interesting parallels after giving it some quality time.

Murat gave a good summary of this paper recently in relation to his sabbatical. The questions he asks after the summary, however, provoke even more thoughts about consistency and how we classify, categorize and view it.

In a nutshell, the paper talks about consistency from different perspectives, namely state consistency as observed by a system itself and operational consistency that clients see. State consistency involves enforcing a system state to hold some invariants. State consistency promotes invariant-based reasoning. The operational consistency is different, as it looks at the system from the client point of view. Outside clients do not directly observe the state of the system; instead they perform operations against it and can only see the results of these operations. So in short, state consistency is invariant-based and is concerned with the internal state of the system. Operational consistency deals with what clients observe from the outside of the system. These operational consistencies include various types of sequential equivalence, such as linearizability and serializability, and other client-centric guarantees, like read-your-writes or bounded staleness.

For more details, read the original paper, Murat’s summary or one from the morning paper.

## “Strength” of state consistency.

What strikes me right away is how differently the two types of consistency are described. Operational consistency gives us the framework for reasoning about systems without having too many details about the internals. We can gauge the relative “strength” of different consistency models and put them in perspective against each other. We know that serializability is weaker than session serializability. Or that linearizability is stronger than sequential equivalence.

But what about state consistency? There is no such reasoning framework. And in fact, it is not easy to even classify state consistency, let alone reason about the “strength” of different state consistency classes. The paper mentions a few state consistency models, such as referential integrity for databases, or mutual consistency in primary-backup systems, or error bounds. But these are not generally applicable across the board. These examples of state consistency operate within the constraints of their specific domains or applications.

However, state consistency still comes at different “strength” levels. When reasoning about state consistency, we use an invariant-based approach. The invariants on the state we need to enforce for an eventually consistent data store and a strongly consistent one are different. In the former case, we can be more relaxed, since we only need to make sure that in at least one future state the different nodes of the store have the same data. The latter case is more complicated, as we need to hold stricter invariants (e.g. no two alive nodes have different committed values for the same slot in the log, there can be only one active leader, the leader must process the commands in the receive order, etc.).

It is the “tightness” of the invariants that makes state consistency strong or weak. But deciding which invariant is tighter or stricter is hard. We cannot simply compare various invariants on the merits of when they should hold, or how many parameters they cover or how many nodes they span, as all these (and other) metrics mean something only in the context of their systems/problems. An invariant that must hold at every state is not necessarily tighter than an eventual one, as it may simply be an invariant against some irrelevant or trivial parameter that holds all the time anyway and has no impact on the system.

And this is why we often translate these invariants and state consistency they represent into the operational consistency. The operational side of things allows us to observe the impact of the invariants on the system, albeit indirectly. Operational consistency levels the playing field and enables the comparison from the external point of view. It allows us to gauge how otherwise hard-to-compare invariants at the state-consistency level affect the outcome of operations.

## Smart systems or smart clients

Does this mean that a system providing stronger operational consistency has stronger state consistency? Well, it would have been too simple if that was the case. It often happens, and more so recently, that systems have “misalignments” between their internal state consistency and the operational one exposed on the client side.

A system that provides stronger operational semantics may do so because it has a strong state that makes it easy to expose the strong operational consistency. These smart systems preserve strong state consistency at all costs. They may need to run complicated algorithms (e.g. Paxos) to achieve that, but doing so makes the clients lean and simple with minimal or no state at all.

On the other hand, simple systems may forgo the complicated protocols needed to enforce a strong state. Instead they aim to run as lean as possible at the core system level and shift as much burden to the smart clients. These clients need to have more complex state and protocols if they are to provide stronger operational guarantees. There are systems that do exactly that.

Both “smart system – lean client” and “lean system – smart client” approaches have their advantages and drawbacks. Designing and maintaining a smart system may actually be simpler: all the things engineers need are readily available at the system level. Invariant-based reasoning and tools like TLA+ easily apply in this setting. Debugging is simpler too, since lean and stateless clients are likely not the cause of a problem, and internal logging can help collect all the necessary information. On the other hand, having a lean system may improve the performance by reducing the bottlenecks, spreading load more evenly across the nodes and even sharing the load with smart clients. But it comes at some engineering costs. Protocols now involve more state, and the state is even more distributed: both at the lean system nodes and at the smart clients. Modeling this state with TLA+ is still possible, but it will likely take more time to check and require more complicated models that include clients and client interactions with the system nodes. Debugging may be slowed down due to the lack of necessary data, since many issues (especially in production) may happen at smart clients outside of the engineers’ reach.

State consistency is an interesting beast. It does not give us the same mental reasoning framework as operational consistency. We, the distributed systems people, often think about consistency in operational terms. It is easy to understand why, since operational consistency allows for comparison between systems or protocols. But then we, the distributed systems people, also think in terms of state consistency. We model our systems at the state level, trying to find good invariants, to see what conditions should always hold, or how a system needs to converge to certain states.

But now I understand that there is no clear path from strong state to strong operational consistency. Strong state makes it easier to build operationally strong systems, but it is not a requirement. In fact, ZooKeeper, for example, despite having a Paxos-like protocol at its heart, is not all that operationally strong. And some systems, like TAPIR or OCCULT, may have weaker state, but with clever engineering and smart clients can provide stronger operational semantics. The world of distributed systems is not black-and-white. There is a lot of gray in between.

# Python, Numpy and a Programmer Error: Story of a Bizarre Bug

While recently working on my performance analysis for Paxos-style protocols, I uncovered some weird quirks about Python and numpy. Ultimately, the problem was with my code; however, the symptoms of the issue looked extremely bizarre at first.

Modeling WPaxos required doing a series of computations with numpy. In each step, I used numpy to do some computations with arrays. Normally, I would initialize a new array and set the values by doing some calculations on the data from previous steps. However, in one step I used a newly initialized array to perform some additions with another numpy array. Of course, by mistake I initialized the new array with numpy.empty() instead of numpy.zeros(), causing the new array to potentially contain some garbage values that may screw up the entire computation. Obviously, I did not know I made this mistake.

However, most of the time, this new array had all values set to zero, so I consistently observed the correct results. That is, until I added a simple print statement (something like print some_array) on some array to check on the intermediate computation in the model. Printing the numpy array caused the entire calculation to screw up, leaving me with a big mystery: how could a simple Python print statement, which should have no side effects, change the results of subsequent computations?

I won’t lie, I was mesmerized by this for hours: I remove the print statement and everything works, I add it back and the entire model breaks. Consistently. Even after a reboot. What is even weirder, the bad results I observed were consistently the same, reproducible run after run after run.

And in such consistent failures, I observed a pattern. One computation step was always skewed by the same value, the value of the array I was printing, as if that array was added to the newly initialized array in the skewed step. And this is when I noticed that I used numpy.empty() instead of numpy.zeros(). After a simple fix, the outcome was the same regardless of whether I printed the results of the intermediate steps or not.
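The mistake boils down to using an uninitialized array as the starting point of an accumulation. Below is a minimal reconstruction of the pattern and not the actual model code; the function name and its `init` parameter are made up for illustration.

```python
import numpy as np


def accumulate(parts, init=np.zeros):
    """Sum a list of equally shaped arrays into a fresh array."""
    # With init=np.zeros the accumulator starts from a clean slate.
    # With init=np.empty it starts from whatever bytes happen to be in the
    # allocated memory, so the result depends on earlier allocations.
    total = init(parts[0].shape)
    for part in parts:
        total += part
    return total


steps = [np.ones(3), 2 * np.ones(3)]
correct = accumulate(steps)               # always the element-wise sum
# buggy = accumulate(steps, init=np.empty)  # may or may not match `correct`
```

numpy.empty() is still fine when every element gets assigned before it is read; the trouble starts only when the array is read, for example by `+=`, before being filled.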

In the end, it was a programmer’s error, but the bizarre symptoms kept me away from the solution for way too long before uncovering the truth. I am not an expert on the internals of Python and numpy, but I do have some clue as to what might have happened. I think the print statement created some kind of temporary array, and this temporary array got destroyed after the print. (Alternatively, something else created a temporary array, and the print statement just shifted the address of memory allocations.) The next computation then allocated space for a new array with the same dimensions in the exact same spot as the old temporary one. And this newly created array then had garbage values, containing the outputs of the previous step.

The interesting part is how consistent this was for hundreds of tries, producing the same failed output. How consistent did the memory allocations have to be in every run? And of course, many may not even think about the possibility of having such dirty memory problems in languages such as Python. Numpy, however, is written in C, and it clearly brings some of C’s quirks to Python with it, so read the documentation.

# One Page Summary: “PaxosStore: High-availability Storage Made Practical in WeChat”

The PaxosStore paper, published in VLDB 2017, describes the large-scale, multi-datacenter storage system used in WeChat. As the name may suggest, it uses Paxos to provide storage consistency. The system claims to provide storage for many components of the WeChat application, with 1.5TB of traffic per day and tens of thousands of queries per second during the peak hours.

PaxosStore relies on the Paxos protocol for consistency and replication within tight geographical regions. The system was designed with a great separation of concerns in mind. At a high level, it has three distinct layers interacting with each other: the API layer, the consensus layer, and storage. Separating these out allowed PaxosStore to provide the most suitable APIs and storage for different tasks and applications, while still having the same Paxos-backed consistency and replication.

In the Paxos-driven consensus layer, the system uses a per-object log to keep track of values and Paxos-related metadata, such as promise (epoch) and proposal (slot) numbers. The log’s implementation, however, seems to be somewhat decoupled from the core Paxos protocol. The Paxos implementation is leaderless, meaning there is no single dedicated leader for each object, and every node can perform writes on any of the objects in the cluster by running the prepare and accept phases of Paxos. Naturally, the system tries to perform (most) writes in one round trip instead of two by assuming some write locality. After the first successful write, a node can issue more writes with increasing proposal (slot) numbers. If some other node performs a write, it needs to have a higher ballot, preventing the old master from doing quick writes. This is a rather common approach, used in many Paxos variants.