Tag Archives: database

Reading Group. Exploiting Symbolic Execution to Accelerate Deterministic Databases

We have covered 60 papers in our reading group so far! The 60th paper we explored was “Exploiting Symbolic Execution to Accelerate Deterministic Databases” from ICDCS’20. I enjoyed the paper quite a lot, even though there are some claims I do not necessarily agree with.

The paper solves the problem of executing transactions in deterministic databases. We can image a replicated state machine, backed by Paxos or Raft. Trivially, in such a machine, each replica node needs to execute the transactions following the exact order prescribed by the leader to guarantee that all replicas progress through the same states of the machine. The good thing here is that nodes run each transaction independently of each other after the execution order has been established by the replication leader. The bad thing is that this naive approach is sequential, so each node cannot take advantage of multiple processing cores it may have. 

Naturally, we want to parallelize the transaction execution. This, however, is easier said than done. To allow for more parallelism, we want to identify the situations when it is ok to run some transactions concurrently without impacting the final state of the state machine. For example, if we know what objects or keys the transaction reads and writes (i.e. the transaction’s read-and-write set), we can group independent transactions that operate on disjoint read-write sets together for parallel execution. For instance, a transaction accessing keys “x, y, and z” is independent of a transaction accessing keys “a, b, and c,” and the two can execute at the same time without impacting each other. 

Of course, this requires us to know what objects/keys each transaction needs before running them, and this is a bit of a problem. In some situations, it may be easy to figure out the read and write sets of a transaction, but this is not always the case. Many systems, like Calvin, do a “pretend run” of a transaction (this is sometimes referred to as a reconnaissance transaction) to figure out the read-and-write set if the set isn’t obvious or annotated in the transaction. This has a few obvious downsides. Obviously, the pretend/reconnaissance phase uses the system’s resources. The reconnaissance run also increases the transaction’s latency. And finally, the reconnaissance is not perfect, and by the time of the “real” run, the read and write sets may have changed due to other transactions impacting the state of the system.  

So, the above description is somewhat generic behavior for many systems out there. And this is where Prognosticator, a system discussed in the paper, comes in. Prognosticator uses Symbolic Execution (SE) to profile transactions and help predict each transaction’s read-and-write set. The system does not need experts to annotate transactions read-and-write sets, but it can still avoid the reconnaissance runs in many situations. Sometimes, however, the reconnaissance must still happen, but Prognosticator uses a few tricks to reduce the possibility of the reconnaissance becoming stale.

Let’s look at the issues with figuring out the read-write set of a transaction. Many transactions are not primitive read and write commands, and involve quite a bit of logic with loops and conditional statements. This means that a state of a client/application may impact both the values written and the keys accessed. For example, consider a transaction that takes some input i:

input i;
if i > 10 then write x:=i;
          else write y:=i;

The read-and-write set of above example depends on the input value already known to the client. The paper calls this transaction an Independent Transaction (IT), as it does not have an internal dependence on the read values. 

Some transactions can be a bit more complicated and have the read-write set depending on the value read by the transaction:

read a;
if a > 10 then write x:=a;
          else write y:=a;

Here the transaction does not know its write set (i.e. writing x or y) until it acquires the value of a. Prognosticator paper refers to these transactions as Dependent Transactions (DT), as the write set has an internal dependence on the read values. 

Obviously, for both types of transactions, we can do the reconnaissance phase to figure out all the logic and branching to learn all the required keys. But we do not really need full reconnaissance for the ITs, as their read-and-write sets only depend on some client input and not the transaction itself. In fact, we can just play out the transaction’s code to figure out the read-write set without actually retrieving any data from the store (i.e. using some dummy values). However, we still somehow need to know whether a transaction is IT, as using dummy values in DT will clearly not work. Moreover, such a “dry run” with dummy values for every IT we encounter is still wasteful, as we do it every time.

Example of Symbolic Execution (SE) – symbolic solution α, path constraint φ.

This is where Prognosticator’s Symbolic Execution (SE) approach shines. With SE, Prognosticator “unwraps” each transaction for all the possible execution branches, leading to a symbolic transaction solution for all possible code paths. If all code paths access the same keys then we have a static read-and-write set. If certain execution branches access different keys, but branching conditions involve only transaction input, then we are dealing with an Independent Transaction (IT). We can easily compute IT’s read-and-write set from the symbolic solution once the input is known. Finally, if SE yields some branches with different access keys, and these branches are conditioned on a transaction’s reads, then we have a Dependent Transaction (DT) and will require a reconnaissance read. 

The Prognosticator “unwraps” each new transaction type only once at the client-side to create such a symbolic execution profile with all possible code branches. There are quite a few optimizations mentioned in the paper. The important gist of these optimizations is the fact that we do not care so much about the actual symbolic solutions, and care only about what keys show up in the read set and write sets. So if two execution branches produce different symbolic solutions, but access the same keys, these branches can be “merged” for the purposes of predicting the read-and-write set. 

The rest of the Prognosticator’s magic depends on a batching technique that allows for a careful deterministic reordering of transactions. With the help of SE, the system identifies all read-only transactions (ROTs) and executes them concurrently at the beginning of the batch. This leaves us with a batch containing only ITs and DTs. The system then reorders DTs to the beginning of the batch. This allows it to run reconnaissance reads on all DTs while also working on ROTs. Since all DTs are now at the beginning of the batch, the reconnaissance reads cannot become stale due to any IT. Reconnaissance may still become stale due to the dependencies between DTs, and in this case, a DT is aborted during the execution phase and is placed in the abort batch to run after the main batch completes and before the next batch.

Lock table with per-key queues on the right. ROTs are not in the table, DTs and ITs are ordered in the table. Only transactions with all keys at the head of the queues can execute.

To execute the ITs and DTs in parallel, Prognosticator uses a lock table. The lock table is a collection of per-key queues, such that for every key in the batch there is an entry in the table with a queue of transactions. A transaction can be executed when it is at the head of the queues for all its keys. Obviously, executing a transaction removes it from all these queues.

The whole transaction execution process runs independently on each node. It is safe because we actually do not need to stick to the leader-prescribed ordered in the batch, as long as we keep the correct order of batches, and deterministically reorder the transactions in the same way on all replica nodes. With the batch execution, we have all the clients waiting for their transactions to finish, and this deterministic reordering does not cause any problems as all waiting transactions are concurrent. This is a common trick used in many systems. 

The whole package with SE and batching provides a significant boost to the throughput compared to Calvin. It is not entirely clear though how much of the boost was enabled by the SE, but I will come back to this point in the discussion summary. 

The paper goes into more detail on many aspects of the paper, including symbolic execution, more efficient implementation of the lock table, resource usage, etc. It was definitely a good read for me. As always, we had a presentation in our reading group, and I had to cover for a missing presenter:

Discussion

1) Performance gains due to SE vs Batching. The paper claims a great speedup compared to Calvin, however, one question we had is just how much improvement is due to symbolic execution and how much of it is because of clever batching techniques. Let me elaborate. Some benefit comes from the careful reordering of operations. Running ROTs first helps a lot. Running reconnaissance reads at the node (compared to running a reconnaissance transaction at the client) is a lot faster too, and it reduces the possibility of reconnaissance becoming stale. Some of these techniques may be applicable in simpler systems without SE. For example, if we have transactions with annotated read-write sets, these reorderings within the batch become possible. Of course, SE definitely helps find ROTs and separate ITs from DTs to improve/enable the reordering within the batch without the need for annotated transactions. 

2) Slow ROTs. Deterministic transactions are susceptible to clogged pipelines when some big long-running transaction gets in the way and delays consecutive transactions. Parallel/concurrent execution helps here by essentially having multiple processing pipelines. However, Prognosticator has one issue with the read-only transactions (ROTs), making them more susceptible to the clogging problem. The paper mentions that ROTs get executed at the beginning of the batch from a stable snapshot. All workers must complete their ROTs before the system moves to DTs and ITs to make sure that no DT or IT changes that stable snapshot while ROTs are still running. This means that there is a barrier at the end of the ROT phase, allowing a single long-running ROT to screw up the performance by delaying all DTs and ITs in the batch. However, this may be just an artifact of an academic prototype — taking a separate snapshot and running all ROTs from that snapshots should allow ROTS to execute concurrently with writes. 

3) Cost of SE. The paper mentions the cost of doing symbolic execution. There are a few problems here. The first is processing time – more complicated transactions need a lot of time to pretty much exhaustively explore all branching. This also requires putting a limit on the number of allowed loop iterations. The limit can create a situation where a transaction is not fully profiled if it actually goes above the limit, requiring a reconnaissance. The limit, being a configurable parameter, may also require some expert knowledge of the workload to properly configure, and this is something the paper strives to avoid. Another big cost is the memory footprint of transaction profiles. The authors mention that the TPC-C benchmark required 960 MB for transaction profiles, which is not a small cost for a simple benchmark with relatively few transaction types. In the real world, the memory cost of having transaction profiles may be much higher.

4) Extension to sharded systems? Prognosticator works in the replicated system with all nodes storing identical data. It does not work in a sharded environment, yet most large-scale databases are sharded. It may be non-trivial to apply the same approach to sharded systems. After all, running a distributed transaction is harder, and may require some coordination between the shards. At the same time, it may still be possible to separate transactions according to their types and conflict domains with the help of SE to increase parallelism and make transaction execution more independent. Again, real systems based on Calvin’s ideas have cross-shard transactions. A big problem with sharded setup in Prognosticator involves DTs — the system expects to perform the reconnaissance read locally, which means that all data for the transaction must be available at each node. This is not possible in the sharded environment. And making reads non-local will make the system much closer to Calvin with a longer distributed reconnaissance phase and negative performance impact. So, the non-sharded nature of Prognosticator is a huge performance benefit when comparing with more general Calvin.

Reading Group

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

Reading Group. chainifyDB: How to get rid of your Blockchain and use your DBMS instead

Our recent meeting focused on Blockchains, as we discussed “chainifyDB: How to get rid of your Blockchain and use your DBMS instead” CIDR’21 paper. The presentation by Karolis Petrauskas is available here:

The paper argues for using existing and proven technology to implement a permissioned blockchain-like system. The core idea is to leverage relational SQL-99 compliant DBMSes that are very common in various IT architectures. The participants in the blockchain can then leverage the existing software stack to implement secure data-sharing or replication. Moreover, the authors argue that in practice different participants may run DBMSes of different versions or even different vendors, making the problem of syncing data between participants more challenging, as different versions/vendors may execute the same SQL transactions slightly differently. To address the heterogeneity issues, the paper proposes a “Whatever-Voting” model, where the system achieves consensus on the results of a transaction, and not on the transaction itself. This allows each node/organization to run the transaction in “whatever” way possible, as long as the final result of the transaction is agreed upon by the quorum of participants in the “voting” phase. If the voting phase does not pass, the node with a mismatched result must roll the transaction back (potentially more than one) and try to recover by reapplying it. To me this seems a bit naive — if a node could not produce the proper results to a correct, non-malicious transaction due to its vendor differences or “whatever” execution differences, it may get stuck in the infinite loop of rollbacks and reapplies until whatever phase is fixed to produce the correct results for a specific transaction. This can be a very error-prone, engineering-intensive process of constant fine-tuning and micromanaging the components.

Step 1: Client creates a Proposal from SQL transaction. Step 2: Early abort of bad proposals. Step 3: Proposal is sent for ordering after all participants verify that it can be executed. Step 4: Ordered produces a block from multiple transactions/proposals. Step 5: Execution against local DBMSes. Step 6: Commit phase after the block is written to DBMS. Step 7: Voting to check the results are identical in the quorum of nodes/organizations. Step 8: Ledger entry is generated if the agreement in Step 7 is successful.

Since the proposed solution does not reach an agreement on transactions, the paper argues that it does not need complicated protocols to ensure that all correct nodes have correct transactions. If some entity issues different transactions to different participants, the results won’t match, missing the quorum agreement and causing a system-wide rollback and retry. However, the voting scheme should still be resilient to some cheating — a node must provide some proof or digest of the authenticity of its results.

The authors argue that their system does not have a centralized authority. However, it still has a centralized control component that batches the transactions and orders them. This component can be untrusted since the participants should catch mismatched batches and roll the state back as needed. That being said, a byzantine centralized batcher/sequencer (the paper calls it orderer) can cause liveness issues by sending garbage to participants causing constant recoveries. The paper does not specify the process of electing/replacing the orderer, so I think in practice it should be trusted for liveness purposes. 

There is a lot more work involved to make the scheme work. For example, each node must remember the transaction batches to create the ledger. This also happens within the DBMS, and some precautions must be taken to ensure that the ledger is tamper-proof. The roll-back procedure is also complicated and must be carefully implemented to have good performance. 

The authors conduct their evaluation on a limited scale with just 3 nodes/DBMS instances and show good scalability compared to Fabric and Fabric++. The evaluation focuses on increasing the number of clients interacting with the DBMSes, and not the number of DBMS instances. The paper does not mention the possibility to have Byzantine clients. Moreover, the evaluation does not consider any byzantine cases. 

Discussion

1) Paper Quality. Our group’s collective opinion about the quality of the paper was not very enthusiastic. The paper’s premise is interesting and practical, however, it has way too many holes, missed promises, and unsubstantiated claims. These range from inadequate evaluation to relying on centralized unreplaceable “orderer” in a decentralized system to an underspecified voting protocol to glaring engineering holes in the “whatever” model that is prone to liveness issues due to bad implementations. There are way too many to list them all, so I will mention just one in a bit of detail. 

A big undelivered promise is using the DBMSes as “black-box.” While the DBMS software does not need any modifications, the applications running on these DBMSes must be adjusted to accommodate the WV model. Although these adjustments may be relatively small (i.e. making the client talk to chainify server instead of DBMS), it appears that the “whatever” stage may need tweaks on a per transaction-type basis if the DBMSes are very different. This additional logic and tweaking is hardly a black-box approach from the application point of view. Other concerns here involve privacy. What if some tables at one organization are not supposed to be shared but transactions/queries from this same entity touch both shared and private tables? Addressing these may require even bigger application changes and rewrites. And of course, this is not to mention all the additional table chainifying introduces to the database.

2) Preventing Byzantine Behavior vs. Detecting Byzantine Behavior. An interesting point the paper raises (and I wish it was explored more) is whether we need to prevent byzantine actions from taking place or just being able to detect them. Unlike all (most?) of the blockchains, chainifyDB does not prevent bad behavior from executing, and instead, it tries to detect misbehavior and fix it by a reset and restore procedure. Is this approach cheaper on the fundamental level? What applications can it support? Obviously, chainifyDB argues that it supports the permissioned blockchains, but We’d love to see more rigor and research in that direction.

There are some potential issues with this approach, and the paper does not give many details on these. For example, a transaction may execute on all nodes and produce matching result digests for voting. However, if the transaction itself is malicious, and for instance, tries to create value or double-spend something, then the correct nodes must have a mechanism to abort such transaction before its execution (or detect and reset after?). Result voting may not sufficient here. Of course, it is possible to implement some pre-execution sanity check logic to verify that the transaction is compliant with all the rules and constraints. It appears that in chainifyDB this stage happens early and requires every participant’s vote (see Step 2 in the figure in the summary), but the paper is not very clear on the safety implication of this vote/abort decision collection. Also, what about fault tolerance if we need all nodes for this? Additionally, this, in a sense, creates some pre-execution voting that the paper claims to avoid.

3) Issues of Trust. This is a bigger discussion on different types of storage systems and applications that need them. The most popular type of data systems is the ones with trust in a single participant/entity. This is not a problem for an in-house database for example. Many cloud data-intensive services rely on this model, where clients would trust the cloud provider and do not worry about the provider becoming malicious. This even works if the clients distrust each other, as long as they all trust the service provider. The latter is often sold as some form of a permissioned blockchain, but fundamentally it is just a database with an append-only log where safety is ultimately ensured by the trusted provider. Truly permissioned blockchains have no single trusted entity, however, a relatively small set of participants can be trusted as a group. The permissioned nature prevents Sybil attacks by spoofing many fake malicious participants to overtake the network, allowing to have small groups to be trustworthy despite having malicious individuals. Finally, open-blackchains are completely untrusted environments, where only the entire collective of all participants can be trusted. The figure here illustrates this. 

One discussion question we had is when to use permissioned blockchain and why not to stick with open-access protocols instead. Of course, permissioned blockchains or BFT protocols are cheaper to run than open-access counterparts and often have better latency and higher throughput. But is it safe to have trust in a small group of participants? What applications are ok with that level of trust? 

Reading Group

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

Reading Group Special Session: Distributed Transactions in YugabyteDB

When: May 11th at 12:00 pm EST

Who: Karthik Ranganathan.

Karthik Ranganathan is a founder and CTO of YugabyteDB, a globally distributed, strongly consistent database. Prior to Yugabyte, Karthik was at Facebook, where he built the Cassandra database. In this talk, Karthik will discuss Yugabyte’s use of time synchronization and Raft protocol along with some optimizations that enable high-performance distributed transactions.

Abstract

ACID transactions are a fundamental building block when developing business-critical, user-facing applications. They simplify the complex task of ensuring data integrity while supporting highly concurrent operations. While they are taken for granted in monolithic SQL databases, most distributed DBs would forsake them completely.

Fortunately, this is no longer the case. The trend started with Google Spanner, which offered distributed transactions using GPS based atomic clocks – unheard of in the database world before. Now, distributed transactions – without requiring atomic clocks – are offered by distributed SQL databases. One such example of a fully open source database offering this is YugabyteDB. Using the example of YugabyteDB, this talk will explain how distributed ACID transactions can be achieved without atomic clocks – without compromising on performance.

Keep The Data Where You Use It

As trivial as it sounds, but keeping the data close to where it is consumed can drastically improve the performance of the large globe-spanning cloud applications, such as social networks, ecommerce and IoT. These applications rely on some database systems to make sure that all the data can be accessed quickly. The de facto method of keeping the data close to the users is full replication. Many fully replicated systems, however, still have a single region responsible for orchestrating the writes, making the data available locally only for reads and not the updates.

Moreover, full replication becomes rather challenging when strong consistency is desired, since the cost of synchronizing all database replicas on the global scale is large. Many strongly consistent datastores resort to partial replication across a handful of nearby regions to keep the replication latency low. This creates situations when some clients close to the regions in which data is replicated may experience much better access latency than someone reaching out from the other side of the globe.

Data and consumer are collocated.
No access penalty when data and consumer are in the same region

Data and consumer are in different regions
Access penalty (289 ms in this case) when data and consumer are in different regions

Despite the obvious benefits of adapting to locality changes, many databases offer only static partitioning. Of course, some data stores have the migration capability, but still often lack the mechanisms to determine where the data must be moved. Quite a few orthogonal solutions provide capabilities to collocate related data close together or use days or weeks’ worth of logs to compute better data placement offline. Meanwhile, Facebook’s aggressive data migration helps them reduce the access latency by 50% and save on both storage and networking.

We (@AlekseyCharapko, @AAilijiang and @MuratDemirbas) investigated the criteria for quick, live data migration in response to changes on access locality. We posit that effective data-migration policies should:

  • Minimize access latency,
  • Preserve load balancing with regards to data storage and processing capacity
  • Preserve collocation of related data, and
  • Minimize the number of data migrations.

Policies

We developed four simple migration policies aimed at optimizing the data placement. Our policies operate at an arbitrary data-granularity, be it an individual key-value pairs, micro-shards, or the partitions. For simplicity, I say that policies work on objects that represent some data of an arbitrary granularity.

The main point we address with the policies is minimizing access locality, with each policy using a different heuristic to make a data-placement decision. Once the policy finds the most optimal location for an object, it checks the load balancing constraints to adjust the data migration decision as required.

Our simplest policy, the n-consecutive accesses policy, uses a threshold of consecutive accesses to the object to make the placement decision. Although simple, this policy works well for workloads with strong locality in a single region. Majority accesses policy keeps track of some request statistics and uses it to find the region with the most accesses to an object over some time interval. It then migrates the data over to that region.

The exponential moving average (EMA) policy takes a different approach and computes the average region for all requests to the object. The average region is computed as an exponential moving average favoring the most recent requests. This policy can potentially find better placement for objects that have more than one high-access region. However, it requires the regions to have numerical IDs arranged in the order of region’s proximity to each other. This policy falters for deployments with complicated geography and may require multiple migrations to move data to the best location. Another disadvantage of EMA is that it takes longer to settle and requires many data migrations. Unlike other policies that can move the data directly to the desired region, EMA can only migrate objects to one of the neighboring regions, making adjustment such as going from region (1) to (3) include a temporary migration to region (2).

Exponential moving average topology; regions have left and right neighbors
Exponential moving average topology; regions have left and right neighbors

Finally, the center-of-gravity (CoG) policy calculates the optimal object placement by taking into account the distribution of all requests to an object and the distances between the datacenters. CoG policy calculates the region closest to the central location for any access locality workloads. CoG can collect the request statistics similar to the majority accesses policy and make a placement decision only after some time has elapsed since last decision. Alternatively, it can use a continuous metric to assign each region a score corresponding to its weight in the workload, adjust the score and recompute the best object placement with every request.

CoG
Computing CoG Weights (L-scores). Region with lowest score is most central to the current workload’s access distribution. Japan is the object’s owner here, despite Australia having more requests overall. L_jp = 0.4 * 128 + 0.15 * 165 + 0.13 * 318 + 0.08 * 165 = 130.49

Some Evaluation

I’ve simulated protocols under different access locality scenarios and calculated the latency of inter-region access and the number of object movements each policy makes. In the simulations, I used 3000 distinct objects, initially assigned to a random region in the cluster of 15 regions. I used the AWS inter-region latencies to specify the distances between simulated regions.  To my surprise, even the most basic policies showed good improvement over static random placement of data.

In the first experiment, the objects were accessed according to a normal distribution. Each object has a ID assigned to it, and some Normal distribution dictates the probability of the drawing the ID each region. All regions have distributions with the same variance, but different means, making each region predominantly accessing some of the objects, and having some group of objects being more-or-less shared across the regions with adjacent IDs.

Locality Workload. 3000 Objects, 15 regions. Probability of object access is controlled by N(200z,100), where z is region ID in range [0, 15)
Locality Workload. 3000 Objects, 15 regions. Probability of object access is controlled by N(200z,100), where z is region ID in range [0, 15)
In this experiment, both CoG and majority accesses policy showed the best results in terms of latency and the number of object movements. This is because the workload almost always favors a single region, and in rarer cases shares the object between two regions. This makes majority heuristic that only considers one region work well. Similarly, 3-consecutive accesses policy shows good latency, but it generates a lot of jitter constantly moving shared objects between neighboring regions.

When the workload is no longer predominantly single region dominant for every key, single-region heuristic policies perform worse. For instance, equally sharing an object between utmost 3 regions out of 15 causes majority and 3-consecutive accesses policies to lock in to one of the sharing regions instead of optimizing the latency for all sharing regions. CoG policy can place the data in a region optimal for all 3 regions (and not even necessarily in one of the sharing regions) and optimize the latency better than a single-region heuristic, topology unaware policies. EMA policy is at a big disadvantage here, since it relies on ID assignments to dictate the proximity of regions. However, the complex geography of AWS datacenters makes a good ID assignment nearly impossible, causing EMA to sometimes overshoot the best region and settle in less optimal one.

Access is shared equally with up to 3 random regions.
Access is shared equally with up to 3 random regions.

Access locality may fluctuate on a regular basis, and the policy needs to be able to adopt to such changes and adjust the system to keep the latencies low. In this experiment I gradually adjust the normal distribution used in the earlier experiment to make a gradual workload switch. In the figure below, the system ran for enough time to make all objects switch the access locality to the neighboring region. However, the policies adopt well to the change, keeping low latency despite the moving workload. EMA is one notable exception again, as it first gets better latency and the gradually degrades until reaching a steady state (In a separate experiment I observe EMA stabilizing over at around 59 ms of latency)

Changing access locality
Changing access locality

Previous experiments did not consider the effect of load balancing. However, a good data-migration policy should refrain from migrating data to overloaded regions. In the next experiment I applied load-balancing filter to the CoG policy to make the migration procedure first compute the best region for the object, check if that region has the capacity, and if no capacity is available, move the data to the next best region with enough processing/storage capacity. Here I used 5 regions and 1000 objects, and limited each region to storing at most 25% of all data. I ran a heavily skewed workload with 80% of all requests coming from a single region. Under these conditions the CoG policy achieves very low average latency. However, as evidenced by the disbalance graph, all objects migrate over to a single region.  If load balancing is enabled, no region becomes overloaded, but latency improvement becomes more modest.

Balancing
Balancing enabled. Latency on the left, disbalance measured as the difference in object ownership between most and least loaded region

 

 

Concluding Remarks

Having data close to the consumers can dramatically improve the access latency. For some databases, this means doing full replication, for other this may involve moving data or the owner/write role from one region to another. It is important to make sure the data is moved to a right location. I have looked at four simple rules or policies for determining the data migration and ran some simulations on these.

There are a few lessons I have learned so far from this:

  • Topology aware rules/polices work better for a larger variety of situations
  • Simple rules, such as just looking a number of consecutive requests coming from a region or determining the majority accesses region can also work surprisingly well, but not always. These tend to break when access locality is not concentrated in a single region, but shared across a few regions in the cluster
  • EMA looked interesting on paper. It allowed to have just a single number updated with every request to determine the optimal data placement, but it performed rather bad in most experiments. The main reason for this is complicated geography of datacenters.
  • Optimizing for latency and adjusting for load balancing constraints to prevent region overload can be done in two separate steps. My simple two-stage policy (presently) looks at load balancing for each object separately. This becomes a first-come-first-serve system, but I am not sure yet whether this can become a problem.
  • EMA policy takes multiple hops to move data to better region, while n-consecutive accesses policy has constant jitter for objects shared by some regions

I have not studied much about data-collocation in my experiments, nor designed the policies to take this into consideration. One of the reasons is that I think related objects will have similar access locality, causing them to migrate to same datacenters. However, this is just a guess, and I need to investigate this further.

One Page Summary: “PaxosStore: High-availability Storage Made Practical in WeChat”

PaxosStore paper, published in VLDB 2017, describes the large scale, multi-datacenter storage system used in WeChat. As the name may suggest, it uses Paxos to provide storage consistency. The system claims to provide storage for many components of the WeChat application, with 1.5TB of traffic per day and tens of thousands of queries per second during the peak hours.

ps_archPaxosStore relies on Paxos protocol to for consistency and replication within tight geographical regions. The system was designed with a great separation of concerns in mind. At a high level, it has three distinct layers interacting with each other: API layer, consensus layer, and storage.  Separating these out allowed PaxosStore provide most suitable APIs and storage for different tasks and application, while still having the same Paxos-backed consistency and replication.

ps_paxoslogIn a paxos-driven consensus layer,  the system uses a per-object log to keep track of values and paxos-related metadata, such as promise (epoch) and proposal (slot) numbers. Log’s implementation, however, seems to be somewhat decoupled from the core Paxos protocol. Paxos implementation is leaderless, meaning there are no single dedicated leader for each object, and every node can perform writes on any of the objects in the cluster by running prepare and accept phases of Paxos. Naturally, the system tries to perform (most) writes in one round trip instead of two by assuming some write locality. After the first successful write, a node can issue more writes with increasing proposal (slot) numbers. If some other node performs a write, it needs to have higher ballot, preventing the old master from doing quick writes. This is a rather common approach, used in many Paxos variants.

The lack of a single stable leader complicates the reads, since there is no authority that has the most up-to-date state of each object. One solution for reading is to use Paxos protocol directly, however this disrupts locality of write operations by hijacking the ballot to perform a read. Instead, PaxosStore resorts to reading directly from replicas by finding the most recent version of the data and making sure a majority of replicas have that version. This works well for read-heavy workloads, but in some high-contention (or failure?) cases the most-recent version may not have a majority of replicas, and the system falls-back to running Paxos for reading.

ps_miniclusterPaxosStore runs in multiple datacenters, but it is not a full-fledged geo-replicated system, as it only replicates between the datacenters located in the same geographical area. The paper is not clear on how data get assigned to regions and whether objects can migrate between regions in any way. Within each datacenter the system organizes nodes into mini-clusters, with each mini-cluster acting as a Paxos follower. When data is replicated between mini-clusters, only one (some?) nodes in each mini-cluster hold the data. This design aims to improve fault tolerance: with a 2-node mini-cluster, failure of 1 node does not result in the failure of the entire mini-cluster Paxos-follower.

ps_latThe paper somewhat lacks in its evaluation, but PaxosStore seems to handle its goal of multi-datacenter, same-region replication fairly well, achieving sub-10 ms writes.

This paper seems like a good solution for reliable and somewhat localized data-store. The authors do not address data sharding and migration between regions and focus only on the intra-region replication to multiple datacenters, which makes me thing PaxosStore is not really “global”, geo-replicated database.  The fault tolerance is backed by Paxos, mini-clusters and the usage of PaxosLog for data recovery. The evaluation could have been more complete if authors showed scalability limits of their system and provided some details about throughput and datacenter-locality of the workload in the latency experiments.

Here is one page pdf of this summary.