Reading Group Paper. Take Out the TraChe: Maximizing (Tra)nsactional Ca(che) Hit Rate

In this week’s reading group, we discussed the “Take Out the TraChe: Maximizing (Tra)nsactional Ca(che) Hit Rate” OSDI’23 paper by Audrey Cheng, David Chu, Terrance Li, Jason Chan, Natacha Crooks, Joseph M. Hellerstein, Ion Stoica, Xiangyao Yu. This paper argues against optimizing for object hit rate in caches for transactional databases. The main logic behind this is that missing even a single object needed for a transaction will require a trip to the database and incur the associated costs. Instead, the paper suggests designing caches to leverage the structure/composition of transactions. 

Let me use an example from the paper to explain this better. Let’s say we have a transaction, shown in the image here, that queries 4 different tables: A, B, C, and D, such that querying tables B and C depends on the results of reading table A, and reading D requires first getting the data from C. This read-only transaction has data dependencies, and the authors exploit such dependencies to improve transaction latency. This example has two dependency chains (A -> B and A -> C -> D). A -> C -> D is the longest chain, with three queries in sequence. A transaction-optimized cache can then focus on reducing the number of such chained operations that need to hit the database. For instance, caching table A reduces the number of sequential database steps on the longest chain from 3 down to 2 (using the cache for A, then reading B and C concurrently, and then reading table D). Similarly, caching just table C also removes a database step from the longest chain (reading A, using the cached C, and reading B and D concurrently). Every time we reduce the number of database steps in the chain with the most such steps, we improve transaction latency further by cutting out more database interactions and the associated networking costs. As such, caching both A and C, for example, can be even better than caching just A or C, as now only one database step is left in each chain. Any further improvement now needs to consider caching tables that cut database accesses from both chains.
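To make the chain arithmetic concrete, here is a minimal Python sketch (not DeToX code) that counts the sequential database steps on the longest chain for the example above. It assumes, for simplicity, that a cached table costs zero database steps but still has to wait for the tables it depends on; the names `DEPS` and `db_depth` are made up for this illustration.

```python
from functools import lru_cache

# Example from the post: B and C depend on A; D depends on C.
DEPS = {"A": [], "B": ["A"], "C": ["A"], "D": ["C"]}


def db_depth(deps, cached):
    """Sequential database steps on the longest dependency chain.

    An uncached table costs one database step after the deepest table it
    depends on; a cached table costs no database step of its own.
    """
    @lru_cache(maxsize=None)
    def depth(table):
        wait = max((depth(parent) for parent in deps[table]), default=0)
        return wait + (0 if table in cached else 1)

    return max(depth(table) for table in deps)


for cached in [set(), {"A"}, {"C"}, {"A", "C"}]:
    print(sorted(cached), db_depth(DEPS, cached))
# prints, one per line: [] 3 / ['A'] 2 / ['C'] 2 / ['A', 'C'] 1
```

Note that caching B and D instead (also two of the four tables, so the same share of objects) leaves the longest chain at 2 steps, while caching A and C brings it down to 1. That gap is exactly the difference between object hit rate and transaction hit rate.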

DeToX, the system proposed in the paper, implements this strategy. It relies on knowing what types of transactions an application may run and uses this knowledge to enumerate all possible groups of tables to cache for each transaction type. These groups are then scored at runtime to decide which groups of tables are most valuable to cache. In short, the idea is to give higher scores to groups of smaller yet frequently used tables. Finally, caching entire tables may be infeasible, so DeToX also scores individual keys/objects to decide which ones to cache. Similarly, the idea here is to keep as many high-impact keys/objects in the cache as possible, where impact is measured by the object’s “hotness” and by whether the object benefits important/frequent transactions or many types of transactions. The paper has a more precise description, including the formulas used for scoring, than my super high-level summary.
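The enumeration step can be sketched in a few lines of Python. This is an illustration under stated assumptions, not DeToX’s scoring: `hypothetical_score` simply divides a transaction type’s frequency by the total size of the tables in a group, which captures the “small yet frequently used” intuition and nothing more. The table sizes and the frequency value are invented.

```python
from itertools import combinations


def table_groups(tables):
    """Every non-empty group (subset) of the tables a transaction type touches."""
    tables = sorted(tables)
    return [
        frozenset(combo)
        for size in range(1, len(tables) + 1)
        for combo in combinations(tables, size)
    ]


def hypothetical_score(group, txn_frequency, table_rows):
    # Illustrative only, NOT the paper's formula: reward groups used by
    # frequent transactions and penalize groups made of large tables.
    return txn_frequency / sum(table_rows[t] for t in group)


# Made-up table sizes (rows) and frequency for one transaction type.
table_rows = {"A": 10, "B": 5_000, "C": 50, "D": 20_000}
groups = table_groups(table_rows)
ranked = sorted(groups, key=lambda g: hypothetical_score(g, 100, table_rows), reverse=True)
print(len(groups), sorted(ranked[0]), sorted(ranked[1]))  # 15 ['A'] ['C']
```

The real system also has to account for which groups actually shorten a transaction’s longest chain, which this toy score ignores.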

DeToX runs as a shim layer between PostgreSQL and the clients. This shim layer sits on a separate (and equally large!) machine from the database itself: in the eval, both ran on AWS c5a.4xlarge (16 vCPU, 32 GB RAM) VMs. Clients do not interact with the database directly and go through the shim layer instead. The shim keeps the cache coherent with the underlying PostgreSQL using two-phase locking. The actual cache is backed by Redis, running on an even larger VM (in the eval, a c5a.16xlarge with 64 vCPUs and 128 GB RAM).

Anyway, this approach and setup seem to provide a decent improvement in the transaction hit rate over other state-of-the-art caching strategies. The paper counts a transaction hit when the cache successfully reduces the number of database accesses in the transaction’s longest dependency chain. The object hit rate, however, drops, since it is not a priority for the scoring system.


We had a long discussion of the paper, and for the sake of space and my time, I will summarize only a handful of points. 

1) Object hit rate vs. Transaction hit rate. The objective of the paper is to maximize the transaction hit rate (i.e., caching at least some of the transaction’s sequential steps in their entirety to remove these steps from ever touching the database). This seems to help improve latency. However, a lower object hit rate may result in the database having to do more work, as it now needs to serve more objects. For use cases that require higher throughput, the object hit rate may still be more important. For what it’s worth, the paper reports throughput improvements despite the lower object hit rate.

2) Use cases for DeToX. Stemming from the point above, the use case we see for DeToX is latency-driven. Some caches exist only to reduce latency and, regardless of a cache hit or miss, still exercise the underlying storage for reliability reasons (see the DynamoDB ATC’22 paper). DeToX may be a viable solution for this type of cache usage in transactional workloads.

3) Latency Improvements. The DeToX caching approach is supposed to improve latency by cutting entire transactional steps out of the path to the database. The system prefers to cache objects from smaller yet frequently used tables. Due to their size, these smaller tables may also be the cheapest for the database to serve, so caching them saves relatively little. As such, the latency improvements may not be proportional to the number of transactional steps cached and “cut out” if the remaining steps require more complex queries over larger tables that simply take longer for the database to serve.

4) Cost of scoring. For large enough applications with many transaction types, there can be a substantial number of groups that need to be scored during the runtime as system access patterns start to emerge. This process can be quite costly (and the paper admits that), so we were wondering whether there are ways to mitigate some of that cost. The paper already proposes a few improvements. 
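As a back-of-the-envelope illustration, assume (as the summary above suggests) that every non-empty group of the n_i tables touched by transaction type i is enumerated. The number of groups to score across T transaction types is then:

```latex
G = \sum_{i=1}^{T} \left(2^{n_i} - 1\right)
```

For example, 10 transaction types that each touch 8 tables already give 10 × (2^8 − 1) = 2,550 groups, and each additional table per type roughly doubles that count.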

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions, and most importantly manage Zoom invites to paper discussions. Please join the slack group to get involved!

Reading Group. Amazon DynamoDB: A Scalable, Predictably Performant, and Fully Managed NoSQL Database Service

In the 120th DistSys meeting, we talked about “Amazon DynamoDB: A Scalable, Predictably Performant, and Fully Managed NoSQL Database Service” ATC’22 paper by Mostafa Elhemali, Niall Gallagher, Nicholas Gordon, Joseph Idziorek, Richard Krog, Colin Lazier, Erben Mo, Akhilesh Mritunjai, Somu Perianayagam, Tim Rath, Swami Sivasubramanian, James Christopher Sorenson III, Sroaj Sosothikul, Doug Terry, Akshat Vig.

The paper is loaded with content, as it presents many different things spanning ten years of development. None of the topics are covered in great detail, but I think it is still a great overview of such a massive project. Obviously, the authors discuss DynamoDB, its architecture, and its design. The paper also provides a brief history of the system and examines several challenges/lessons the team has learned while operating such a massive-scale system.

To start with the architecture, users interact with the system by reaching out to the request router. The router performs authentication and admission control. Most importantly, however, the router has access to partition metadata, allowing it to, well, route requests to the proper storage nodes and replicas. A node hosts multiple replicas for different partitions.

So, speaking of partitions, each data item in DynamoDB has a unique primary key. These primary keys group items into partitions, which are replicated with Multi-Paxos across multiple replicas in different availability zones for redundancy. The assignment of key ranges to partitions (and partitions to nodes?) constitutes the metadata needed by the request router.
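A request router’s lookup can be sketched as a binary search over sorted partition boundaries. The partition map below (`PARTITION_STARTS`, `PARTITION_REPLICAS`, and the node/AZ names) is hypothetical. DynamoDB hashes the partition key before mapping it to a partition; plain string ranges are used here only to keep the sketch short.

```python
import bisect

# Hypothetical metadata: partition i owns keys in [PARTITION_STARTS[i], PARTITION_STARTS[i+1]).
PARTITION_STARTS = ["", "g", "n", "t"]
# Hypothetical placement: each partition has replicas in three availability zones,
# and a node (e.g., node1) hosts replicas of several partitions.
PARTITION_REPLICAS = {
    0: ["node1/az-a", "node4/az-b", "node7/az-c"],
    1: ["node2/az-a", "node5/az-b", "node8/az-c"],
    2: ["node3/az-a", "node6/az-b", "node9/az-c"],
    3: ["node1/az-a", "node5/az-b", "node9/az-c"],
}


def route(key):
    """Return (partition id, replica locations) for a key."""
    partition = bisect.bisect_right(PARTITION_STARTS, key) - 1
    return partition, PARTITION_REPLICAS[partition]


print(route("apple")[0], route("g")[0], route("zebra")[0])  # 0 1 3
```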

DynamoDB has two types of replicas — log and storage replicas. Log replicas only contain replication write-ahead logs. Storage replicas, in addition to having a log, also maintain a state derived from applying/executing the logged commands against the B-tree storage. Both replica types can participate in Paxos quorums, but log replicas are more lightweight and cannot serve reads/queries. The system uses log replicas to improve availability and reliability — it is easier to spin up a log replica that only needs to accept new commands than to rebuild a full storage replica with all the partition’s data. This speed becomes important under failures to restore the system to the proper degree of replication quickly.
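The split between log and storage replicas can be illustrated with a toy model. This Python sketch uses heavy simplifications (no leader, no Paxos ballots, no failures), with a dict standing in for the B-tree, and its class and function names are made up. It shows the key asymmetry: both replica types count toward a write quorum, but only storage replicas can answer reads.

```python
from dataclasses import dataclass, field


@dataclass
class LogReplica:
    """Keeps only the replicated write-ahead log."""
    log: list = field(default_factory=list)

    def accept(self, entry):
        self.log.append(entry)  # persist the log entry; nothing is applied
        return True


@dataclass
class StorageReplica(LogReplica):
    """Keeps the log and also applies entries to its state (a dict stands in for the B-tree)."""
    state: dict = field(default_factory=dict)

    def accept(self, entry):
        super().accept(entry)
        key, value = entry
        self.state[key] = value
        return True

    def read(self, key):
        return self.state.get(key)


def replicate(entry, replicas):
    """A write commits once a majority of replicas of either kind accept it."""
    acks = sum(replica.accept(entry) for replica in replicas)
    return acks > len(replicas) // 2


def read(key, replicas):
    """Only storage replicas have materialized state, so only they can serve reads."""
    for replica in replicas:
        if isinstance(replica, StorageReplica):
            return replica.read(key)
    raise LookupError("no storage replica available")


replicas = [StorageReplica(), StorageReplica(), LogReplica()]
print(replicate(("user#1", "Alice"), replicas))  # True
print(read("user#1", replicas))                  # Alice
```

Replacing a failed replica with a `LogReplica` only requires it to start accepting new entries, which is why it restores the replication factor faster than rebuilding a full storage replica.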

From the historical perspective, while DynamoDB started as a pretty basic NoSQL (key-value) store, it has added many features over time, such as secondary indexes, JSON documents, encryption, transactions, and more.

Finally, a decent chunk of the paper focuses on various nuances of running large-scale NoSQL data stores. For example, the paper discusses data errors and how DynamoDB verifies data integrity with checksums on every data transfer between nodes. DynamoDB also performs background verification of data at rest. Another important lesson on the reliability side is the need to maintain enough capacity in the metadata system. While the request routers cache metadata to improve performance, a metastable failure in the caching system led to a rather big outage. Since then, the caches are used only to improve latency and no longer offload capacity from the main metadata storage service: all requests for metadata go through to the service even if they are answered by the cache first. This ensures adequate capacity to serve critical metadata operations regardless of cache failures. Naturally, this is a more expensive solution, chosen for the sake of reliability.
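The “cache for latency only” rule can be sketched as a client that sends every lookup to the metadata service, hit or miss, and only uses the cache to answer sooner. This is a minimal Python illustration, not DynamoDB’s implementation; `metadata_service` is a hypothetical stand-in for the authoritative metadata store, and the background refresh uses the standard `ThreadPoolExecutor`.

```python
from concurrent.futures import ThreadPoolExecutor


class MetadataClient:
    """Caches metadata for latency only; every lookup still reaches the service."""

    def __init__(self, service, workers=4):
        self.service = service  # hypothetical callable: key -> metadata
        self.cache = {}
        self.pool = ThreadPoolExecutor(max_workers=workers)

    def _refresh(self, key):
        self.cache[key] = self.service(key)

    def lookup(self, key):
        # The service is asked on every lookup, hit or miss, so its load
        # does not depend on how well (or whether) the cache is working.
        pending = self.pool.submit(self._refresh, key)
        if key in self.cache:
            return self.cache[key]  # hit: answer now, refresh in the background
        pending.result()            # miss: wait for the service's answer
        return self.cache[key]

    def close(self):
        self.pool.shutdown(wait=True)


calls = []


def metadata_service(key):  # hypothetical authoritative store
    calls.append(key)
    return {"table": key, "partitions": 4}


client = MetadataClient(metadata_service)
client.lookup("orders")  # miss: waits for the service
client.lookup("orders")  # hit: answered from the cache, still sent to the service
client.close()
print(len(calls))  # 2
```

Because the service sees one request per lookup whether the cache is warm or cold, a cache failure cannot suddenly shift extra load onto it.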

The authors discuss plenty of other lessons and challenges, such as managing load, balancing capacity, implementing backups, and improving availability.

Reading Group. The Case for Distributed Shared-Memory Databases with RDMA-Enabled Memory Disaggregation

In the 122nd reading group meeting, we read “The Case for Distributed Shared-Memory Databases with RDMA-Enabled Memory Disaggregation” paper by Ruihong Wang, Jianguo Wang, Stratos Idreos, M. Tamer Özsu, Walid G. Aref. This paper looks at the trend of resource disaggregation in the cloud and asks whether distributed shared memory databases (DSM-DBs) can benefit from memory disaggregation (MD) to become the next “hot” thing in the database world. 

The idea, on the surface, is simple: decoupling compute from memory enables databases with many separate, stateless compute workers for query processing that share a remote memory pool. According to the authors, the driving force for this is RDMA, as it offers latency and bandwidth relatively close to those of local memory. On top of that, such a system can also use disaggregated storage for durability. The bulk of the paper then focuses on the challenges of such a disaggregated design without going in-depth into the database architecture itself. The paper also does not go deeply into the design of the disaggregated memory system, although it points to a few issues to solve.

The first challenge listed by the authors is the lack of appropriate APIs to access memory. In particular, the paper suggests having APIs that are more in line with how memory is managed locally, namely memory allocation APIs. In the disaggregated memory pool case, memory allocation must work with the virtual address space of the memory system. The authors also suggest data transmission APIs to facilitate moving data for local caching at compute nodes. Finally, a function offloading API can move some simple computation to the memory system (doesn’t this defeat the purpose of memory and compute disaggregation?).
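To make the three API categories concrete, here is a hypothetical in-process stand-in for a disaggregated memory pool. None of these names come from the paper or from an RDMA library; a real implementation would issue RDMA reads/writes against remote memory instead of slicing a local `bytearray`, and this simple bump allocator never reuses space.

```python
class RemoteMemoryPool:
    """Hypothetical stand-in for a disaggregated memory pool.

    Addresses live in the pool's own address space. A real system would
    turn read/write into RDMA operations on a remote machine.
    """

    def __init__(self, size):
        self.mem = bytearray(size)
        self.next_free = 0  # bump allocator: space is never reused

    # Memory allocation API
    def alloc(self, nbytes):
        if self.next_free + nbytes > len(self.mem):
            raise MemoryError("remote pool exhausted")
        addr = self.next_free
        self.next_free += nbytes
        return addr

    # Data transmission API: push to / pull from the pool (e.g., to fill a local cache)
    def write(self, addr, data):
        if addr + len(data) > len(self.mem):
            raise ValueError("write past end of pool")
        self.mem[addr:addr + len(data)] = data

    def read(self, addr, nbytes):
        return bytes(self.mem[addr:addr + nbytes])

    # Function offloading API: run fn next to the data, return only its result
    def offload(self, addr, nbytes, fn):
        return fn(self.mem[addr:addr + nbytes])


pool = RemoteMemoryPool(64)
addr = pool.alloc(8)
pool.write(addr, b"rowdata!")
print(pool.read(addr, 8))                                   # b'rowdata!'
print(pool.offload(addr, 8, lambda buf: buf.count(b"a")))  # 2
```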

The second important set of challenges deals with the disaggregated memory system itself. How does such a system handle node failures and still remain available and durable? Sadly, the paper does not provide any concrete details aside from hinting at high-level solutions, all of which cost performance: backup on storage, erasure coding, replication, etc.

The third block of challenges has to do with concurrency control. If the database system caches data locally at worker/compute nodes, then we need to worry about cache coherence when multiple workers can access the same memory. Here we see that disaggregated memory is still slow: a local cache can be an order of magnitude faster. This is a smaller difference than, let’s say, going from SSD to memory, but it is still substantial. The authors suggest that the reduced speed differences in this new memory hierarchy will require new caching protocols that prioritize execution time rather than cache hit rate.

Another concurrency challenge has to do with transactions, as we now have the potential to fit all data in one large memory pool with many workers accessing it concurrently. Again, the paper does not have many concrete solutions but suggests “rethinking distributed commit.” The “solution” for concurrency control is similar: it is costly to implement locks over RDMA, so we need to rethink CC as well, preferably without locks. Lastly, all of this needs to work with thousands of compute nodes.
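The paper does not prescribe a protocol, but optimistic concurrency control (OCC) is one well-known way to avoid holding locks while a transaction executes: read versioned records, buffer writes, and validate at commit time. The Python sketch below is single-threaded and hypothetical. In a real disaggregated system, the validate-and-install step would itself have to be made atomic (for example, with RDMA atomic operations), which is exactly where the hard problems the paper alludes to live.

```python
class VersionedStore:
    """Records carry a version number (stand-in for records in a shared memory pool)."""

    def __init__(self):
        self.data = {}  # key -> (version, value)

    def get(self, key):
        return self.data.get(key, (0, None))


class OptimisticTxn:
    """OCC: execute without locks, buffer writes, validate reads at commit."""

    def __init__(self, store):
        self.store = store
        self.read_versions = {}
        self.writes = {}

    def read(self, key):
        if key in self.writes:
            return self.writes[key]  # read your own buffered write
        version, value = self.store.get(key)
        self.read_versions.setdefault(key, version)
        return value

    def write(self, key, value):
        self.writes[key] = value

    def commit(self):
        # Validation: abort if any record we read has changed since we read it.
        # (In a real concurrent system, validate + install must be atomic.)
        for key, seen in self.read_versions.items():
            if self.store.get(key)[0] != seen:
                return False
        for key, value in self.writes.items():
            version, _ = self.store.get(key)
            self.store.data[key] = (version + 1, value)
        return True


store = VersionedStore()
t1, t2 = OptimisticTxn(store), OptimisticTxn(store)
t1.write("x", (t1.read("x") or 0) + 1)
t2.write("x", (t2.read("x") or 0) + 10)
print(t1.commit(), t2.commit(), store.get("x"))  # True False (1, 1)
```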

The last set of challenges is indexing-related. This, again, can get into the tricky RDMA performance limitations, so we need to have an RDMA-conscious index design. Also, the index needs to work well under high concurrency. 


1) Details. Our group collectively found the paper to be rather shallow on details of how these systems may work. While the paper examines some literature on the shared memory databases of the past, it lacks depth and connections with this new paradigm of disaggregated memory used over RDMA. We are especially curious about more depth for concurrency issues and solutions, as many stated issues may have been solved in prior shared memory databases, albeit at a smaller scale.

One example where the paper is very shallow is the disaggregated memory system itself. Stating that a core component of all DSM-DBs faces availability and durability challenges is not going to cut it; the entire premise of the paper depends on such a disaggregated memory system being fast and reliable. Without these basics, the rest of the discussion becomes largely irrelevant.

2) Memory Disaggregation. We discussed the memory disaggregation idea in general and whether it can become a mainstream technology. Storage disaggregation is fairly ubiquitous: you create a VM in some cloud, be it AWS, Azure, or GCP, and the storage this VM gets is likely to be in a different box (or rather a set of boxes) than your VM’s CPU or memory (think of EBS volumes on AWS EC2). We are OK with this, as this storage is plenty fast and behaves just as if it were located in the same server as the rest of the virtual hardware. Memory disaggregation with RDMA does not work this way, which creates a lot of challenges. Most importantly, disaggregated memory cannot (yet?) be made as universal as disaggregated storage. We won’t run code from it or use it for anything that needs to copy/change contents a lot. As a result, disaggregated memory will, at best, look like another “storage” solution to the systems that use it: something that may be faster than durable storage, but still not fast enough for general use.

Personally, I see more utility in the future with memory disaggregation using CXL. There was a paper at the recent USENIX ATC on the topic. Such a solution may act more like additional on-demand memory than a shared pool of memory between processors/nodes, but it will also not have the cache coherence issues, or the difficulty, limitations, and reliability concerns of RDMA. I can envision a top-of-rack memory pool that tenants can tap into if they need more memory, or that enables a cloud VM product that scales memory and CPU cores independently of each other.

Reading Group Special Session: Scalability and Fault Tolerance in YDB

YDB is an open-source Distributed SQL Database. YDB is used as an OLTP Database for mission-critical user-facing applications. It provides strong consistency and serializable transaction isolation for the end user. One of the main characteristics of YDB is scalability to very large clusters together with multitenancy, i.e., the ability to provide isolated environments for different users within a single cluster. In this talk, we will cover two layers of YDB, the Tablet and BlobStorage layers, which together provide fault tolerance, scalability, and user isolation.

Tablet is a very lightweight component that implements distributed consensus. It is a building block for user data storage, the schema manager, the tablet manager, system views, and many other components. A single YDB cluster is known to have more than a million tablets, and a single YDB node can serve up to 10K tablets. Tablet implements a distributed consensus protocol over a shared log provided by the BlobStorage layer.

The BlobStorage layer is a fault-tolerant, append-only key-value database. It implements a specific API for logging, reading and writing blobs, garbage collection, and quorum management. We believe that the protocol between Tablet and BlobStorage is quite simple and easier for listeners to understand than Paxos or Raft. We also illustrate the flexibility of YDB in real-life use cases, such as migrating parts of a cluster from one availability zone to another without a maintenance window, or storing some data blobs on fast media like SSD and others on HDD.

Speaker: Andrey is the head of the YDB project (https://ydb.tech). He works at Yandex as Head of Department. He also has a long history at Yandex as a member of the Web Crawler team and the Infrastructure team, as one of the creators of Yandex Cloud, and now as part of the CloudIL team. Andrey holds a Ph.D. from Moscow State University.

When: Wednesday, August 10th at 2:00 pm EDT (6 pm UTC)

Where: Zoom

Reading Group. Exploiting Symbolic Execution to Accelerate Deterministic Databases

We have covered 60 papers in our reading group so far! The 60th paper we explored was “Exploiting Symbolic Execution to Accelerate Deterministic Databases” from ICDCS’20. I enjoyed the paper quite a lot, even though there are some claims I do not necessarily agree with.

The paper solves the problem of executing transactions in deterministic databases. We can imagine a replicated state machine backed by Paxos or Raft. Trivially, in such a machine, each replica node needs to execute transactions in the exact order prescribed by the leader to guarantee that all replicas progress through the same states of the machine. The good thing here is that nodes run each transaction independently of each other once the execution order has been established by the replication leader. The bad thing is that this naive approach is sequential, so a node cannot take advantage of the multiple processing cores it may have.

Naturally, we want to parallelize the transaction execution. This, however, is easier said than done. To allow for more parallelism, we want to identify the situations when it is ok to run some transactions concurrently without impacting the final state of the state machine. For example, if we know what objects or keys the transaction reads and writes (i.e. the transaction’s read-and-write set), we can group independent transactions that operate on disjoint read-write sets together for parallel execution. For instance, a transaction accessing keys “x, y, and z” is independent of a transaction accessing keys “a, b, and c,” and the two can execute at the same time without impacting each other. 
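Grouping by disjoint key sets can be sketched as assigning each transaction, in the leader’s order, to a parallel “wave.” This is a generic Python illustration rather than Prognosticator’s scheduler, and it conservatively treats any shared key as a conflict, even read-read sharing.

```python
def schedule_waves(txns):
    """Place transactions (in leader order) into waves that can run in parallel.

    txns: list of (name, keys). A transaction goes one wave after the latest
    earlier transaction whose key set overlaps its own, so conflicting
    transactions keep the leader's order while disjoint ones run side by side.
    """
    placed = []  # (keys, wave) for transactions scheduled so far
    waves = []
    for name, keys in txns:
        wave = 1 + max((w for other, w in placed if other & keys), default=-1)
        placed.append((keys, wave))
        while len(waves) <= wave:
            waves.append([])
        waves[wave].append(name)
    return waves


txns = [("T1", {"x", "y", "z"}), ("T2", {"a", "b", "c"}), ("T3", {"z", "a"}), ("T4", {"q"})]
print(schedule_waves(txns))  # [['T1', 'T2', 'T4'], ['T3']]
```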

Of course, this requires us to know what objects/keys each transaction needs before running it, and this is a bit of a problem. In some situations, it may be easy to figure out the read and write sets of a transaction, but this is not always the case. Many systems, like Calvin, do a “pretend run” of a transaction (sometimes referred to as a reconnaissance transaction) to figure out the read-and-write set if the set isn’t obvious or annotated in the transaction. This has a few obvious downsides. First, the pretend/reconnaissance phase uses the system’s resources. Second, the reconnaissance run increases the transaction’s latency. And finally, the reconnaissance is not perfect: by the time of the “real” run, the read and write sets may have changed due to other transactions impacting the state of the system.
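The staleness problem is easy to reproduce with a toy transaction of the same shape as the “read a; write x or y” example discussed below. In this Python sketch, which is illustrative rather than Calvin’s actual protocol, the reconnaissance run predicts the key set, a concurrent write changes `a`, and the real run detects that its key set no longer matches the prediction and aborts. `run_with_reconnaissance` and the transaction encoding are made up for this example.

```python
def dependent_txn(store):
    """read a; if a > 10 write x else write y. Returns (keys touched, writes)."""
    a = store["a"]
    target = "x" if a > 10 else "y"
    return {"a", target}, {target: a}


def run_with_reconnaissance(store, txn, writes_in_between):
    # Reconnaissance: a read-only "pretend run" to learn the key set.
    predicted_keys, _ = txn(store)
    # Meanwhile, other transactions commit and change the state.
    store.update(writes_in_between)
    # Real run: in a real system the predicted keys would be locked first.
    actual_keys, writes = txn(store)
    if actual_keys != predicted_keys:
        return False  # stale reconnaissance: abort and retry
    store.update(writes)
    return True


print(run_with_reconnaissance({"a": 5}, dependent_txn, {}))         # True
print(run_with_reconnaissance({"a": 5}, dependent_txn, {"a": 42}))  # False
```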

So, the above description is fairly generic behavior for many systems out there. This is where Prognosticator, the system discussed in the paper, comes in. Prognosticator uses Symbolic Execution (SE) to profile transactions and predict each transaction’s read-and-write set. The system does not need experts to annotate transactions’ read-and-write sets, yet it can still avoid reconnaissance runs in many situations. Sometimes, however, reconnaissance must still happen, and Prognosticator uses a few tricks to reduce the chance of the reconnaissance becoming stale.

Let’s look at the issues with figuring out the read-write set of a transaction. Many transactions are not primitive read and write commands and involve quite a bit of logic with loops and conditional statements. This means that the state of a client/application may impact both the values written and the keys accessed. For example, consider a transaction that takes some input i:

input i;
if i > 10 then write x:=i;
          else write y:=i;

The read-and-write set of the above example depends on the input value, which is already known to the client. The paper calls this an Independent Transaction (IT), as it has no internal dependence on the values it reads.

Some transactions can be a bit more complicated and have the read-write set depending on the value read by the transaction:

read a;
if a > 10 then write x:=a;
          else write y:=a;

Here the transaction does not know its write set (i.e., whether it writes x or y) until it acquires the value of a. The Prognosticator paper refers to these as Dependent Transactions (DT), as the write set has an internal dependence on the read values.

Obviously, for both types of transactions, we can do the reconnaissance phase to figure out all the logic and branching to learn all the required keys. But we do not really need full reconnaissance for the ITs, as their read-and-write sets only depend on some client input and not the transaction itself. In fact, we can just play out the transaction’s code to figure out the read-write set without actually retrieving any data from the store (i.e. using some dummy values). However, we still somehow need to know whether a transaction is IT, as using dummy values in DT will clearly not work. Moreover, such a “dry run” with dummy values for every IT we encounter is still wasteful, as we do it every time.
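The dry-run idea, and why it breaks for DTs, can be shown with a tracing store that records accessed keys and hands out a dummy value for every read. The Python below is a hypothetical illustration, not Prognosticator code; `TracingStore` and `dry_run` are invented names, and the two transactions mirror the IT and DT examples above.

```python
class TracingStore:
    """Records which keys a transaction touches; every read returns a dummy value."""

    def __init__(self, dummy=0):
        self.dummy, self.reads, self.writes = dummy, set(), set()

    def read(self, key):
        self.reads.add(key)
        return self.dummy

    def write(self, key, value):
        self.writes.add(key)


def independent_txn(db, i):  # input i; if i > 10 write x else write y
    if i > 10:
        db.write("x", i)
    else:
        db.write("y", i)


def dependent_txn(db):  # read a; if a > 10 write x else write y
    a = db.read("a")
    if a > 10:
        db.write("x", a)
    else:
        db.write("y", a)


def dry_run(txn, *args, dummy=0):
    trace = TracingStore(dummy)
    txn(trace, *args)
    return trace.reads, trace.writes


print(dry_run(independent_txn, 42))      # (set(), {'x'})
print(dry_run(dependent_txn))            # ({'a'}, {'y'})
print(dry_run(dependent_txn, dummy=99))  # ({'a'}, {'x'})
```

For the IT, the predicted write set is exact because it depends only on i. For the DT, the answer depends entirely on the dummy value, so it cannot be trusted.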

Example of Symbolic Execution (SE) – symbolic solution α, path constraint φ.

This is where Prognosticator’s Symbolic Execution (SE) approach shines. With SE, Prognosticator “unwraps” each transaction for all the possible execution branches, leading to a symbolic transaction solution for all possible code paths. If all code paths access the same keys then we have a static read-and-write set. If certain execution branches access different keys, but branching conditions involve only transaction input, then we are dealing with an Independent Transaction (IT). We can easily compute IT’s read-and-write set from the symbolic solution once the input is known. Finally, if SE yields some branches with different access keys, and these branches are conditioned on a transaction’s reads, then we have a Dependent Transaction (DT) and will require a reconnaissance read. 
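The three-way classification can be sketched over a symbolic profile. In this Python illustration, the paths are written by hand (a real SE engine would derive them, with path constraints, from the transaction code), and each path only records whether its constraint depends on the transaction’s input, its reads, or both, plus the keys it touches. The rule is a simplified, conservative version of the one described above: if the key sets differ and any constraint depends on a read, the transaction is treated as a DT.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Path:
    """One symbolic execution path: what its constraint depends on and which keys it touches."""
    depends_on: frozenset  # subset of {"input", "read"}
    reads: frozenset
    writes: frozenset


def path(depends_on, reads, writes):
    return Path(frozenset(depends_on), frozenset(reads), frozenset(writes))


def classify(paths):
    key_sets = {(p.reads, p.writes) for p in paths}
    if len(key_sets) == 1:
        return "static"  # every path touches the same keys
    if any("read" in p.depends_on for p in paths):
        return "DT"      # keys depend on values read: needs reconnaissance
    return "IT"          # keys depend only on input: computable once input is known


it = [path({"input"}, [], ["x"]), path({"input"}, [], ["y"])]
dt = [path({"read"}, ["a"], ["x"]), path({"read"}, ["a"], ["y"])]
static = [path({"read"}, ["a"], ["x"]), path({"read"}, ["a"], ["x"])]
print(classify(it), classify(dt), classify(static))  # IT DT static
```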

Prognosticator “unwraps” each new transaction type only once, at the client side, to create such a symbolic execution profile with all possible code branches. The paper mentions quite a few optimizations. Their gist is that we do not care so much about the actual symbolic solutions, only about which keys show up in the read and write sets. So if two execution branches produce different symbolic solutions but access the same keys, these branches can be “merged” for the purpose of predicting the read-and-write set.
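The merging optimization boils down to keying paths by the keys they touch and discarding the symbolic solutions. A tiny hypothetical Python sketch, with made-up solutions and keys:

```python
def merge_paths(paths):
    """Collapse paths that touch identical key sets.

    paths: iterable of (symbolic_solution, reads, writes). Only the key sets
    matter for read/write-set prediction, so the solutions are dropped.
    """
    return {(frozenset(reads), frozenset(writes)) for _, reads, writes in paths}


paths = [
    ("x := i + 1", {"k"}, {"x"}),
    ("x := 2 * i", {"k"}, {"x"}),  # different solution, same keys
    ("y := i", {"k"}, {"y"}),
]
print(len(paths), "->", len(merge_paths(paths)))  # 3 -> 2
```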

The rest of the Prognosticator’s magic depends on a batching technique that allows for a careful deterministic reordering of transactions. With the help of SE, the system identifies all read-only transactions (ROTs) and executes them concurrently at the beginning of the batch. This leaves us with a batch containing only ITs and DTs. The system then reorders DTs to the beginning of the batch. This allows it to run reconnaissance reads on all DTs while also working on ROTs. Since all DTs are now at the beginning of the batch, the reconnaissance reads cannot become stale due to any IT. Reconnaissance may still become stale due to the dependencies between DTs, and in this case, a DT is aborted during the execution phase and is placed in the abort batch to run after the main batch completes and before the next batch.
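A minimal Python sketch of the batch reordering, assuming each transaction has already been classified. `recon_still_valid` is a hypothetical callback standing in for comparing a DT’s reconnaissance with the state at execution time, and the real system runs the phases in parallel rather than in a simple loop.

```python
PHASE = {"ROT": 0, "DT": 1, "IT": 2}


def reorder_batch(batch):
    """batch: list of (txn_id, kind). ROTs first, then DTs, then ITs.

    sorted() is stable, so the leader's order is kept within each kind and
    every replica derives the same order.
    """
    return sorted(batch, key=lambda txn: PHASE[txn[1]])


def execute_batch(batch, recon_still_valid):
    """recon_still_valid(txn_id) -> bool is a hypothetical staleness check for DTs."""
    executed, abort_batch = [], []
    for txn_id, kind in reorder_batch(batch):
        if kind == "DT" and not recon_still_valid(txn_id):
            abort_batch.append(txn_id)  # re-run after the main batch, before the next one
        else:
            executed.append(txn_id)
    return executed, abort_batch


batch = [("t1", "IT"), ("t2", "DT"), ("t3", "ROT"), ("t4", "DT"), ("t5", "IT")]
print(execute_batch(batch, lambda txn_id: txn_id != "t4"))
# (['t3', 't2', 't1', 't5'], ['t4'])
```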

Lock table with per-key queues on the right. ROTs are not in the table, DTs and ITs are ordered in the table. Only transactions with all keys at the head of the queues can execute.

To execute the ITs and DTs in parallel, Prognosticator uses a lock table. The lock table is a collection of per-key queues, such that for every key in the batch there is an entry in the table with a queue of transactions. A transaction can be executed when it is at the head of the queues for all its keys. Obviously, executing a transaction removes it from all these queues.
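The lock table can be simulated in a few lines of Python. This sketch processes the batch in synchronous “rounds” (everything runnable in a round executes, then is removed from its queues), which simplifies the worker threads a real system would use; the transaction names and keys are invented. Because every queue is filled in the same batch order, the earliest unfinished transaction is always at the head of all its queues, so the loop always makes progress.

```python
from collections import deque


def build_lock_table(ordered_txns):
    """ordered_txns: list of (txn_id, keys) in the batch's deterministic order."""
    table = {}
    for txn_id, keys in ordered_txns:
        for key in keys:
            table.setdefault(key, deque()).append(txn_id)
    return table


def run_in_rounds(ordered_txns):
    keys_of = dict(ordered_txns)
    table = build_lock_table(ordered_txns)
    rounds = []
    while any(table.values()):
        heads = {queue[0] for queue in table.values() if queue}
        # Runnable: at the head of the queue for every key it touches.
        ready = sorted(t for t in heads if all(table[k][0] == t for k in keys_of[t]))
        for t in ready:
            for k in keys_of[t]:
                table[k].popleft()  # executing removes it from all its queues
        rounds.append(ready)
    return rounds


batch = [("T1", {"x", "y"}), ("T2", {"y", "z"}), ("T3", {"a"}), ("T4", {"x"})]
print(run_in_rounds(batch))  # [['T1', 'T3'], ['T2', 'T4']]
```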

The whole transaction execution process runs independently on each node. This is safe because we do not actually need to stick to the leader-prescribed order within the batch, as long as we keep the correct order of batches and deterministically reorder the transactions in the same way on all replica nodes. With batch execution, all clients are waiting for their transactions to finish, so this deterministic reordering does not cause any problems: all waiting transactions are concurrent. This is a common trick used in many systems.

The whole package with SE and batching provides a significant boost to the throughput compared to Calvin. It is not entirely clear though how much of the boost was enabled by the SE, but I will come back to this point in the discussion summary. 

The paper goes into more detail on many aspects of the system, including symbolic execution, a more efficient implementation of the lock table, resource usage, etc. It was definitely a good read for me. As always, we had a presentation in our reading group, and this time I had to cover for a missing presenter.


1) Performance gains due to SE vs. batching. The paper claims a great speedup compared to Calvin; however, one question we had is just how much of the improvement is due to symbolic execution and how much is due to clever batching techniques. Let me elaborate. Some benefit comes from the careful reordering of operations. Running ROTs first helps a lot. Running reconnaissance reads at the node (compared to running a reconnaissance transaction at the client) is a lot faster too, and it reduces the chance of reconnaissance becoming stale. Some of these techniques may be applicable in simpler systems without SE. For example, if we have transactions with annotated read-write sets, these reorderings within the batch become possible. Of course, SE definitely helps find ROTs and separate ITs from DTs to improve/enable the reordering within the batch without the need for annotated transactions.

2) Slow ROTs. Deterministic transactions are susceptible to clogged pipelines when some big, long-running transaction gets in the way and delays subsequent transactions. Parallel/concurrent execution helps here by essentially having multiple processing pipelines. However, Prognosticator has one issue with read-only transactions (ROTs) that makes it more susceptible to the clogging problem. The paper mentions that ROTs get executed at the beginning of the batch from a stable snapshot. All workers must complete their ROTs before the system moves to DTs and ITs, to make sure that no DT or IT changes that stable snapshot while ROTs are still running. This means that there is a barrier at the end of the ROT phase, allowing a single long-running ROT to screw up performance by delaying all DTs and ITs in the batch. However, this may be just an artifact of an academic prototype: taking a separate snapshot and running all ROTs from that snapshot should allow ROTs to execute concurrently with writes.
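One standard way to get the separate snapshot suggested above is multi-versioning: writes create new versions, and a read-only transaction reads as of a fixed snapshot, so it never has to block writers. The Python sketch below is a generic illustration of that idea, not something the paper implements; `MultiVersionStore` and its methods are hypothetical.

```python
class MultiVersionStore:
    """Writes add versions; read-only transactions read as of a snapshot."""

    def __init__(self):
        self.versions = {}  # key -> [(commit_ts, value), ...] in commit order
        self.ts = 0

    def write(self, key, value):
        self.ts += 1
        self.versions.setdefault(key, []).append((self.ts, value))

    def snapshot(self):
        return self.ts

    def read_at(self, key, snap):
        # Newest version committed at or before the snapshot.
        for ts, value in reversed(self.versions.get(key, [])):
            if ts <= snap:
                return value
        return None


store = MultiVersionStore()
store.write("x", 1)
snap = store.snapshot()  # a ROT pins this snapshot...
store.write("x", 2)      # ...while a DT/IT keeps writing
print(store.read_at("x", snap), store.read_at("x", store.snapshot()))  # 1 2
```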

3) Cost of SE. The paper mentions the cost of doing symbolic execution. There are a few problems here. The first is processing time: more complicated transactions need a lot of time to explore essentially all of their branches. This also requires putting a limit on the number of allowed loop iterations. A transaction that actually exceeds the limit is not fully profiled and requires a reconnaissance. The limit, being a configurable parameter, may also require some expert knowledge of the workload to configure properly, which is something the paper strives to avoid. Another big cost is the memory footprint of transaction profiles. The authors mention that the TPC-C benchmark required 960 MB for transaction profiles, which is not a small cost for a simple benchmark with relatively few transaction types. In the real world, the memory cost of transaction profiles may be much higher.

4) Extension to sharded systems? Prognosticator works in a replicated system with all nodes storing identical data. It does not work in a sharded environment, yet most large-scale databases are sharded. It may be non-trivial to apply the same approach to sharded systems. After all, running a distributed transaction is harder and may require some coordination between the shards. At the same time, it may still be possible to separate transactions according to their types and conflict domains with the help of SE to increase parallelism and make transaction execution more independent. Again, real systems based on Calvin’s ideas have cross-shard transactions. A big problem with a sharded setup in Prognosticator involves DTs: the system expects to perform the reconnaissance read locally, which means that all data for the transaction must be available at each node. This is not possible in a sharded environment. And making reads non-local will bring the system much closer to Calvin, with a longer distributed reconnaissance phase and a negative performance impact. So, the non-sharded nature of Prognosticator gives it a huge performance advantage when comparing it with the more general Calvin.

Reading Group. chainifyDB: How to get rid of your Blockchain and use your DBMS instead

Our recent meeting focused on Blockchains, as we discussed “chainifyDB: How to get rid of your Blockchain and use your DBMS instead” CIDR’21 paper. The presentation by Karolis Petrauskas is available here:

The paper argues for using existing and proven technology to implement a permissioned blockchain-like system. The core idea is to leverage relational SQL-99 compliant DBMSes that are very common in various IT architectures. The participants in the blockchain can then leverage their existing software stack to implement secure data sharing or replication. Moreover, the authors argue that in practice different participants may run DBMSes of different versions or even from different vendors, making it more challenging to sync data between participants, as different versions/vendors may execute the same SQL transactions slightly differently. To address this heterogeneity, the paper proposes a “Whatever-Voting” model, where the system achieves consensus on the results of a transaction, and not on the transaction itself. This allows each node/organization to run the transaction in “whatever” way possible, as long as the final result of the transaction is agreed upon by a quorum of participants in the “voting” phase. If the voting phase does not pass, a node with a mismatched result must roll back the transaction (and potentially several more) and try to recover by reapplying it. To me this seems a bit naive: if a node cannot produce the proper result for a correct, non-malicious transaction due to vendor differences or “whatever” execution differences, it may get stuck in an infinite loop of rollbacks and reapplies until its “whatever” phase is fixed to produce the correct result for that specific transaction. This can be a very error-prone, engineering-intensive process of constant fine-tuning and micromanaging the components.

Step 1: Client creates a Proposal from SQL transaction. Step 2: Early abort of bad proposals. Step 3: Proposal is sent for ordering after all participants verify that it can be executed. Step 4: Orderer produces a block from multiple transactions/proposals. Step 5: Execution against local DBMSes. Step 6: Commit phase after the block is written to the DBMS. Step 7: Voting to check the results are identical in the quorum of nodes/organizations. Step 8: Ledger entry is generated if the agreement in Step 7 is successful.

Since the proposed solution does not reach an agreement on transactions, the paper argues that it does not need complicated protocols to ensure that all correct nodes have correct transactions. If some entity issues different transactions to different participants, the results won’t match, missing the quorum agreement and causing a system-wide rollback and retry. However, the voting scheme should still be resilient to some cheating — a node must provide some proof or digest of the authenticity of its results.
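To make the “agree on results, not on transactions” idea concrete, here is a minimal Python sketch of the voting step. It assumes each organization reports a SHA-256 digest of its result rows (sorted, so that row order does not matter) and that a plain count against a configurable quorum decides the outcome. The names `result_digest` and `vote` are hypothetical; this is not chainifyDB’s actual digest format or voting protocol, and it does nothing to stop a node from forging a digest.

```python
import hashlib
from collections import Counter


def result_digest(rows):
    """Hash a transaction's result rows in a canonical (sorted) order.

    Sorting means two DBMSes that return the same rows in a different
    order still produce the same digest."""
    h = hashlib.sha256()
    for row in sorted(repr(r) for r in rows):
        h.update(row.encode("utf-8"))
        h.update(b"\n")  # separator so row boundaries affect the hash
    return h.hexdigest()


def vote(digests, quorum):
    """Tally the result digests reported by each organization.

    digests: node name -> digest of that node's local execution result.
    Returns (agreed_digest, nodes_to_roll_back). If no digest reaches the
    quorum, agreed_digest is None and every node must roll back."""
    counts = Counter(digests.values())
    digest, votes = counts.most_common(1)[0]
    if votes < quorum:
        return None, sorted(digests)
    return digest, sorted(n for n, d in digests.items() if d != digest)


# Usage: org3's DBMS computed a slightly different balance for bob.
ok = result_digest([(1, "alice", 100), (2, "bob", 50)])
reordered = result_digest([(2, "bob", 50), (1, "alice", 100)])
diverged = result_digest([(1, "alice", 100), (2, "bob", 49)])
agreed, to_roll_back = vote({"org1": ok, "org2": reordered, "org3": diverged}, quorum=2)
print(agreed == ok, to_roll_back)  # True ['org3']
```

Note how the sketch exposes the concern above: nothing in the vote tells org3 *why* its result differs, so it can only roll back and try again.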

The authors argue that their system does not have a centralized authority. However, it still has a centralized control component that batches the transactions and orders them. This component can be untrusted, since the participants should catch mismatched batches and roll the state back as needed. That being said, a Byzantine centralized batcher/sequencer (the paper calls it the orderer) can cause liveness issues by sending garbage to participants, causing constant recoveries. The paper does not specify the process of electing/replacing the orderer, so I think in practice it must be trusted for liveness purposes.

There is a lot more work involved to make the scheme work. For example, each node must remember the transaction batches to create the ledger. This also happens within the DBMS, and some precautions must be taken to ensure that the ledger is tamper-proof. The roll-back procedure is also complicated and must be carefully implemented to have good performance. 

The authors conduct their evaluation at a limited scale, with just 3 nodes/DBMS instances, and show good scalability compared to Fabric and Fabric++. The evaluation focuses on increasing the number of clients interacting with the DBMSes, not the number of DBMS instances. The paper does not mention the possibility of Byzantine clients, and the evaluation does not consider any Byzantine cases.


1) Paper Quality. Our group’s collective opinion about the quality of the paper was not very enthusiastic. The paper’s premise is interesting and practical; however, it has way too many holes, missed promises, and unsubstantiated claims. These range from an inadequate evaluation, to relying on a centralized, unreplaceable “orderer” in a decentralized system, to an underspecified voting protocol, to glaring engineering holes in the “whatever” model that make it prone to liveness issues due to bad implementations. There are too many to list them all, so I will discuss just one in a bit of detail.

A big undelivered promise is using the DBMSes as a “black box.” While the DBMS software does not need any modifications, the applications running on these DBMSes must be adjusted to accommodate the WV model. Although these adjustments may be relatively small (e.g., making the client talk to the chainify server instead of the DBMS), it appears that the “whatever” stage may need tweaks on a per-transaction-type basis if the DBMSes are very different. This additional logic and tweaking is hardly a black-box approach from the application’s point of view. Other concerns here involve privacy. What if some tables at one organization are not supposed to be shared, but transactions/queries from this same entity touch both shared and private tables? Addressing these may require even bigger application changes and rewrites. And of course, this is not to mention all the additional tables that chainifying introduces to the database.

2) Preventing Byzantine Behavior vs. Detecting Byzantine Behavior. An interesting point the paper raises (and I wish it was explored more) is whether we need to prevent Byzantine actions from taking place or merely detect them. Unlike all (most?) blockchains, chainifyDB does not prevent bad behavior from executing; instead, it tries to detect misbehavior and fix it with a reset-and-restore procedure. Is this approach cheaper on a fundamental level? What applications can it support? Obviously, chainifyDB argues that it supports permissioned blockchains, but we’d love to see more rigor and research in that direction.

There are some potential issues with this approach, and the paper does not give many details on them. For example, a transaction may execute on all nodes and produce matching result digests for voting. However, if the transaction itself is malicious and, for instance, tries to create value or double-spend something, then the correct nodes must have a mechanism to abort such a transaction before its execution (or to detect it and reset afterward?). Result voting may not be sufficient here. Of course, it is possible to implement some pre-execution sanity-check logic to verify that the transaction complies with all the rules and constraints. It appears that in chainifyDB this stage happens early and requires every participant’s vote (see Step 2 in the figure in the summary), but the paper is not very clear on the safety implications of collecting these vote/abort decisions. Also, what about fault tolerance if we need all nodes for this? Additionally, this, in a sense, creates a pre-execution voting step that the paper claims to avoid.

3) Issues of Trust. This is a bigger discussion on different types of storage systems and the applications that need them. The most popular type of data system is one that places trust in a single participant/entity. This is not a problem for, say, an in-house database. Many cloud data-intensive services rely on this model, where clients trust the cloud provider and do not worry about the provider becoming malicious. This works even if the clients distrust each other, as long as they all trust the service provider. The latter is often sold as some form of permissioned blockchain, but fundamentally it is just a database with an append-only log where safety is ultimately ensured by the trusted provider. Truly permissioned blockchains have no single trusted entity; however, a relatively small set of participants can be trusted as a group. The permissioned nature prevents Sybil attacks, in which an attacker spoofs many fake malicious participants to take over the network, allowing small groups to be trustworthy despite containing malicious individuals. Finally, open blockchains are completely untrusted environments, where only the entire collective of all participants can be trusted. The figure here illustrates this.

One discussion question we had is when to use a permissioned blockchain and why not stick with open-access protocols instead. Of course, permissioned blockchains or BFT protocols are cheaper to run than their open-access counterparts and often have lower latency and higher throughput. But is it safe to trust a small group of participants? What applications are OK with that level of trust?

Reading Group Special Session: Distributed Transactions in YugabyteDB

When: May 11th at 12:00 pm EST

Who: Karthik Ranganathan.

Karthik Ranganathan is a founder and CTO of YugabyteDB, a globally distributed, strongly consistent database. Prior to Yugabyte, Karthik was at Facebook, where he built the Cassandra database. In this talk, Karthik will discuss Yugabyte’s use of time synchronization and Raft protocol along with some optimizations that enable high-performance distributed transactions.


ACID transactions are a fundamental building block when developing business-critical, user-facing applications. They simplify the complex task of ensuring data integrity while supporting highly concurrent operations. While they are taken for granted in monolithic SQL databases, most distributed DBs would forsake them completely.

Fortunately, this is no longer the case. The trend started with Google Spanner, which offered distributed transactions using GPS receivers and atomic clocks, something unheard of in the database world before. Now, distributed transactions that do not require atomic clocks are offered by distributed SQL databases. One such example of a fully open source database offering this is YugabyteDB. Using the example of YugabyteDB, this talk will explain how distributed ACID transactions can be achieved without atomic clocks and without compromising on performance.

Keep The Data Where You Use It

As trivial as it sounds, keeping the data close to where it is consumed can drastically improve the performance of large, globe-spanning cloud applications, such as social networks, e-commerce, and IoT. These applications rely on database systems to make sure that all the data can be accessed quickly. The de facto method of keeping the data close to the users is full replication. Many fully replicated systems, however, still have a single region responsible for orchestrating the writes, making the data available locally only for reads and not for updates.

Moreover, full replication becomes rather challenging when strong consistency is desired, since the cost of synchronizing all database replicas on a global scale is large. Many strongly consistent datastores resort to partial replication across a handful of nearby regions to keep the replication latency low. As a result, clients close to the regions where the data is replicated may experience much better access latency than clients reaching out from the other side of the globe.

Data and consumer are collocated.
No access penalty when data and consumer are in the same region

Data and consumer are in different regions
Access penalty (289 ms in this case) when data and consumer are in different regions

Despite the obvious benefits of adapting to locality changes, many databases offer only static partitioning. Of course, some data stores have the migration capability, but still often lack the mechanisms to determine where the data must be moved. Quite a few orthogonal solutions provide capabilities to collocate related data close together or use days or weeks’ worth of logs to compute better data placement offline. Meanwhile, Facebook’s aggressive data migration helps them reduce the access latency by 50% and save on both storage and networking.

We (@AlekseyCharapko, @AAilijiang and @MuratDemirbas) investigated the criteria for quick, live data migration in response to changes in access locality. We posit that effective data-migration policies should:

  • Minimize access latency,
  • Preserve load balancing with regard to data storage and processing capacity,
  • Preserve collocation of related data, and
  • Minimize the number of data migrations.


We developed four simple migration policies aimed at optimizing the data placement. Our policies operate at an arbitrary data granularity, be it individual key-value pairs, micro-shards, or partitions. For simplicity, I say that policies work on objects that represent data of an arbitrary granularity.

The main goal of the policies is minimizing access latency, with each policy using a different heuristic to make a data-placement decision. Once a policy finds the optimal location for an object, it checks the load-balancing constraints and adjusts the data-migration decision as required.

Our simplest policy, the n-consecutive accesses policy, migrates an object once it sees a threshold number of consecutive accesses to it from one region. Although simple, this policy works well for workloads with strong locality in a single region. The majority accesses policy keeps some request statistics and uses them to find the region with the most accesses to an object over some time interval. It then migrates the data over to that region.
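The two single-region heuristics can be sketched in a few lines of Python. This is a minimal illustration, not the simulator used for the experiments: the class names are hypothetical, each instance tracks one object, the region names are placeholders, and for the majority policy I assume the caller invokes `end_interval` on some timer.

```python
from collections import Counter


class NConsecutivePolicy:
    """Migrate an object after n consecutive requests from one region."""

    def __init__(self, n, owner):
        self.n = n
        self.owner = owner        # region currently storing the object
        self.last_region = None   # region that issued the previous request
        self.streak = 0           # length of the current run from last_region

    def on_request(self, region):
        """Record a request; return True if the object migrates."""
        if region == self.last_region:
            self.streak += 1
        else:
            self.last_region, self.streak = region, 1
        if self.streak >= self.n and region != self.owner:
            self.owner = region
            return True
        return False


class MajorityPolicy:
    """Count requests per region; at the end of each interval, move the
    object to the region with the most requests and reset the counts."""

    def __init__(self, owner):
        self.owner = owner
        self.counts = Counter()

    def on_request(self, region):
        self.counts[region] += 1

    def end_interval(self):
        """Return True if the object migrates at this interval boundary."""
        if not self.counts:
            return False
        best, _ = self.counts.most_common(1)[0]
        self.counts.clear()
        moved = best != self.owner
        self.owner = best
        return moved


# Usage with placeholder region names.
p = NConsecutivePolicy(n=3, owner="us-east")
moves = [p.on_request(r) for r in ["eu", "eu", "ap", "eu", "eu", "eu"]]
print(moves, p.owner)  # [False, False, False, False, False, True] eu

m = MajorityPolicy(owner="us-east")
for r in ["eu", "ap", "eu"]:
    m.on_request(r)
print(m.end_interval(), m.owner)  # True eu
```

The interrupted "eu" streak in the usage example is the same mechanism that makes the n-consecutive policy jitter on objects shared by neighboring regions: each region keeps resetting the other’s streak until one of them wins a run.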

The exponential moving average (EMA) policy takes a different approach and computes the average region for all requests to the object. The average region is computed as an exponential moving average favoring the most recent requests. This policy can potentially find a better placement for objects that have more than one high-access region. However, it requires the regions to have numerical IDs arranged in the order of the regions’ proximity to each other. This policy falters for deployments with complicated geography and may require multiple migrations to move data to the best location. Another disadvantage of EMA is that it takes longer to settle and requires many data migrations. Unlike other policies that can move the data directly to the desired region, EMA can only migrate objects to one of the neighboring regions, so moving an object from region (1) to region (3) includes a temporary migration to region (2).
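Below is a minimal Python sketch of the EMA idea, assuming regions are numbered 0..k-1 along a line so that adjacent IDs are geographically close. The smoothing factor `alpha` and the rule “move at most one hop per request toward the rounded average” are my assumptions for illustration, not a definitive description of the simulated policy.

```python
import math


class EMAPolicy:
    """EMA placement over regions numbered 0..k-1 along a line, where
    adjacent IDs are assumed to be geographically close."""

    def __init__(self, owner, alpha=0.1):
        self.owner = owner          # numeric ID of the region storing the object
        self.alpha = alpha          # weight of the newest request (assumed value)
        self.avg = float(owner)     # exponential moving average of requester IDs

    def on_request(self, region):
        """Update the average and move at most one hop toward it.
        Returns True if the object migrated."""
        self.avg = self.alpha * region + (1 - self.alpha) * self.avg
        target = math.floor(self.avg + 0.5)  # round half up
        if target == self.owner:
            return False
        self.owner += 1 if target > self.owner else -1
        return True


# Requests from region 3 pull an object out of region 1 via region 2.
p = EMAPolicy(owner=1, alpha=0.5)
path = [p.owner]
for _ in range(2):
    if p.on_request(3):
        path.append(p.owner)
print(path)  # [1, 2, 3]
```

Because the state is a single float per object, the policy is very cheap to maintain; the catch is that the whole notion of an “average region” only makes sense if the numeric IDs mirror the real topology.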

Exponential moving average topology; regions have left and right neighbors

Finally, the center-of-gravity (CoG) policy calculates the optimal object placement by taking into account the distribution of all requests to an object and the distances between the datacenters. The CoG policy finds the region closest to the center of the access distribution, whatever the access-locality pattern of the workload. CoG can collect request statistics similar to the majority accesses policy and make a placement decision only after some time has elapsed since the last decision. Alternatively, it can use a continuous metric that assigns each region a score corresponding to its weight in the workload, adjusting the scores and recomputing the best object placement with every request.

Computing CoG Weights (L-scores). Region with lowest score is most central to the current workload’s access distribution. Japan is the object’s owner here, despite Australia having more requests overall. L_jp = 0.4 * 128 + 0.15 * 165 + 0.13 * 318 + 0.08 * 165 = 130.49
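The L-score in the caption is a request-weighted sum of inter-region distances, and the owner is the region with the lowest score. The Python sketch below reproduces Japan’s score from the caption. The caption only names Australia, so `region_b`, `region_c`, and `region_d` are placeholder names for the other requesting regions. The second example uses a made-up three-region topology to show that CoG may pick a region that issues no requests at all.

```python
def l_score(region, weights, dist):
    """Request-weighted sum of distances (ms) from `region` to every
    requesting region; lower means more central to the workload."""
    return sum(w * dist[region][src] for src, w in weights.items())


def cog_owner(weights, dist):
    """Pick the candidate region with the lowest L-score."""
    return min(dist, key=lambda r: l_score(r, weights, dist))


# Japan's score from the figure. Japan's own requests sit at distance 0
# and add nothing, so only the four remote terms appear.
weights = {"australia": 0.40, "region_b": 0.15, "region_c": 0.13, "region_d": 0.08}
japan_row = {"australia": 128, "region_b": 165, "region_c": 318, "region_d": 165}
print(round(l_score("japan", weights, {"japan": japan_row}), 2))  # 130.49

# Hypothetical topology: A and B split requests evenly, C sits between them.
toy = {
    "A": {"A": 0, "B": 200, "C": 80},
    "B": {"A": 200, "B": 0, "C": 80},
    "C": {"A": 80, "B": 80, "C": 0},
}
print(cog_owner({"A": 0.5, "B": 0.5}, toy))  # C
```

In the toy case, A and B both score 100 while C scores 80, which is the kind of placement a single-region heuristic cannot find.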

Some Evaluation

I’ve simulated the policies under different access locality scenarios and calculated the inter-region access latency and the number of object movements each policy makes. In the simulations, I used 3000 distinct objects, each initially assigned to a random region in a cluster of 15 regions. I used the AWS inter-region latencies to specify the distances between simulated regions. To my surprise, even the most basic policies showed good improvement over static random placement of data.

In the first experiment, the objects were accessed according to a normal distribution. Each object has an ID assigned to it, and each region draws the IDs it accesses from its own normal distribution. All regions have distributions with the same variance but different means, so each region predominantly accesses some of the objects, while some groups of objects are more or less shared across regions with adjacent IDs.
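A request trace with this shape can be generated in a few lines of Python. This is a sketch under stated assumptions, not the original simulator: I treat the 100 in N(200z, 100) as a standard deviation, wrap IDs that fall outside [0, 3000) around modulo the object count, and assume every region issues requests at the same rate.

```python
import random

NUM_OBJECTS = 3000
NUM_REGIONS = 15


def draw_object(region, rng, spacing=200, sigma=100):
    """Object ID requested by `region`, drawn from N(spacing*region, sigma).

    Assumptions: sigma is a standard deviation, and IDs falling outside
    [0, NUM_OBJECTS) wrap around."""
    return round(rng.gauss(spacing * region, sigma)) % NUM_OBJECTS


def generate(num_requests, seed=0):
    """Return a list of (region, object_id) requests; every region is
    assumed to issue requests at the same rate."""
    rng = random.Random(seed)
    trace = []
    for _ in range(num_requests):
        region = rng.randrange(NUM_REGIONS)
        trace.append((region, draw_object(region, rng)))
    return trace


trace = generate(10_000)
from_7 = [obj for region, obj in trace if region == 7]
near = [obj for obj in from_7 if abs(obj - 1400) <= 300]
# Fraction of region 7's requests within three sigma of its mean (ID 1400).
print(len(near) / len(from_7))
```

With 200 IDs between neighboring means and a spread of 100, the tails of adjacent regions overlap, which produces the shared objects described above. A gradually shifting workload could be emulated by slowly adding an offset to the mean, though that too is an assumption about the setup.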

Locality Workload. 3000 Objects, 15 regions. Probability of object access is controlled by N(200z,100), where z is region ID in range [0, 15)
In this experiment, both the CoG and majority accesses policies showed the best results in terms of latency and the number of object movements. This is because the workload almost always favors a single region and only rarely shares an object between two regions, so the majority heuristic, which considers only one region, works well. Similarly, the 3-consecutive accesses policy shows good latency, but it generates a lot of jitter by constantly moving shared objects between neighboring regions.

When the workload is no longer dominated by a single region for every key, single-region heuristic policies perform worse. For instance, equally sharing an object among at most 3 regions out of 15 causes the majority and 3-consecutive accesses policies to lock in on one of the sharing regions instead of optimizing the latency for all sharing regions. The CoG policy can place the data in a region optimal for all 3 regions (not necessarily one of the sharing regions) and optimize the latency better than the single-region, topology-unaware heuristics. The EMA policy is at a big disadvantage here, since it relies on ID assignments to capture the proximity of regions. However, the complex geography of AWS datacenters makes a good ID assignment nearly impossible, causing EMA to sometimes overshoot the best region and settle in a less optimal one.

Access is shared equally with up to 3 random regions.

Access locality may fluctuate on a regular basis, and a policy needs to adapt to such changes and adjust the system to keep the latencies low. In this experiment, I gradually shift the normal distributions used in the earlier experiment to create a gradual workload switch. In the figure below, the system ran long enough for all objects to switch their access locality to the neighboring region. The policies adapt well to the change, keeping latency low despite the moving workload. EMA is again a notable exception: it first achieves better latency and then gradually degrades until reaching a steady state (in a separate experiment, I observed EMA stabilizing at around 59 ms of latency).

Changing access locality

Previous experiments did not consider the effect of load balancing. However, a good data-migration policy should refrain from migrating data to overloaded regions. In the next experiment, I applied a load-balancing filter to the CoG policy: the migration procedure first computes the best region for the object, checks whether that region has capacity, and, if no capacity is available, moves the data to the next best region with enough processing/storage capacity. Here I used 5 regions and 1000 objects, and limited each region to storing at most 25% of all data. I ran a heavily skewed workload with 80% of all requests coming from a single region. Without load balancing, the CoG policy achieves very low average latency under these conditions; however, as evidenced by the disbalance graph, all objects migrate over to a single region. If load balancing is enabled, no region becomes overloaded, but the latency improvement becomes more modest.
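The filter described above is a second stage layered on top of any latency-optimizing policy. Here is a minimal Python sketch of it, along with the disbalance metric from the figure. The function names, the rule that an object may always stay where it is, and the example loads and scores are all hypothetical.

```python
def place_with_capacity(scores, owned, capacity, current):
    """Choose where an object should live under a per-region capacity cap.

    scores:   region -> L-score for this object (lower is better)
    owned:    region -> number of objects the region stores right now
    capacity: maximum objects per region (e.g. 25% of all objects)
    current:  region holding the object now; staying there never changes
              any region's load, so it is always allowed."""
    for region in sorted(scores, key=scores.get):
        if region == current or owned[region] < capacity:
            return region
    return current


def disbalance(owned):
    """Objects owned by the most loaded region minus the least loaded one."""
    return max(owned.values()) - min(owned.values())


# 5 regions, 1000 objects, at most 25% (250) per region; the object is in r4.
owned = {"r0": 250, "r1": 240, "r2": 200, "r3": 160, "r4": 150}
scores = {"r0": 40.0, "r1": 55.0, "r2": 90.0, "r3": 120.0, "r4": 150.0}
print(place_with_capacity(scores, owned, capacity=250, current="r4"))  # r1
print(disbalance(owned))  # 100
```

Since each object is placed on its own, whichever objects happen to be evaluated first claim the spare capacity in the best regions, which is exactly the first-come-first-serve behavior of this two-stage design.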

Balancing enabled. Latency on the left, disbalance measured as the difference in object ownership between most and least loaded region



Concluding Remarks

Having data close to the consumers can dramatically improve the access latency. For some databases, this means doing full replication; for others, this may involve moving data or the owner/write role from one region to another. It is important to make sure the data is moved to the right location. I have looked at four simple rules or policies for determining data migration and ran some simulations on these.

There are a few lessons I have learned so far from this:

  • Topology-aware rules/policies work better for a larger variety of situations
  • Simple rules, such as just looking at the number of consecutive requests coming from a region or determining the majority accesses region, can also work surprisingly well, but not always. These tend to break when access locality is not concentrated in a single region but shared across a few regions in the cluster
  • EMA looked interesting on paper. It needs just a single number, updated with every request, to determine the optimal data placement, but it performed rather badly in most experiments. The main reason for this is the complicated geography of datacenters.
  • Optimizing for latency and adjusting for load balancing constraints to prevent region overload can be done in two separate steps. My simple two-stage policy (presently) looks at load balancing for each object separately. This becomes a first-come-first-serve system, but I am not sure yet whether this can become a problem.
  • EMA policy takes multiple hops to move data to better region, while n-consecutive accesses policy has constant jitter for objects shared by some regions

I have not studied much about data-collocation in my experiments, nor designed the policies to take this into consideration. One of the reasons is that I think related objects will have similar access locality, causing them to migrate to same datacenters. However, this is just a guess, and I need to investigate this further.

One Page Summary: “PaxosStore: High-availability Storage Made Practical in WeChat”

PaxosStore paper, published in VLDB 2017, describes the large scale, multi-datacenter storage system used in WeChat. As the name may suggest, it uses Paxos to provide storage consistency. The system claims to provide storage for many components of the WeChat application, with 1.5TB of traffic per day and tens of thousands of queries per second during the peak hours.

PaxosStore relies on the Paxos protocol for consistency and replication within tight geographical regions. The system was designed with a clean separation of concerns in mind. At a high level, it has three distinct layers interacting with each other: the API layer, the consensus layer, and storage. Separating these out allowed PaxosStore to provide the most suitable APIs and storage for different tasks and applications, while still having the same Paxos-backed consistency and replication.

In the Paxos-driven consensus layer, the system uses a per-object log to keep track of values and Paxos-related metadata, such as promise (epoch) and proposal (slot) numbers. The log’s implementation, however, seems to be somewhat decoupled from the core Paxos protocol. The Paxos implementation is leaderless, meaning there is no single dedicated leader for each object, and every node can perform writes on any of the objects in the cluster by running the prepare and accept phases of Paxos. Naturally, the system tries to perform (most) writes in one round trip instead of two by assuming some write locality. After the first successful write, a node can issue more writes with increasing proposal (slot) numbers. If some other node performs a write, it needs a higher ballot, which prevents the old master from doing quick writes. This is a rather common approach, used in many Paxos variants.
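The one-round-trip write path can be illustrated with a generic Multi-Paxos-style sketch over a single object’s log. This is not PaxosStore’s code: it is an in-memory, single-threaded Python model with hypothetical class names, ballots as (round, node) tuples, and one simplification called out in the comments (a proposer skips past slots it sees instead of re-proposing partially accepted values).

```python
class Acceptor:
    """One replica's Paxos state for a single object's log."""

    def __init__(self):
        self.promised = (0, "")   # highest ballot promised; ballots are (round, node)
        self.log = {}             # slot -> (ballot, value)

    def prepare(self, ballot):
        # One prepare covers every slot, so a proposer that wins it can
        # skip phase 1 for its following writes.
        if ballot > self.promised:
            self.promised = ballot
            return True, dict(self.log)
        return False, self.promised

    def accept(self, ballot, slot, value):
        if ballot >= self.promised:
            self.promised = ballot
            self.log[slot] = (ballot, value)
            return True
        return False


class Proposer:
    """Any node can write; it keeps its ballot until someone preempts it."""

    def __init__(self, node_id, acceptors):
        self.node_id = node_id
        self.acceptors = acceptors
        self.majority = len(acceptors) // 2 + 1
        self.max_round = 0      # highest ballot round observed so far
        self.ballot = None      # ballot holding a majority of promises, if any
        self.next_slot = 0

    def _prepare(self):
        ballot = (self.max_round + 1, self.node_id)
        self.max_round = ballot[0]
        granted, highest_slot = 0, -1
        for acc in self.acceptors:
            ok, info = acc.prepare(ballot)
            if ok:
                granted += 1
                highest_slot = max([highest_slot, *info])      # info: slot -> entry
            else:
                self.max_round = max(self.max_round, info[0])  # info: ballot
        if granted < self.majority:
            return False
        self.ballot = ballot
        # Simplification: skip past every slot a promiser reports instead of
        # re-proposing partially accepted values. A slot chosen by a majority
        # always shows up in at least one promise, so it is never reused.
        self.next_slot = max(self.next_slot, highest_slot + 1)
        return True

    def _accept(self, value):
        acks = sum(acc.accept(self.ballot, self.next_slot, value) for acc in self.acceptors)
        if acks >= self.majority:
            self.next_slot += 1
            return True
        self.ballot = None      # preempted by a higher ballot
        return False

    def write(self, value):
        """Append `value` to the log; return the round trips it took."""
        trips = 0
        while True:
            if self.ballot is None:
                trips += 1
                if not self._prepare():
                    continue
            trips += 1
            if self._accept(value):
                return trips


accs = [Acceptor() for _ in range(3)]
a, b = Proposer("A", accs), Proposer("B", accs)
print([a.write("x1"), a.write("x2"), b.write("y1"), a.write("x3")])  # [2, 1, 2, 3]
```

A’s second write takes one round trip. Once B writes with a higher ballot, A’s next fast-path attempt is rejected and A has to run prepare again, mirroring the “old master” behavior described above.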

The lack of a single stable leader complicates the reads, since there is no authority that has the most up-to-date state of each object. One solution for reading is to use the Paxos protocol directly; however, this disrupts the locality of write operations by hijacking the ballot to perform a read. Instead, PaxosStore resorts to reading directly from replicas by finding the most recent version of the data and making sure a majority of replicas have that version. This works well for read-heavy workloads, but in some high-contention (or failure?) cases the most recent version may not be on a majority of replicas, and the system falls back to running Paxos for reading.
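The read rule can be captured in a short Python sketch. It assumes each replica reports a (version, value) pair for the object, and `paxos_read` is a hypothetical caller-supplied stand-in for the slow path. This is an illustration of the idea, not PaxosStore’s API.

```python
def read_latest(replies, num_replicas, paxos_read):
    """Read one object without running Paxos when possible.

    replies: (version, value) pairs from the replicas that answered.
    If the newest version is held by a majority of all replicas, return
    it; otherwise fall back to paxos_read(), a stand-in for a full
    Paxos read."""
    newest_version, newest_value = max(replies, key=lambda r: r[0])
    holders = sum(1 for version, _ in replies if version == newest_version)
    if holders > num_replicas // 2:
        return newest_value
    return paxos_read()


def fallback():
    return "value-from-paxos"  # hypothetical slow path


print(read_latest([(7, "v7"), (7, "v7"), (6, "v6")], 3, fallback))  # v7
print(read_latest([(7, "v7"), (6, "v6"), (5, "v5")], 3, fallback))  # value-from-paxos
```

The second call models the high-contention case above: version 7 exists, but only on one of three replicas, so the fast path cannot vouch for it.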

PaxosStore runs in multiple datacenters, but it is not a full-fledged geo-replicated system, as it only replicates between datacenters located in the same geographical area. The paper is not clear on how data gets assigned to regions and whether objects can migrate between regions in any way. Within each datacenter, the system organizes nodes into mini-clusters, with each mini-cluster acting as a Paxos follower. When data is replicated between mini-clusters, only one (some?) of the nodes in each mini-cluster hold the data. This design aims to improve fault tolerance: with a 2-node mini-cluster, the failure of 1 node does not result in the failure of the entire mini-cluster Paxos follower.

The paper’s evaluation is somewhat lacking, but PaxosStore seems to handle its goal of multi-datacenter, same-region replication fairly well, achieving sub-10 ms writes.

This paper seems like a good solution for a reliable and somewhat localized datastore. The authors do not address data sharding and migration between regions and focus only on intra-region replication to multiple datacenters, which makes me think PaxosStore is not really a “global”, geo-replicated database. The fault tolerance is backed by Paxos, mini-clusters, and the use of PaxosLog for data recovery. The evaluation could have been more complete if the authors had shown the scalability limits of their system and provided some details about throughput and the datacenter locality of the workload in the latency experiments.

Here is a one-page PDF of this summary.