Tag Archives: geo-replication

Reading Group. UniStore: A fault-tolerant marriage of causal and strong consistency

For the 80th paper in the reading group, we picked “UniStore: A fault-tolerant marriage of causal and strong consistency” by Manuel Bravo, Alexey Gotsman, Borja de Régil, and Hengfeng Wei. This ATC’21 paper adapts Partial Order-Restrictions (PoR) consistency to a transactional model. UniStore uses PoR to reduce coordination effort and execute as many transactions as possible under the causal consistency model, resorting to strong consistency only in cases that require ordering concurrent conflicting transactions. PoR consistency itself is an extension of RedBlue consistency that allows mixing eventually consistent and strongly consistent operations.

UniStore operates in a geo-replicated model, where each region/data center (I use region and data center interchangeably in this post) stores the entirety of the database. The regions are complete replicas of each other. Naturally, requiring strong consistency is expensive due to cross-region synchronization. Instead, UniStore allows developers to choose whether to run each transaction as causally or strongly consistent. Causal consistency preserves the cause-and-effect notion between events — if some event e1 has resulted in event e2, then an observer seeing e2 must see e1 as well. Naturally, if two events are concurrent, they are not causally dependent. This independence gives us the freedom to apply these events (i.e., execute them against the store) in any order. For instance, this approach provides good performance for events that touch disjoint data concurrently. However, if two concurrent events operate on the same data, then the events either need to be commutative or be partly ordered. UniStore does both of these — it implements CRDTs to ensure commutativity, but it also allows developers to declare a transaction as strongly consistent for cases that require ordering. As such, strong consistency becomes handy when commutativity alone is not sufficient for the safety of the application. For example, when a transaction does a compare-and-set operation, the system must ensure that all replicas execute these compare-and-set operations in the same order.

So, in short, causal consistency is great when an application can execute complex logic in causal steps — event e1 completes, then observing the results of e1 can cause some other change e2, and so on. Strong consistency comes in handy when step-by-step non-atomic logic is not an option, and there is a need to ensure the execution order of conflicting concurrent operations. In both cases, the system needs to keep track of dependencies. For causal transactions, the dependencies are other transactions that have already finished and were made visible. For strongly consistent transactions, the dependencies also include other concurrent, conflicting, strongly consistent transactions.
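To make this split a bit more concrete, here is a minimal, purely illustrative Python sketch of how an application might label transactions under a PoR-like model. The client classes, method names, and consistency labels are my own inventions, not UniStore’s actual API, and the toy store does not actually replicate anything.

```python
# Hypothetical PoR-style client API (not UniStore's interface).
from enum import Enum

class Consistency(Enum):
    CAUSAL = "causal"   # commits locally, replicated asynchronously
    STRONG = "strong"   # certified across data centers before commit

class Txn:
    def __init__(self, store, level):
        self.store, self.level, self.writes = store, level, {}

    def read(self, key):
        return self.writes.get(key, self.store.get(key, 0))

    def write(self, key, value):
        self.writes[key] = value

class Store(dict):
    def run(self, level, body):
        txn = Txn(self, level)
        body(txn)
        # A real PoR store would certify STRONG transactions across data centers
        # here and commit CAUSAL ones locally; this toy version just applies writes.
        self.update(txn.writes)

store = Store()

# A commutative update (e.g., a CRDT counter increment) is safe as a causal txn.
store.run(Consistency.CAUSAL, lambda t: t.write("likes", t.read("likes") + 1))

# A compare-and-set needs a global order over conflicting writes, so declare it strong.
def claim_owner(t):
    if t.read("owner") == 0:
        t.write("owner", 42)

store.run(Consistency.STRONG, claim_owner)
print(dict(store))   # -> {'likes': 1, 'owner': 42}
```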

So, how does UniStore work? I actually do not want to get too deep into the details. It is a complicated paper (maybe unnecessarily complicated!), and I am not smart enough to understand all of it. But I will try to convey the gist of it.

The system more-or-less runs a two-phase commit protocol with optimistic concurrency control. Causal transactions commit in the local data center before returning to the client. However, these causal transactions are not visible to other transactions (and hence other clients/users) just yet. Remember, such a causal transaction has not been replicated to other data centers yet, and a single region failure can cause problems. In fact, if a strongly consistent transaction somehow takes such a non-fully-replicated causal transaction as a dependency, then the whole system can get stuck if the dependent causal transaction gets lost due to a minority of regions failing.

UniStore avoids these issues by making sure causal transactions are replicated to enough regions before they are made visible. This replication happens asynchronously in the background, sparing non-strongly consistent transactions the cost of synchronization (that is, of course, if clients/users are ok with a remote possibility of losing transactions they thought were committed).

Strongly consistent transactions are a different beast. They still run optimistically in their local data centers, but they can no longer commit in a single region, since they must be ordered with respect to other strongly consistent transactions. UniStore uses a two-phase commit here as well, but this time the commit protocol goes across all healthy regions. First, the coordinator waits for enough data centers to be sufficiently up-to-date. This waiting is crucial for liveness; it ensures that no dependent transaction can get forgotten in the case of data center outages. After the waiting, the actual two-phase commit begins, with all (alive?) regions certifying the transaction.

To implement this waiting and to expose only the durable, geo-replicated state to transactions, UniStore has a complicated system of version tracking using a bunch of vector clocks and version vectors. Each of these vectors has a time component for each data center plus an additional “strong” counter for keeping track of strongly consistent transactions. Each transaction carries a couple of important version vectors.

The snapshot vector snapVec describes the consistent snapshot against which the transaction runs. The commit vector commitVec gives the transaction’s commit version, which is used for ordering.

Each replica keeps two version vectors representing the version of the most recent transaction known to itself and to its data center. Since the system relies on FIFO ordering of communication and message handling, knowing the version of the most recent transaction implies knowledge of all lower-versioned transactions as well. This information is then exchanged between data centers to compute yet another version vector representing the latest transaction replicated to at least a majority of data centers. This, again, implies that all lower-versioned transactions have been replicated as well. This last version vector allows strongly consistent transactions to wait for their dependencies to become globally durable, ensuring liveness.
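As a rough illustration of the idea (my own sketch, not UniStore’s actual algorithm or data structures), here is how a “globally durable” frontier could be computed from per-data-center known-version vectors, assuming FIFO delivery so a single counter per origin summarizes all earlier transactions from that origin:

```python
# known[d] is the latest transaction timestamp from each origin DC that DC d
# has received; FIFO delivery means everything below it has arrived too.

def durable_frontier(known, num_dcs):
    """Return, per origin DC, the highest timestamp replicated to a majority."""
    majority = num_dcs // 2 + 1
    frontier = []
    for origin in range(num_dcs):
        # Timestamps from this origin as known at each DC, highest first.
        seen = sorted((known[d][origin] for d in range(num_dcs)), reverse=True)
        # The majority-th highest value is present at >= majority DCs.
        frontier.append(seen[majority - 1])
    return frontier

# Example: 3 DCs; DC0's transaction 5 has reached DC0 and DC1 but not DC2 yet.
known = [
    [5, 2, 7],   # what DC0 has received from DC0, DC1, DC2
    [5, 3, 6],   # what DC1 has received
    [4, 3, 7],   # what DC2 has received
]
print(durable_frontier(known, 3))  # -> [5, 3, 7]
```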

So here is where I lose my understanding of the paper, so take what follows with a pinch of salt, as my skepticism may be completely unwarranted. It makes sense to me to use version vectors to keep track of progress and order causal transactions. Each region computes its known progress, exchanges it with other regions, and calculates the global “transaction frontier” — all transactions that have been replicated to a sufficient number of data centers. This exchange of known progress between regions happens asynchronously. I am not entirely sure how these progress vectors help with ordering conflicting transactions. Somehow the “strong” counter should help, but this counter seems to be based on a region’s knowledge of progress and not the global one. I suspect that these vectors help identify concurrent conflicting transactions. The progress known in the data center ends up in a snapVec and represents the snapshot on which the transaction operates. The strongly consistent transactions use a certification procedure (i.e., a two-phase commit) to decide whether to abort or commit. The paper mentions that the certification process assigns the commitVec, which actually prescribes the order. At this point, I hope that conflicting transactions are caught in this Paxos-based transaction certification procedure and ordered there, or at least aborted as needed. Also worth mentioning that the extended technical report may have more details that I was too lazy to follow through on.

Now a few words about the evaluation. The authors focus on comparing UniStore against both causal and strongly consistent data stores. Naturally, it sits somewhere in the middle of these two extremes. My bigger concern with the implementation is how well it scales with the number of partitions and the number of data centers. The paper provides both of these evaluations, but not nearly at a convincing scale. They go up to 5 data centers and up to 64 partitions. See, with all the vectors and tables of vectors whose size depends on the number of regions, UniStore may have some issues growing to “cloud scale,” so it would be nice to see how it does with 10 data centers or even 20. Cloud vendors have many regions and even more data centers; Azure, for example, has 60+ regions with multiple availability zones.

Our group’s presentation by Rohan Puri is on YouTube:

Discussion

1) Novelty. The paper works with a rather interesting consistency model that combines weaker consistency with strong consistency on a declarative, per-operation basis. This model, of course, is not new, and the paper describes and even compares against some predecessors. So we had a question about the novelty of UniStore, since it is not the first to do this kind of consistency mix-and-match. It appears that the transactional nature of UniStore is what separates it from other such solutions. In fact, the bulk of the paper talks about the transactions and about ensuring liveness in the face of data center outages, which is nice. Many real-world databases are transactional, and having a system like this is a step closer to a practical solution.

2) Liveness in the presence of data center failures. Quite a lot of the paper’s motivation revolves around the inability to simply run OCC + 2PC in the PoR consistency model and maintain liveness. One problem occurs when a causal transaction takes a dependency on another transaction that did not make it to a majority of regions. If such a transaction is lost, it may stall the system. Of course, any region that takes a dependency on some transaction must see it first. Anyway, it is hard to see the novelty in “transaction forwarding” when pretty much any system recovers partly replicated data by “forwarding” it from the nodes that have it.

However, the bigger motivational issue is with strongly consistent transactions. See, the authors say that the system may lose liveness when a strongly consistent transaction commits with a dependency on a causal one that has not been sufficiently replicated, and that causal transaction then gets lost due to a region failure. However, to me, this seems paradoxical — how can all (healthy) regions accept a transaction without having all of its dependencies first? It seems like a proper implementation of the commit protocol would abort when some parties cannot process the transaction due to missing dependencies. Anyway, this whole liveness issue does not seem real to me and appears to be just a way to make the problem look more serious.

That said, I do think there is a major problem to be solved. A more proper commit protocol may hurt performance, either by having a higher abort rate or by replicating dependencies along with the strongly consistent transaction. We would like to see how much better UniStore is compared to a simpler 2PC-based solution that actually aborts transactions when it cannot run them.

3) Evaluation. The evaluation largely compares UniStore against itself, but in different modes — causal and strong. This is ok, but how about some other competition? Take a handful of other transactional approaches and see what happens. The current evaluation tells us that the PoR model provides better performance than strong consistency and worse than causal consistency. But this was known before. Now we also know that this same behavior translates to a transactional world. What we do not know is how the cost of the protocol fares against other transactional systems that do not use PoR and are not based on UniStore with features disabled.

It would also be interesting to see how expressive the transactional PoR consistency model is. For example, let’s take MongoDB. It can be strongly consistent within a partition and causal across partitions (and within a partition, users can manipulate the read and write consistency on a per-operation basis). What kind of applications can we build with Mongo’s simpler model? And what kind of apps need UniStore’s model with on-demand strong consistency across partitions?

4) Complicated solution??? I have already mentioned this in the summary, but UniStore is complicated and relies on many moving parts. The paper completely omits the within-datacenter replication for “simplicity,” but that does not really make the paper simple. We have vectors that track progress, order, and snapshots, and then we have tables of vectors, and all these kinds of vectors are exchanged back and forth to compute more vectors, only to find out when it is ok to make some transactions visible or to unblock some strongly consistent transactions. How come other systems (MongoDB again?) implement causal consistency with just one number for versioning (an HLC) and still allow users to specify stronger guarantees when needed? Sure, they may not implement the PoR consistency model, but UniStore just seems too complicated. As a side question… what happens in Mongo when we start changing consistency between causal and strong on a per-request basis?

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions, and most importantly manage Zoom invites to paper discussions. Please join the slack group to get involved!

Reading Group. Fault-Tolerant Replication with Pull-Based Consensus in MongoDB

In the last reading group meeting, we discussed MongoDB‘s replication protocol, as described in the “Fault-Tolerant Replication with Pull-Based Consensus in MongoDB” NSDI’21 paper. Our reading group has a few regular members from MongoDB, and this time around, Siyuan Zhou, one of the paper authors, attended the discussion, so we had a perfect opportunity to ask questions and get answers from people directly involved.

Historically, MongoDB used a primary-backup replication scheme, which worked fine most of the time but had a few unresolved corner cases, such as having more than one active primary at a time due to failures and primary churn. Having a split-brain in a system is never a good idea, so the engineers looked for a better solution. Luckily, there are many solutions for this problem — an entire family of consensus-based replication protocols, like Multi-Paxos and Raft. MongoDB adopted the latter to drive its update replication scheme.

At this point, I might have finished the summary, congratulated MongoDB on using Raft, and moved on. After all, Raft is a very well-known protocol, and using it poses few surprises. However, MongoDB had a couple of twists in its requirements that prevented using vanilla Raft. See, the original MongoDB primary-backup scheme used a pull-based replication approach. Usually, the primary node (i.e., the leader) is active and directly prescribes operations/commands to the followers. This active-leader replication happens in traditional primary-backup, in Multi-Paxos, in Raft, and in pretty much any other replication protocol aside from pub-sub systems that rely on subscribers pulling the data. In MongoDB, the primary is passive, and followers must request the data at some repeated intervals. In the figure below, I illustrate the difference between the push-based (active leader) and pull-based (active follower) approaches.

In both cases, the followers learn new data from the leader. The pull-based approach, however, requires a bit more work. The leader has to compute the data batch for each follower individually, because the followers may all have slightly different progress due to differences in their polling schedules. There is also another problem with the pull solution: the primary has no way of knowing whether the followers have received the operations during the pull. And this is important, since the primary needs to count the replicas accepting each operation to figure out the quorum and ultimately commit the operations. In traditional Raft, the result of the AppendEntries RPC carries an ack for a successful operation, allowing the primary to count successful nodes and figure out when the quorum is satisfied. With flipped, pull-style communication, there is no reply to the primary, so we need some additional mechanism to propagate this information, as shown in the figure below. Adding more communication is costly, but the extra message can be piggybacked onto the next pull request at the expense of latency.

Pull-based Raft. Note that the leader must know when a quorum has accepted an operation.
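To make the extra bookkeeping concrete, here is a toy sketch (my own simplification, not MongoDB’s code) of what a pull-based leader has to track: instead of counting AppendEntries acks, it records the “applied up to” position each follower reports with its pulls and commits an entry once a majority has reported reaching it.

```python
class PullLeader:
    def __init__(self, followers):
        self.log = []                              # replicated operations
        self.progress = {f: 0 for f in followers}  # last index each follower reported
        self.commit_index = 0

    def append(self, op):
        self.log.append(op)

    def handle_pull(self, follower, last_applied):
        # The pull itself carries the follower's progress (piggybacked ack).
        self.progress[follower] = last_applied
        self._advance_commit()
        # Ship everything the follower has not seen yet.
        return self.log[last_applied:]

    def _advance_commit(self):
        majority = (len(self.progress) + 1) // 2 + 1   # +1 counts the leader itself
        for idx in range(len(self.log), self.commit_index, -1):
            acked = 1 + sum(1 for p in self.progress.values() if p >= idx)
            if acked >= majority:
                self.commit_index = idx
                break

leader = PullLeader(["f1", "f2"])
leader.append("x=1")
batch = leader.handle_pull("f1", 0)      # f1 pulls, has applied nothing yet
leader.handle_pull("f1", len(batch))     # the next pull acknowledges the batch
print(leader.commit_index)               # -> 1 (leader + f1 form a majority of 3)
```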

Now let’s diverge a bit and discuss why MongoDB picked the pull-based approach despite the added complexity/cost. Traditional push-based solutions are centered around the primary or leader node, creating a star topology. In fact, unless you use something like the Pig primitive from our PigPaxos paper, a star topology is pretty much all you can have. This is not great in scenarios where links may be congested or cost a lot of money. Consider a WAN scenario where two nodes exist in another region for geo-redundancy, to minimize data loss in the event of a regional failure. With a star topology, the same payload travels across the WAN twice, once to each of the two replicas, consequently costing twice as much to replicate. With a pull-based approach, it is easier to orchestrate other replication topologies. All we have to do is instruct one node to pull data from its regional peer instead of from the leader across the WAN. Naturally, this creates a longer replication chain but saves WAN link bandwidth, cost, and potentially some resources at the primary.

MongoDB’s pull-based solution enables such chaining by allowing some non-primary nodes to act as sync sources. A sync source is a node that provides data to a syncing server in response to a pull request. Of course, the primary is always a sync source. The high-level overview of node interactions is as follows:

  • The syncing server asks its sync source for new log entries.
  • The sync source replies with a sequence of log entries.
  • The syncing server appends the log items if the items follow the same history.
  • If the source and server logs diverge, the syncing server may have uncommitted data and must clean up the divergent suffix. The detection of log divergence is based on matching the last log entry of the syncing server with the first entry of the log segment sent by the source. If they match, then the source has sent a continuation of the same history.
  • The syncing server notifies its sync source of how up-to-date it is, including the syncing server’s term.

Unlike vanilla Raft, the syncing server does not check the term of the source, so it may be possible for a server to accept log entries from a source in some previous term. Of course, this can only happen if the log entries follow the same history. I think MongoDB Raft does this to make sure that the syncing server learns about legitimate updates from the previous leader even if it has already participated in the new leader’s election round. What is important here is that the syncing server sends its higher term back to the source (which should propagate to the source’s source, and so on, until it reaches the leader for the source’s term). These term messages act as a rejection for the old leader, so it won’t count the syncing server as accepting the message and being part of the quorum. As a result, if the data from the old-termed sync source was committed, the syncing server has received it and will eventually receive the commit notification from the new leader. If that data is uncommitted by the old leader (i.e., a split-brain situation), then no harm is done, since the syncing server does not contribute to the quorum and will eventually learn the proper operations instead.
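Here is a small sketch of the divergence check and term feedback as I read them; the entry format, function names, and return values are assumptions for illustration, not MongoDB’s implementation.

```python
def try_append(local_log, local_term, batch, source_term):
    """Return (accepted, reply_term). Entries are (index, term, op) tuples."""
    if not batch:
        return True, max(local_term, source_term)
    first = batch[0]
    if not local_log or local_log[-1][:2] != first[:2]:
        # Histories diverge: the syncing server may hold uncommitted data and
        # must truncate its divergent suffix before retrying (not shown here).
        return False, local_term
    # Same history: append the continuation, even if source_term < local_term.
    local_log.extend(batch[1:])
    # The reply carries the syncing server's term; a higher term acts as a
    # rejection that eventually stops an old leader from counting this node.
    return True, local_term

log = [(1, 1, "x=1")]
ok, term = try_append(log, local_term=2,
                      batch=[(1, 1, "x=1"), (2, 1, "y=2")], source_term=1)
print(ok, term, log)   # -> True 2 [(1, 1, 'x=1'), (2, 1, 'y=2')]
```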

Now, speaking of performance, the paper does not provide any comparison between push- and pull-based solutions, so we are left wondering about the impact of such a drastic change. However, some experiments illustrate the comparison of star topology and chained replication in a 2-region WAN setup. While chaining does not seem to increase performance (unless the cross-region bandwidth is severely restricted), it predictably lowers the WAN bandwidth requirements. As for maximum performance, the paper mentions that the limiting factor is handling client requests and not replication, which is why one fewer server pulling from the leader does not impact throughput. I am not sure I am very comfortable with this explanation, to be honest.

The paper talks about a handful of other optimizations. I will mention just the one that seemed the most interesting — speculative execution. With speculative execution, the nodes do not wait for a commit notification to apply an operation and instead speculatively apply it to the store right away. Since the underlying storage engine is multi-version, the system can still return strongly consistent reads by keeping track of the latest committed version. The multi-version store also allows rollbacks in case some operation fails to commit.
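Below is a tiny sketch of the speculative-apply idea on top of a multi-version map. It is my illustration of the concept, not WiredTiger’s or MongoDB’s actual mechanism.

```python
class MVStore:
    def __init__(self):
        self.versions = {}        # key -> list of (log_index, value)
        self.committed = 0        # highest committed log index

    def speculative_apply(self, log_index, key, value):
        # Apply as soon as the operation arrives, before it is known to commit.
        self.versions.setdefault(key, []).append((log_index, value))

    def read_committed(self, key):
        # Strongly consistent reads only see versions at or below the commit point.
        vals = [v for (idx, v) in self.versions.get(key, []) if idx <= self.committed]
        return vals[-1] if vals else None

    def advance_commit(self, log_index):
        self.committed = max(self.committed, log_index)

    def rollback_above(self, log_index):
        # Discard speculative versions of operations that failed to commit.
        for key in self.versions:
            self.versions[key] = [(i, v) for (i, v) in self.versions[key] if i <= log_index]

store = MVStore()
store.speculative_apply(1, "x", "a")
store.speculative_apply(2, "x", "b")
print(store.read_committed("x"))   # -> None, nothing committed yet
store.advance_commit(1)
print(store.read_committed("x"))   # -> 'a'; version 2 is applied but not visible
store.rollback_above(1)            # e.g., entry 2 never commits
```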

You can see my presentation of the paper on YouTube:

Discussion

1) Cost of pull-based replication. The performance cost of the pull-based solution was the first big discussion topic that I raised in my presentation. As I mentioned, there is an extra communication step needed for the primary to learn of quorum acceptance. This step either adds an additional message round/RPC call or adds latency if piggybacked onto the next pull request. Another concern is pulling data when there is no new data, as this wastes communication cycles.

Luckily, we had MongoDB folks, including Siyuan Zhou, one of the paper authors, to shed some light on this. To make things a little better and not waste communication cycles, the pulls have a rather long “shelf life” — if the source has no data to ship, it will hold on to the pull request for up to 5 seconds before replying with an empty batch.

Another big optimization in the MongoDB system is a gradual shift to a push-based style! This somewhat makes the entire premise of the paper obsolete; however, this new “push-style” replication is still initiated by the syncing server with a pull, but after the initial pull, the source can push data to the syncing server as it becomes available. This allows building these complicated replication topologies while reducing the latency impact of a purely pull-based solution.

Another aspect of cost is the monetary cost, and this is where chained replication topologies can help a lot. Apparently, this was a big ask from clients initially and shaped a lot of the system architecture. 

2) Evolution vs Revolution. So, since the original unproven replication approach was pull-based to allow chained topologies, the new, improved, and model-checked solution had to evolve from the original replication. One might think that it would have been easier to slap in a regular push-based Raft, but that would have been a drastic change for all the other components (not to mention the features). It would have required a lot more engineering effort than trying to reuse as much of the existing code as possible. This brings up an interesting point on how production software gradually evolves and almost never drastically revolves.

3) Evaluation. The evaluation is the biggest problem of the paper. It lacks any comparison with other replication approaches except old MongoDB’s primary-backup scheme. This makes it hard to judge the impact of the changes. Of course, as we have seen from the discussion with the authors, the actual implementation is even more complicated and evolved. It tries to bridge the gap between pull and push replication styles, so a clean comparison based on MongoDB’s implementation may not have been possible at all.

That being said, I had some questions even about the provided self-comparisons. See, I would have expected to see a bit of throughput increase from chaining, similar to what I observe in PigPaxos, but there is none. The paper explains this by saying that replication at the sync source takes only 5% of a single core per syncing server, which would amount to just 20% of a core at the star-topology leader. Roughly, given the VMs used, this is around 5% of the total CPU spent on replication, with the paper claiming that all the remaining CPU is used to handle client requests. Assuming there is a sync every 2 ms, we have about 2000 syncs per second at the leader against 12000 client requests per second. Doing some napkin math, we can see that there are 6 times more requests than replications per second, yet the requests use 19 times the CPU, making each request roughly 3 times more expensive than each replication. Given that replication messages are not super cheap and require querying the log and serializing the data, this cost difference sounds excessive to me, even considering that client requests involve a few writes (i.e., writing to the log and executing the operation).
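For the curious, here is the napkin math spelled out; all inputs are the rough figures quoted above, so treat the result as an order-of-magnitude estimate rather than a measurement.

```python
# Rough per-operation CPU cost estimate from the numbers discussed above.
client_requests_per_sec = 12000
syncs_per_sec = 2000            # one pull every ~2 ms from each of 4 syncing servers
replication_cpu_share = 0.05    # ~5% of total CPU spent serving pulls
request_cpu_share = 1 - replication_cpu_share

cost_per_request = request_cpu_share / client_requests_per_sec
cost_per_sync = replication_cpu_share / syncs_per_sec
print(round(cost_per_request / cost_per_sync, 1))   # -> ~3.2x more CPU per request
```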

Siyuan explained that there is a bit more work on the request path as well. Not only is the log written (and persisted for durability), but there are also some additional writes for session maintenance and indexes. Moreover, the writes in the underlying WiredTiger engine are transactional, which costs some performance as well.

4) PigPaxos? Throwing this out here, but there are push-based solutions that can have more interesting replication topologies than a star. Our PigPaxos, in fact, solves a similar issue of cross-regional replication at a low network cost. 

5) Other pull-based systems. Finally, we tried to look at other solutions that may use pull-based replication, but there are not many. Pub-sub systems fit the pull model naturally, as subscribers consume the data by pulling it from the queue/log/topic. Pull-based replication can also be handy in disaster recovery, when nodes try to catch up to the current state and must ask for updates/changes starting from some cursor or point in history.

6) Reliability/Safety. Since the protocol makes one important change of not rejecting the data from an old-termed source, it is important to look at the safety of this change. The paper models the protocol in TLA+ and model checks it. Intuitively, however, we know that even though the node takes the data from the old source, it actively rejects it by sending back its higher term. This, intuitively, should be enough to ensure that the old leader does not reach a quorum of accepts (even though a majority of nodes may have copied the data) and does not reply to a client with a commitment. The rest is taken care of by Raft’s commit procedure upon the term change.


Reading Group. Near-Optimal Latency Versus Cost Tradeoffs in Geo-Distributed Storage

Short Summary

Yesterday we discussed Pando, a geo-replication system achieving a near-optimal latency-cost tradeoff in storage systems. Pando uses large Flexible Paxos deployments and erasure coding to do its magic. Pando relies on having many storage sites in order to locate sites closer to users. It then uses Flexible Paxos to optimize the read and write quorums to get the best latency for a particular workload. Finally, erasure coding reduces the cost associated with having many storage sites. There are a few caveats to the paper. The first is the slowness of the (Flexible) Paxos protocol, as it requires two phases to complete an operation. Pando addresses this issue with a cool use of delegate nodes — one of the nodes from the phase-1 quorum acts as a delegate on behalf of the client-proposer to collect phase-1 acks and start phase-2. This can save significant latency, getting the full two rounds in geo deployments closer to the one-round RTT of a regular proposer-driven Paxos. The second problem is the quorum overlap when using erasure coding. If the data is split into parts, we need to ensure a k-node overlap between the read and write quorums, making erasure coding detrimental to achieving low latency. This k-node overlap is needed to restore the latest version of the data, as having fewer than k nodes with the latest version is not enough to reconstruct the data. Pando, however, first tries to read from a smaller quorum that guarantees just a one-node overlap and hopes that enough nodes have received the update by the time the read is issued. If there are not k nodes to reconstruct the data, Pando continues waiting for more nodes to respond to get to that larger quorum with a k-node overlap.
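Here is a rough sketch of this two-step read as I understand it; the function, quorum sizes, and reply format are illustrative assumptions rather than Pando’s actual code.

```python
def erasure_read(replies, k, small_quorum, large_quorum):
    """replies: list of (version, fragment) pairs in arrival order."""
    for quorum in (small_quorum, large_quorum):
        seen = replies[:quorum]
        if len(seen) < quorum:
            continue
        latest = max(v for v, _ in seen)
        fragments = [f for v, f in seen if v == latest]
        if len(fragments) >= k:
            return latest, fragments   # enough fragments to reconstruct the value
    return None                        # keep waiting (or fall back to full Paxos)

# 6 replicas, data erasure-coded so that any k=3 fragments reconstruct the value.
replies = [(2, "f1"), (2, "f2"), (1, "old"), (2, "f3"), (2, "f4"), (1, "old")]
print(erasure_read(replies, k=3, small_quorum=3, large_quorum=5))
# Small quorum fails (only 2 latest fragments), larger quorum succeeds with 4.
```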


The presentation video:

Discussion

1) ConfigManager. The ConfigManager is the component of Pando that computes desirable quorums and delegates for each key based on the key’s access pattern. We felt that the ConfigManager is very important for the performance of Pando, yet it was rather under-specified in the paper. For example, we were wondering whether it is possible to make the ConfigManager dynamic so it learns the workload patterns and adjusts as needed.

2) Read consistency. Pando uses quorum reads while using Paxos for writes. Naively reading from a quorum is dangerous and can lead to linearizability problems, yet the reads are not discussed in detail in the paper. The paper briefly, in one sentence, mentions that to enable quorum reads a global acknowledgment is needed to make sure all nodes know a value has been committed. But Pando does not explain how such a commitment works and does not study its performance consequences. For example, if a value has been accepted by all nodes, but the commit message is still in flight, does the client read the previous value, or does it retry? If the read quorum has some nodes that have received the commit message and some that have not, is it safe to read the new value, or is a retry needed? We do not think it is safe to read, and the paper seems to mention that in passing as well. There are quite a few more questions around the reads and their safety here, so we wish the paper had addressed the reads in greater detail.

3) Rare conflicts. One of the motifs in the paper is that conflicts are rare in the scenarios Pando is designed for. This influences a lot of the design, as most of the system is written to work well in the conflict-free case while somehow addressing the conflict case. For example, when two clients try to write to the same key/document, that causes a conflict, and Paxos is not very good at handling it due to the dueling-leaders problem. The same goes for the read problem above in (2), as we feel that conflicts will cause retries, significantly impacting performance. What is interesting is that despite the largely conflict-free evaluation, Pando did better than Fast Paxos in latency, a protocol designed for 1-RTT consensus in conflict-free cases. We feel the reason is that Pando can get its writes close to 1 RTT with the clever delegate trick while keeping its read quorums small to optimize for latency, whereas the Fast Paxos approach is stuck with larger quorums for reads.

4) Quorum Size. We have spent quite a bit of time discussing quorum sizes. We were confused a bit about the quorum descriptions from the paper, but in the end, we were able to figure out our misunderstanding.

5) Comparison with Atlas. Pando extensively compares with EPaxos. Atlas is a newer, EPaxos-influenced protocol, so we were curious about how the two would compare. 

6) Scalability. Reducing latency is very important for WAN setups; however, we felt that the paper does not address the throughput question. Having too many nodes may have an impact on scalability/throughput. Since Pando uses the clients as proposers, the grunt work of dealing with many storage replicas falls to the clients, so scalability should not be impacted that much. However, the delegates may become bottlenecks for some workloads.

7) Delegates for throughput. Pando uses delegates to improve latency. We noticed that a somewhat similar idea is used in Compartmentalized Paxos and PigPaxos to improve the throughput instead.

Next Meeting

As always, next week we will continue looking at more great distributed systems papers. We are starting with the OSDI 2020 bunch of papers, and in our next meeting, we will discuss “Fault-tolerant and transactional stateful serverless workflows.” Please join our slack to participate in the discussions and Zoom meetings.

Keep The Data Where You Use It

As trivial as it sounds, keeping the data close to where it is consumed can drastically improve the performance of large globe-spanning cloud applications, such as social networks, ecommerce, and IoT. These applications rely on database systems to make sure that all the data can be accessed quickly. The de facto method of keeping the data close to the users is full replication. Many fully replicated systems, however, still have a single region responsible for orchestrating the writes, making the data available locally only for reads and not for updates.

Moreover, full replication becomes rather challenging when strong consistency is desired, since the cost of synchronizing all database replicas on a global scale is large. Many strongly consistent datastores resort to partial replication across a handful of nearby regions to keep the replication latency low. This creates situations where clients close to the regions in which the data is replicated experience much better access latency than someone reaching out from the other side of the globe.

No access penalty when data and consumer are collocated in the same region

Access penalty (289 ms in this case) when data and consumer are in different regions

Despite the obvious benefits of adapting to locality changes, many databases offer only static partitioning. Of course, some data stores have migration capabilities, but they often lack the mechanisms to determine where the data must be moved. Quite a few orthogonal solutions provide capabilities to collocate related data close together or use days’ or weeks’ worth of logs to compute better data placement offline. Meanwhile, Facebook’s aggressive data migration helps them reduce access latency by 50% and save on both storage and networking.

We (@AlekseyCharapko, @AAilijiang and @MuratDemirbas) investigated the criteria for quick, live data migration in response to changes in access locality. We posit that effective data-migration policies should:

  • Minimize access latency,
  • Preserve load balancing with regard to data storage and processing capacity,
  • Preserve collocation of related data, and
  • Minimize the number of data migrations.

Policies

We developed four simple migration policies aimed at optimizing data placement. Our policies operate at an arbitrary data granularity, be it individual key-value pairs, micro-shards, or partitions. For simplicity, I say that the policies work on objects that represent data of some arbitrary granularity.

The main point we address with the policies is minimizing access latency, with each policy using a different heuristic to make a data-placement decision. Once the policy finds the most optimal location for an object, it checks the load-balancing constraints and adjusts the data-migration decision as required.

Our simplest policy, the n-consecutive-accesses policy, uses a threshold of consecutive accesses to an object to make the placement decision. Although simple, this policy works well for workloads with strong locality in a single region. The majority-accesses policy keeps track of some request statistics and uses them to find the region with the most accesses to an object over some time interval. It then migrates the data over to that region.
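Here are minimal sketches of these two policies; the thresholds, window sizes, and helper names are illustrative rather than the exact parameters used in the simulations.

```python
from collections import Counter

def n_consecutive_policy(current_owner, recent_regions, n=3):
    """Migrate if the last n accesses all came from the same non-owner region."""
    if len(recent_regions) >= n and len(set(recent_regions[-n:])) == 1:
        candidate = recent_regions[-1]
        if candidate != current_owner:
            return candidate
    return current_owner

def majority_policy(current_owner, window_of_regions):
    """Migrate to the region with the most accesses over the last time window."""
    if not window_of_regions:
        return current_owner
    region, _ = Counter(window_of_regions).most_common(1)[0]
    return region

print(n_consecutive_policy("us-east", ["eu", "eu", "eu"]))        # -> 'eu'
print(majority_policy("us-east", ["eu", "ap", "eu", "us-east"]))  # -> 'eu'
```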

The exponential moving average (EMA) policy takes a different approach and computes the average region for all requests to the object. The average region is computed as an exponential moving average favoring the most recent requests. This policy can potentially find better placements for objects that have more than one high-access region. However, it requires the regions to have numerical IDs arranged in the order of the regions’ proximity to each other. This policy falters for deployments with complicated geography and may require multiple migrations to move data to the best location. Another disadvantage of EMA is that it takes longer to settle and requires many data migrations. Unlike the other policies, which can move the data directly to the desired region, EMA can only migrate objects to one of the neighboring regions, so an adjustment such as going from region (1) to (3) includes a temporary migration to region (2).

Exponential moving average topology; regions have left and right neighbors
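A sketch of the EMA policy described above: one exponentially weighted average of the numeric region IDs is kept per object, and the object moves one hop toward that average. The alpha value and helper names are illustrative, not the parameters used in the experiments.

```python
def ema_update(ema, request_region_id, alpha=0.1):
    # Exponential moving average of region IDs, favoring recent requests.
    return alpha * request_region_id + (1 - alpha) * ema

def ema_step(current_region_id, ema):
    """Objects only move to a neighboring region per adjustment."""
    target = round(ema)
    if target > current_region_id:
        return current_region_id + 1
    if target < current_region_id:
        return current_region_id - 1
    return current_region_id

ema, owner = 1.0, 1
for _ in range(30):                 # a burst of requests from region 3
    ema = ema_update(ema, 3)
    owner = ema_step(owner, ema)
print(owner)   # -> 3, reached via a temporary stop in region 2
```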

Finally, the center-of-gravity (CoG) policy calculates the optimal object placement by taking into account the distribution of all requests to an object and the distances between the datacenters. The CoG policy picks the region closest to the central location of the workload’s access distribution. CoG can collect request statistics similar to the majority-accesses policy and make a placement decision only after some time has elapsed since the last decision. Alternatively, it can use a continuous metric to assign each region a score corresponding to its weight in the workload, adjust the score, and recompute the best object placement with every request.
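A sketch of the CoG placement decision: score each candidate region by the access-weighted latency to all requesting regions and pick the minimum. The latency and access-share numbers below are made up for illustration and are not the figures from the experiments.

```python
def cog_best_region(access_fraction, latency):
    """access_fraction: region -> share of requests; latency[a][b]: RTT between a and b."""
    def score(candidate):
        return sum(frac * latency[candidate][r] for r, frac in access_fraction.items())
    return min(latency, key=score)

latency = {
    "jp": {"jp": 0, "au": 128, "us": 165, "eu": 318},
    "au": {"jp": 128, "au": 0, "us": 200, "eu": 350},
    "us": {"jp": 165, "au": 200, "us": 0, "eu": 90},
    "eu": {"jp": 318, "au": 350, "us": 90, "eu": 0},
}
access = {"jp": 0.35, "au": 0.40, "us": 0.15, "eu": 0.10}
print(cog_best_region(access, latency))
# -> 'jp': the most central region for this mix, even though 'au' sends the most requests
```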

Computing CoG weights (L-scores). The region with the lowest score is the most central to the current workload’s access distribution. Japan is the object’s owner here, despite Australia having more requests overall. L_jp = 0.4 * 128 + 0.15 * 165 + 0.13 * 318 + 0.08 * 165 = 130.49

Some Evaluation

I simulated the policies under different access-locality scenarios and calculated the latency of inter-region accesses and the number of object movements each policy makes. In the simulations, I used 3000 distinct objects, initially assigned to random regions in a cluster of 15 regions. I used AWS inter-region latencies to specify the distances between the simulated regions. To my surprise, even the most basic policies showed good improvement over static random placement of the data.

In the first experiment, the objects were accessed according to a normal distribution. Each object has an ID assigned to it, and a normal distribution dictates the probability of drawing each ID in each region. All regions have distributions with the same variance but different means, making each region predominantly access some subset of the objects, while some groups of objects are more-or-less shared across regions with adjacent IDs.

Locality workload. 3000 objects, 15 regions. Probability of object access is controlled by N(200z, 100), where z is the region ID in range [0, 15)
In this experiment, both the CoG and majority-accesses policies showed the best results in terms of latency and the number of object movements. This is because the workload almost always favors a single region and only in rarer cases shares an object between two regions, so a majority heuristic that only considers one region works well. Similarly, the 3-consecutive-accesses policy shows good latency, but it generates a lot of jitter by constantly moving shared objects between neighboring regions.

When the workload is no longer predominantly single-region for every key, the single-region heuristic policies perform worse. For instance, sharing an object equally between at most 3 regions out of 15 causes the majority and 3-consecutive-accesses policies to lock onto one of the sharing regions instead of optimizing the latency for all of them. The CoG policy can place the data in a region optimal for all 3 sharing regions (not even necessarily one of the sharing regions) and optimize the latency better than the single-region, topology-unaware heuristics. The EMA policy is at a big disadvantage here, since it relies on ID assignments to dictate the proximity of regions. However, the complex geography of AWS datacenters makes a good ID assignment nearly impossible, causing EMA to sometimes overshoot the best region and settle in a less optimal one.

Access is shared equally with up to 3 random regions.

Access locality may fluctuate on a regular basis, and a policy needs to be able to adapt to such changes and adjust the system to keep latencies low. In this experiment, I gradually adjust the normal distribution used in the earlier experiment to make a gradual workload switch. In the figure below, the system ran for enough time for all objects to switch their access locality to the neighboring region. The policies adapt well to the change, keeping latency low despite the moving workload. EMA is one notable exception again, as it first gets better latency and then gradually degrades until reaching a steady state (in a separate experiment, I observed EMA stabilizing at around 59 ms of latency).

Changing access locality

The previous experiments did not consider the effect of load balancing. However, a good data-migration policy should refrain from migrating data to overloaded regions. In the next experiment, I applied a load-balancing filter to the CoG policy: the migration procedure first computes the best region for the object, checks whether that region has capacity, and if no capacity is available, moves the data to the next best region with enough processing/storage capacity. Here I used 5 regions and 1000 objects and limited each region to storing at most 25% of all data. I ran a heavily skewed workload with 80% of all requests coming from a single region. Under these conditions, the CoG policy achieves very low average latency; however, as evidenced by the imbalance graph, all objects migrate over to a single region. If load balancing is enabled, no region becomes overloaded, but the latency improvement becomes more modest.

Balancing enabled. Latency on the left; imbalance measured as the difference in object ownership between the most and least loaded regions
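For completeness, here is a sketch of this two-stage decision with the load-balancing filter. The capacity model (an object-count cap per region) mirrors the 25% limit used above, but the helper itself and its names are illustrative.

```python
def place_with_balancing(scores, load, capacity):
    """scores: region -> CoG score (lower is better); load: current object counts."""
    for region in sorted(scores, key=scores.get):
        if load.get(region, 0) < capacity:
            return region
    return None   # every region is at capacity; keep the object where it is

scores = {"us": 10.0, "eu": 14.5, "ap": 22.0}
load = {"us": 250, "eu": 120, "ap": 90}
print(place_with_balancing(scores, load, capacity=250))   # -> 'eu', since 'us' is full
```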


Concluding Remarks

Having data close to the consumers can dramatically improve access latency. For some databases, this means doing full replication; for others, it may involve moving the data or the owner/write role from one region to another. It is important to make sure the data is moved to the right location. I have looked at four simple rules or policies for determining data migration and ran some simulations on them.

There are a few lessons I have learned so far from this:

  • Topology-aware rules/policies work better for a larger variety of situations.
  • Simple rules, such as looking at the number of consecutive requests coming from a region or determining the majority-accesses region, can also work surprisingly well, but not always. These tend to break when access locality is not concentrated in a single region but shared across a few regions in the cluster.
  • EMA looked interesting on paper. It allows keeping just a single number, updated with every request, to determine the optimal data placement, but it performed rather badly in most experiments. The main reason for this is the complicated geography of datacenters.
  • Optimizing for latency and adjusting for load-balancing constraints to prevent region overload can be done in two separate steps. My simple two-stage policy (presently) looks at load balancing for each object separately. This becomes a first-come-first-serve system, and I am not sure yet whether this can become a problem.
  • The EMA policy takes multiple hops to move data to a better region, while the n-consecutive-accesses policy has constant jitter for objects shared by several regions.

I have not studied data collocation much in my experiments, nor designed the policies to take it into consideration. One reason is that I think related objects will have similar access locality, causing them to migrate to the same datacenters. However, this is just a guess, and I need to investigate it further.

One Page Summary: “PaxosStore: High-availability Storage Made Practical in WeChat”

The PaxosStore paper, published in VLDB 2017, describes the large-scale, multi-datacenter storage system used in WeChat. As the name suggests, it uses Paxos to provide storage consistency. The system claims to provide storage for many components of the WeChat application, with 1.5TB of traffic per day and tens of thousands of queries per second during peak hours.

PaxosStore relies on the Paxos protocol for consistency and replication within tight geographical regions. The system was designed with a great separation of concerns in mind. At a high level, it has three distinct layers interacting with each other: the API layer, the consensus layer, and storage. Separating these out allowed PaxosStore to provide the most suitable APIs and storage for different tasks and applications, while still having the same Paxos-backed consistency and replication.

In the Paxos-driven consensus layer, the system uses a per-object log to keep track of values and Paxos-related metadata, such as promise (epoch) and proposal (slot) numbers. The log’s implementation, however, seems to be somewhat decoupled from the core Paxos protocol. The Paxos implementation is leaderless, meaning there is no single dedicated leader for each object, and every node can perform writes on any of the objects in the cluster by running the prepare and accept phases of Paxos. Naturally, the system tries to perform (most) writes in one round trip instead of two by assuming some write locality. After the first successful write, a node can issue more writes with increasing proposal (slot) numbers. If some other node performs a write, it needs to have a higher ballot, preventing the old master from doing quick writes. This is a rather common approach, used in many Paxos variants.
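Here is a condensed, single-slot sketch of that quick-write pattern (my own simplification that elides the per-object log, networking, and value recovery, not PaxosStore’s code): a writer that completed the previous round keeps issuing accepts under its held promise and falls back to the prepare phase only after another writer takes over with a higher ballot.

```python
class Replica:
    def __init__(self):
        self.promised = 0
        self.accepted = (0, None)   # (ballot, value)

    def prepare(self, ballot):
        if ballot > self.promised:
            self.promised = ballot
            return True
        return False

    def accept(self, ballot, value):
        if ballot >= self.promised:
            self.promised = ballot
            self.accepted = (ballot, value)
            return True
        return False

class Writer:
    def __init__(self, replicas):
        self.replicas = replicas
        self.held_ballot = 0        # > 0 once this node owns the latest promise

    def write(self, value, new_ballot):
        rounds = 0
        if self.held_ballot == 0:
            # Full Paxos: run phase 1 against a majority first.
            promised = sum(r.prepare(new_ballot) for r in self.replicas)
            if promised <= len(self.replicas) // 2:
                return False, rounds
            self.held_ballot = new_ballot
            rounds += 1
        # Quick path: phase 2 only, reusing the held promise.
        accepted = sum(r.accept(self.held_ballot, value) for r in self.replicas)
        if accepted <= len(self.replicas) // 2:
            self.held_ballot = 0    # a higher-ballot writer took over
            return False, rounds
        return True, rounds + 1

replicas = [Replica() for _ in range(3)]
writer = Writer(replicas)
print(writer.write("a", new_ballot=1))   # -> (True, 2): prepare + accept
print(writer.write("b", new_ballot=1))   # -> (True, 1): quick write, accept only
```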

The lack of a single stable leader complicates reads, since there is no authority that has the most up-to-date state of each object. One solution for reading is to use the Paxos protocol directly; however, this disrupts the locality of write operations by hijacking the ballot to perform a read. Instead, PaxosStore resorts to reading directly from the replicas by finding the most recent version of the data and making sure a majority of replicas have that version. This works well for read-heavy workloads, but in some high-contention (or failure?) cases the most recent version may not be on a majority of replicas, and the system falls back to running Paxos for reading.
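A small sketch of this read path; the function and reply format are my assumptions, not the paper’s pseudocode.

```python
def quorum_read(replica_versions):
    """replica_versions: list of (version, value) pairs reported by the replicas."""
    latest_version = max(v for v, _ in replica_versions)
    holders = [val for v, val in replica_versions if v == latest_version]
    if len(holders) > len(replica_versions) // 2:
        return holders[0]          # safe: a majority already stores this version
    return None                    # contention or failures: resort to Paxos

print(quorum_read([(3, "c"), (3, "c"), (2, "b")]))   # -> 'c'
print(quorum_read([(3, "c"), (2, "b"), (2, "b")]))   # -> None, run Paxos instead
```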

PaxosStore runs in multiple datacenters, but it is not a full-fledged geo-replicated system, as it only replicates between datacenters located in the same geographical area. The paper is not clear on how data gets assigned to regions and whether objects can migrate between regions in any way. Within each datacenter, the system organizes nodes into mini-clusters, with each mini-cluster acting as a Paxos follower. When data is replicated between mini-clusters, only one (or some?) of the nodes in each mini-cluster holds the data. This design aims to improve fault tolerance: with a 2-node mini-cluster, the failure of 1 node does not result in the failure of the entire mini-cluster Paxos follower.

The paper somewhat lacks in its evaluation, but PaxosStore seems to handle its goal of multi-datacenter, same-region replication fairly well, achieving sub-10 ms writes.

This paper seems like a good solution for a reliable and somewhat localized data store. The authors do not address data sharding and migration between regions and focus only on intra-region replication to multiple datacenters, which makes me think PaxosStore is not really a “global,” geo-replicated database. The fault tolerance is backed by Paxos, mini-clusters, and the use of the PaxosLog for data recovery. The evaluation could have been more complete if the authors had shown the scalability limits of their system and provided some details about the throughput and datacenter locality of the workload in the latency experiments.

Here is one page pdf of this summary.