Tag Archives: transactions

Reading Group. FoundationDB: A Distributed Unbundled Transactional Key Value Store

Last week we discussed the “FoundationDB: A Distributed Unbundled Transactional Key Value Store” SIGMOD’21 paper. We had a rather detailed presentation by Moustafa Maher.

FoundationDB is a transactional distributed key-value store meant to serve as the “foundation” or lower layer for more comprehensive solutions. FoundationDB supports point and ranged access to keys. This is a common and decently flexible API to allow building more sophisticated data interfaces on top of it. 

FoundationDB is distributed and sharded, so the bigger part of the system is transaction management. The system has a clear separation between a Paxos-based control plane and the data plane. The control plane is essentially a configuration box to manage the data plane. On the data plane, we have a transaction system, log system, and storage system. The storage system is the simplest component, representing sharded storage. Each node is backed by a persistent storage engine and an in-memory buffer to keep 5 seconds of past data for MVCC purposes. The storage layer is supported by sharded log servers that maintain the sequence of updates storage servers must apply.

FoundationDB Architecture

The interesting part is the transaction system (TS) and how clients interact with all the components on the data plane. The client may run transactions that read and/or update the state of the system. It does so with some help from the transaction system, which also orchestrates the transaction commit. When a client reads some data in a transaction, it will go to the transaction system and request a read timestamp or version. On the TS side, one of the proxies will pick up the client’s request, contact the sequencer to obtain the version and return it to the client. This version timestamp is the latest committed version known to the sequencer to guarantee recency. Thanks to MVCC, the client can then reach out directly to the storage servers and retrieve the data at the corresponding version. Of course, the client may need to consult the system to learn which nodes are responsible for storing particular keys/shards, but the sharding info does not change often and can be cached.
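To make the read path a bit more concrete, here is a minimal sketch of a multi-version read at a given version. This is not FoundationDB's storage engine; the `VersionedStore` class and its methods are hypothetical names I made up for illustration, and the sketch ignores sharding and the 5-second window.

```python
import bisect


class VersionedStore:
    """Toy multi-version store (hypothetical, for illustration only)."""

    def __init__(self):
        self._versions = {}  # key -> sorted list of versions written
        self._values = {}    # (key, version) -> value

    def write(self, key, version, value):
        # Keep the per-key version list sorted so reads can binary-search it.
        bisect.insort(self._versions.setdefault(key, []), version)
        self._values[(key, version)] = value

    def read(self, key, read_version):
        # Return the newest value whose version is <= read_version.
        versions = self._versions.get(key, [])
        idx = bisect.bisect_right(versions, read_version)
        if idx == 0:
            return None  # nothing was written at or before read_version
        return self._values[(key, versions[idx - 1])]


store = VersionedStore()
store.write("k", 5, "a")
store.write("k", 9, "b")
old_value = store.read("k", 7)  # sees the value written at version 5
new_value = store.read("k", 9)  # sees the value written at version 9
```

A client holding read version 7 keeps seeing the older value even after version 9 is written, which is what lets reads go straight to the storage servers.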

Writes/updates and the transaction commit procedure are driven by the TS. The client submits the write operations and the read-set to the proxy, and the proxy will attempt to commit and either return an ack or an abort message. To commit, the proxy again uses the sequencer to obtain a commit version higher than any of the previous read and commit versions. The proxy will then send the read and write set along with the versions to the conflict resolver component. The resolver detects the conflicts; if no conflict is detected, the transaction can proceed, otherwise it is aborted. Successful transactions proceed to persist to the log servers and will commit once all responsible log servers commit. At this point, the sequencer is updated with the latest committed version so it can continue issuing correct timestamps. Each transaction must complete well within the 5 seconds of the MVCC in-memory window. Needless to say, read-only transactions do not go through the write portion of the transaction path since they do not update any data, making reads lightweight.
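The conflict check in the commit path can be sketched roughly as follows. This is my simplified take on optimistic conflict detection, assuming the resolver only needs to remember the last commit version per key; the `Resolver` class and `try_commit` are hypothetical names, not FoundationDB's API.

```python
from dataclasses import dataclass, field


@dataclass
class Resolver:
    """Toy resolver: remembers the commit version of the last write to each key."""
    last_write: dict = field(default_factory=dict)  # key -> commit version

    def try_commit(self, read_version, commit_version, read_set, write_set):
        # Abort if any key we read was overwritten after our read version.
        for key in read_set:
            if self.last_write.get(key, 0) > read_version:
                return False
        # No conflict: record our writes at the commit version.
        for key in write_set:
            self.last_write[key] = commit_version
        return True


resolver = Resolver()
# T1 and T2 both read "x" at version 10; T1 commits first and overwrites "x".
t1_ok = resolver.try_commit(read_version=10, commit_version=11,
                            read_set={"x"}, write_set={"x"})
t2_ok = resolver.try_commit(read_version=10, commit_version=12,
                            read_set={"x"}, write_set={"y"})
print(t1_ok, t2_ok)  # True False
```

T2 aborts because its read of "x" is stale by the time it tries to commit, while a read-only transaction never reaches this check at all.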

Failure handling and recovery are an important point in any distributed system. FoundationDB takes a fail-fast approach that may at times sound a bit drastic. The main premise of failure handling on the transaction system is to rebuild the entire transaction system quickly instead of trying to mask failure or recover individual components. The committed but not executed transactions can be recovered from the log servers and persisted to storage, while in-progress transactions that have not made it to the log servers are effectively timed out and aborted. Transactions that partly made it to the log servers are also aborted, and new log servers are built from a safe point to not include the partial transactions. Here I have just scratched the surface of the recovery, and the paper (and our group’s presentation) is way more accurate and detailed.

Another important point in the paper is the testing and development of FoundationDB. The paper talks about simulator testing. In a sense, the simulator is an isolated environment for development and testing the full stack on just one machine. It comes with a handful of mock components, such as networking and a clock. All sources of non-determinism must be mocked and made deterministic for reproducibility. The paper claims that the simulator is very useful for catching all kinds of bugs with a few exceptions, such as performance bugs. 

FoundationDB Simulator.


1) Flexibility of FoundationDB. Our previous paper was on RocksDB, a single-server key-value store. It is meant as the building block for more complex systems and applications. This is very similar in spirit to FoundationDB, which is meant as a “foundation” for many more complex systems. However, FoundationDB is way more complex, as it implements the data distribution/replication and transactions. This can potentially limit the use cases for FoundationDB, but obviously, this is done by design. With replication and transactions taken care of, it may be easier to build higher-up levels of the software stack.

2) Use cases. So what are the use cases of FoundationDB then? It is used extensively at Apple. Snowflake drives its metadata management through FoundationDB. In general, it seems like use cases are shaped by the design and limitations. For example, a 5-second MVCC buffer precludes very long-running transactions. The limit on key and value size constrains the system from storing large blobs of data. Arguably, these are rather rare use cases for a database. One limitation is of particular interest to me, and this is geo-replication.

Geo-replication in FoundationDB. Only one region has a TS with a sequencer.

3) Geo-replication. The paper touches on geo-replication a bit, but it seems like FoundationDB uses geo-replication mainly for disaster tolerance. The culprit here is the sequencer. It is a single machine, and this means that geo-transactions have to cross the WAN boundary at least a few times to get the timestamps for transactions. This increases the latency. In addition to simply slower transactions, numerous WAN RTTs to the sequencer can push the transaction time closer to the 5-second limit. So it is reasonable to assume that the system is not designed for planetary-scale deployment.

4) Simulator. We discussed the simulator quite extensively since it is a cool tool to have. One point raised was how a simulator is different from just setting up some local testing/development environment. The big plus for a simulator is its ability to control determinism and control fault injections in various components. There are systems like Jepsen to do fault injection and test certain aspects of operation, but these tend to have more specific use cases. Another simulator question was regarding the development of the simulator. It appears that the simulator was developed first, and the database was essentially built using the simulator environment.

We were also curious about the possibility of a simulator to capture error traces or do checking similar to systems like Stateright. It appears, however, that this is outside of the simulator capabilities, and it cannot capture specific execution traces or replay them. It is capable of controlling non-deterministic choices made in mock components, making a failure easier to reproduce. One somewhat related point mentioned was eidetic systems that remember all non-deterministic choices made in the OS along with all inputs to be able to replay past execution, but this seems like overkill to try to build into a simulator.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly manage Zoom invites to the paper discussions. Please join the Slack group to get involved!

Reading Group. Unifying Timestamp with Transaction Ordering for MVCC with Decentralized Scalar Timestamp

Unlike many of my recent summaries, I will make this one short, I promise. The “Unifying Timestamp with Transaction Ordering for MVCC with Decentralized Scalar Timestamp” NSDI’21 paper proposes a mechanism to order transactions in multi-version distributed data-stores. One of the problems with distributed transactions is the ordering required to achieve consistency. In particular, we often want to have some serial order of transactions to have an illusion that they could have executed one by one. This is hard to do in a scalable manner. One approach is to try to rely on the physical clocks of the machines, but this is unreliable due to clock skew and clock synchronization issues. Clock skew can introduce causality violations. For example, if transaction TXa happened-before TXb, but due to clock skew, transaction TXa got a larger timestamp than TXb, then we have a causality problem: the cause and effect are reversed if we follow the timestamp order. One way to avoid such causality issues is to rely on a centralized oracle to prescribe the transaction order. Quite a few systems do that, but for obvious reasons, a centralized approach may create scalability and reliability problems. There are a few other ordering mechanisms, such as vector clocks/version vectors and hybrid time.

The authors of the paper take the hybrid time approach that they call Decentralized Scalar Timestamp (DST). DST is a single number that represents the progression of history. It is decentralized, thus avoiding the problems with a single timestamp oracle, and it is smart enough to avoid causality problems. The authors marry the timestamp generation/progression with the concurrency control (CC) mechanism, such as 2PL, allowing CC to adjust the timestamp to match the execution order. Consider two write-conflicting transactions TXa and TXb. Both transactions have some initial timestamps ta and tb respectively. These initial timestamps are based on the timestamps of the last transaction known to the coordinator or client (and ultimately based on the loosely synchronized physical time). The authors then propose to use CC to bump up the timestamps as needed to ensure that the timestamps follow the execution order managed by the concurrency control mechanism. So for example, if initially ta < tb, but TXb happened-before TXa, then we bump ta := tb + 1. The actual “bumping-up” is a bit more complicated since the time is stored as two components, a physical time and a logical one, but the result is that the new timestamps are ordered the same way as the transaction execution.
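The bump rule from the example can be written down in a couple of lines. This is a deliberately simplified sketch that treats the timestamp as a plain integer and ignores the physical/logical split mentioned above; `order_after` is a name I made up, not something from the paper.

```python
def order_after(ts, predecessor_ts):
    """Return a timestamp for a transaction that must be ordered after predecessor_ts.

    Simplification: timestamps are plain integers here, whereas the paper
    stores a physical and a logical component.
    """
    return max(ts, predecessor_ts + 1)


# Initial timestamps: ta < tb, but CC decided that TXb executes before TXa.
ta, tb = 100, 105
ta = order_after(ta, tb)          # bumped to tb + 1
unchanged = order_after(200, tb)  # already ordered after tb, so no bump needed
```

Using `max` means the bump only happens when the initial timestamps disagree with the execution order chosen by the concurrency control.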

Since the system operates against a multi-version store, the versions from both transactions are preserved. This is important for performing reads that are based on consistent snapshots, so the latest state may advance forward, and a multi-version store ensures that the snapshot remains available for reads. The read-only transactions (ROTs) bypass some concurrency control and try to avoid locking in the paper. Read-only transactions, however, still need to ensure isolation, as it would be unacceptable to see a result of a partial write. To that end, ROTs have to actively write their read version on all objects and wait for locked objects to get unlocked before reading. This enables the consistent cut read and allows new writes to proceed without blocking but with a higher version than the read.

As far as evaluation, the paper presents multiple different environments and a few different benchmarks along with a pretty good breakdown of comparisons between different approaches. 

We had our own presentation by David Correa; the recording is available on the reading group’s YouTube:


1) HLC. The biggest discussion topic for this paper was a relation to the Hybrid Logical Clock (HLC). See, HLC is a well-known approach combining physical time and logical time. It keeps the affinity to physical time and maintains the causality just as Lamport’s logical clocks. The authors discuss that their DST approach is a combination of physical time and a logical one, just like HLC. However, the HLC work is never cited. It seems like the internal operation of the timestamp/clock is very similar. Moreover, there are well-described transaction protocols relying on HLC and MVCC described in the literature in great detail. It would be very interesting to see more about the DST clock operation and see its comparison with HLC. Similarly, it would be interesting to compare with the transaction schemes of CockroachDB or YugabyteDB.

One bigger difference from a typical HLC implementation is that DST increments/ticks only at significant events, such as transaction execution. HLC often ticks at message passing in addition to significant events. This is a more general way to ensure clocks causally update on each communication; however, this is not a requirement for protocols with simple communication exchange patterns. In fact, MongoDB updates HLC only at significant events.

2) Performance of Read-only Transactions. ROTs require a write of a timestamp on each object touched by the transaction. This is important for safety to make sure all new writes are ordered after the read-only transaction, so this version update of an object must be durable. We think that this may have a negative impact on performance, as each read includes a disk write in its path.

3) Evaluation. The group had some questions about the evaluation. In particular, the TPC-C benchmark is scaled with respect to districts. A typical TPC-C benchmark varies the number of warehouses, not districts. That being said, districts and warehouses are linked: “Each warehouse in the TPC-C model must supply ten sales districts.” This, however, raises additional questions, as 20 districts translate to just 2 warehouses, whereas many transactional papers go into tens or even hundreds of warehouses.

Reading Group. Exploiting Symbolic Execution to Accelerate Deterministic Databases

We have covered 60 papers in our reading group so far! The 60th paper we explored was “Exploiting Symbolic Execution to Accelerate Deterministic Databases” from ICDCS’20. I enjoyed the paper quite a lot, even though there are some claims I do not necessarily agree with.

The paper solves the problem of executing transactions in deterministic databases. We can imagine a replicated state machine, backed by Paxos or Raft. Trivially, in such a machine, each replica node needs to execute the transactions following the exact order prescribed by the leader to guarantee that all replicas progress through the same states of the machine. The good thing here is that nodes run each transaction independently of each other after the execution order has been established by the replication leader. The bad thing is that this naive approach is sequential, so each node cannot take advantage of multiple processing cores it may have.

Naturally, we want to parallelize the transaction execution. This, however, is easier said than done. To allow for more parallelism, we want to identify the situations when it is ok to run some transactions concurrently without impacting the final state of the state machine. For example, if we know what objects or keys the transaction reads and writes (i.e. the transaction’s read-and-write set), we can group independent transactions that operate on disjoint read-write sets together for parallel execution. For instance, a transaction accessing keys “x, y, and z” is independent of a transaction accessing keys “a, b, and c,” and the two can execute at the same time without impacting each other. 
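As a small illustration of the independence check, here is a sketch that decides whether two transactions can run side by side given their read and write sets. It is a bit more permissive than requiring fully disjoint sets, since two transactions that only read the same key do not conflict. The `conflicts` function and the tuple layout are my own assumptions.

```python
def conflicts(t1, t2):
    """Each transaction is a (read_set, write_set) pair of Python sets.

    Two transactions conflict if one writes a key the other reads or writes.
    """
    r1, w1 = t1
    r2, w2 = t2
    return bool(w1 & (r2 | w2)) or bool(w2 & r1)


t_xyz = ({"x", "y"}, {"z"})   # reads x and y, writes z
t_abc = ({"a"}, {"b", "c"})   # reads a, writes b and c
t_mix = ({"z"}, {"a"})        # reads z, writes a

independent = not conflicts(t_xyz, t_abc)  # disjoint keys, safe to run in parallel
clash_1 = conflicts(t_xyz, t_mix)          # t_mix reads z, which t_xyz writes
clash_2 = conflicts(t_abc, t_mix)          # t_mix writes a, which t_abc reads
```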

Of course, this requires us to know what objects/keys each transaction needs before running them, and this is a bit of a problem. In some situations, it may be easy to figure out the read and write sets of a transaction, but this is not always the case. Many systems, like Calvin, do a “pretend run” of a transaction (this is sometimes referred to as a reconnaissance transaction) to figure out the read-and-write set if the set isn’t obvious or annotated in the transaction. This has a few obvious downsides. Obviously, the pretend/reconnaissance phase uses the system’s resources. The reconnaissance run also increases the transaction’s latency. And finally, the reconnaissance is not perfect, and by the time of the “real” run, the read and write sets may have changed due to other transactions impacting the state of the system.  

So, the above description is somewhat generic behavior for many systems out there. And this is where Prognosticator, a system discussed in the paper, comes in. Prognosticator uses Symbolic Execution (SE) to profile transactions and help predict each transaction’s read-and-write set. The system does not need experts to annotate transactions’ read-and-write sets, but it can still avoid the reconnaissance runs in many situations. Sometimes, however, the reconnaissance must still happen, but Prognosticator uses a few tricks to reduce the possibility of the reconnaissance becoming stale.

Let’s look at the issues with figuring out the read-write set of a transaction. Many transactions are not primitive read and write commands, and involve quite a bit of logic with loops and conditional statements. This means that a state of a client/application may impact both the values written and the keys accessed. For example, consider a transaction that takes some input i:

input i;
if i > 10 then write x:=i;
          else write y:=i;

The read-and-write set of the above example depends on the input value already known to the client. The paper calls this transaction an Independent Transaction (IT), as it does not have an internal dependence on the read values.

Some transactions can be a bit more complicated and have the read-write set depending on the value read by the transaction:

read a;
if a > 10 then write x:=a;
          else write y:=a;

Here the transaction does not know its write set (i.e. writing x or y) until it acquires the value of a. The Prognosticator paper refers to these transactions as Dependent Transactions (DT), as the write set has an internal dependence on the read values.

Obviously, for both types of transactions, we can do the reconnaissance phase to figure out all the logic and branching to learn all the required keys. But we do not really need full reconnaissance for the ITs, as their read-and-write sets only depend on some client input and not the transaction itself. In fact, we can just play out the transaction’s code to figure out the read-write set without actually retrieving any data from the store (i.e. using some dummy values). However, we still somehow need to know whether a transaction is IT, as using dummy values in DT will clearly not work. Moreover, such a “dry run” with dummy values for every IT we encounter is still wasteful, as we do it every time.

Example of Symbolic Execution (SE) – symbolic solution α, path constraint φ.

This is where Prognosticator’s Symbolic Execution (SE) approach shines. With SE, Prognosticator “unwraps” each transaction for all the possible execution branches, leading to a symbolic transaction solution for all possible code paths. If all code paths access the same keys then we have a static read-and-write set. If certain execution branches access different keys, but branching conditions involve only transaction input, then we are dealing with an Independent Transaction (IT). We can easily compute IT’s read-and-write set from the symbolic solution once the input is known. Finally, if SE yields some branches with different access keys, and these branches are conditioned on a transaction’s reads, then we have a Dependent Transaction (DT) and will require a reconnaissance read. 
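The three-way classification can be sketched over a hand-written profile. To be clear, this does not perform any symbolic execution; it assumes SE has already produced the list of code paths, and the `Path` record and `classify` function are hypothetical names for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Path:
    depends_on_reads: bool  # does the path constraint mention a value read from the store?
    keys: frozenset         # keys accessed along this code path


def classify(paths):
    """Classify a transaction from its (already computed) symbolic paths."""
    key_sets = {p.keys for p in paths}
    if len(key_sets) == 1:
        return "static"  # every path touches the same keys
    if any(p.depends_on_reads for p in paths):
        return "DT"      # keys depend on values read by the transaction
    return "IT"          # keys depend only on the transaction input


# Hand-built profiles for the two pseudocode examples earlier in the post.
it_paths = [Path(False, frozenset({"x"})), Path(False, frozenset({"y"}))]
dt_paths = [Path(True, frozenset({"a", "x"})), Path(True, frozenset({"a", "y"}))]
static_paths = [Path(True, frozenset({"a", "x"})), Path(True, frozenset({"a", "x"}))]

kinds = [classify(it_paths), classify(dt_paths), classify(static_paths)]
```

Note that branches conditioned on a read still count as static when they all touch the same keys, since only the key sets matter for predicting the read-and-write set.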

The Prognosticator “unwraps” each new transaction type only once at the client-side to create such a symbolic execution profile with all possible code branches. There are quite a few optimizations mentioned in the paper. The important gist of these optimizations is the fact that we do not care so much about the actual symbolic solutions, and care only about what keys show up in the read set and write sets. So if two execution branches produce different symbolic solutions, but access the same keys, these branches can be “merged” for the purposes of predicting the read-and-write set. 

The rest of the Prognosticator’s magic depends on a batching technique that allows for a careful deterministic reordering of transactions. With the help of SE, the system identifies all read-only transactions (ROTs) and executes them concurrently at the beginning of the batch. This leaves us with a batch containing only ITs and DTs. The system then reorders DTs to the beginning of the batch. This allows it to run reconnaissance reads on all DTs while also working on ROTs. Since all DTs are now at the beginning of the batch, the reconnaissance reads cannot become stale due to any IT. Reconnaissance may still become stale due to the dependencies between DTs, and in this case, a DT is aborted during the execution phase and is placed in the abort batch to run after the main batch completes and before the next batch.
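The reordering itself is simple as long as it is deterministic. Below is a minimal sketch, assuming each transaction in the batch already carries its kind from the SE profile; `reorder_batch` and the tuple layout are my own naming, and the abort batch is left out.

```python
def reorder_batch(batch):
    """batch: list of (txn_id, kind) in leader order, kind in {"ROT", "DT", "IT"}.

    Returns (rots, rest): ROTs to run first from a snapshot, then DTs followed by ITs.
    List comprehensions keep the leader's relative order within each group,
    so every replica computes the same result.
    """
    rots = [t for t in batch if t[1] == "ROT"]
    dts = [t for t in batch if t[1] == "DT"]
    its = [t for t in batch if t[1] == "IT"]
    return rots, dts + its


batch = [("T1", "IT"), ("T2", "ROT"), ("T3", "DT"), ("T4", "IT"), ("T5", "DT")]
rots, rest = reorder_batch(batch)
```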

Lock table with per-key queues on the right. ROTs are not in the table, DTs and ITs are ordered in the table. Only transactions with all keys at the head of the queues can execute.

To execute the ITs and DTs in parallel, Prognosticator uses a lock table. The lock table is a collection of per-key queues, such that for every key in the batch there is an entry in the table with a queue of transactions. A transaction can be executed when it is at the head of the queues for all its keys. Obviously, executing a transaction removes it from all these queues.
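Here is a rough sketch of such a lock table with per-key queues. It is a single-threaded toy and not the more efficient implementation the paper describes; the `LockTable` class and its methods are hypothetical names.

```python
from collections import deque


class LockTable:
    """Toy lock table: one FIFO queue of transaction ids per key."""

    def __init__(self, batch):
        # batch: ordered list of (txn_id, keys); dict keeps the batch order.
        self.keys_of = dict(batch)
        self.queues = {}
        for txn_id, keys in batch:
            for key in keys:
                self.queues.setdefault(key, deque()).append(txn_id)

    def ready(self):
        # A transaction can run when it heads the queue of every key it uses.
        return [t for t, keys in self.keys_of.items()
                if all(self.queues[k][0] == t for k in keys)]

    def finish(self, txn_id):
        if txn_id not in self.ready():
            raise ValueError(f"{txn_id} is not at the head of all its queues")
        # Executing a transaction removes it from all of its queues.
        for key in self.keys_of.pop(txn_id):
            self.queues[key].popleft()


table = LockTable([("T1", {"x", "y"}), ("T2", {"y", "z"}), ("T3", {"a"})])
first_wave = table.ready()   # T1 and T3 do not share keys, so both can run
table.finish("T1")
second_wave = table.ready()  # T2 is now at the head of the queues for y and z
```

In a real system each worker thread would pick a ready transaction and call something like `finish` when done, which is where the parallelism comes from.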

The whole transaction execution process runs independently on each node. It is safe because we actually do not need to stick to the leader-prescribed order in the batch, as long as we keep the correct order of batches, and deterministically reorder the transactions in the same way on all replica nodes. With the batch execution, we have all the clients waiting for their transactions to finish, and this deterministic reordering does not cause any problems as all waiting transactions are concurrent. This is a common trick used in many systems.

The whole package with SE and batching provides a significant boost to the throughput compared to Calvin. It is not entirely clear though how much of the boost was enabled by the SE, but I will come back to this point in the discussion summary. 

The paper goes into more detail on many aspects of the system, including symbolic execution, more efficient implementation of the lock table, resource usage, etc. It was definitely a good read for me. As always, we had a presentation in our reading group, and I had to cover for a missing presenter:


1) Performance gains due to SE vs Batching. The paper claims a great speedup compared to Calvin; however, one question we had is just how much improvement is due to symbolic execution and how much of it is because of clever batching techniques. Let me elaborate. Some benefit comes from the careful reordering of operations. Running ROTs first helps a lot. Running reconnaissance reads at the node (compared to running a reconnaissance transaction at the client) is a lot faster too, and it reduces the possibility of reconnaissance becoming stale. Some of these techniques may be applicable in simpler systems without SE. For example, if we have transactions with annotated read-write sets, these reorderings within the batch become possible. Of course, SE definitely helps find ROTs and separate ITs from DTs to improve/enable the reordering within the batch without the need for annotated transactions.

2) Slow ROTs. Deterministic transactions are susceptible to clogged pipelines when some big long-running transaction gets in the way and delays consecutive transactions. Parallel/concurrent execution helps here by essentially having multiple processing pipelines. However, Prognosticator has one issue with the read-only transactions (ROTs), making them more susceptible to the clogging problem. The paper mentions that ROTs get executed at the beginning of the batch from a stable snapshot. All workers must complete their ROTs before the system moves to DTs and ITs to make sure that no DT or IT changes that stable snapshot while ROTs are still running. This means that there is a barrier at the end of the ROT phase, allowing a single long-running ROT to screw up the performance by delaying all DTs and ITs in the batch. However, this may be just an artifact of an academic prototype: taking a separate snapshot and running all ROTs from that snapshot should allow ROTs to execute concurrently with writes.

3) Cost of SE. The paper mentions the cost of doing symbolic execution. There are a few problems here. The first is processing time – more complicated transactions need a lot of time to pretty much exhaustively explore all branching. This also requires putting a limit on the number of allowed loop iterations. The limit can create a situation where a transaction is not fully profiled if it actually goes above the limit, requiring a reconnaissance. The limit, being a configurable parameter, may also require some expert knowledge of the workload to properly configure, and this is something the paper strives to avoid. Another big cost is the memory footprint of transaction profiles. The authors mention that the TPC-C benchmark required 960 MB for transaction profiles, which is not a small cost for a simple benchmark with relatively few transaction types. In the real world, the memory cost of having transaction profiles may be much higher.

4) Extension to sharded systems? Prognosticator works in the replicated system with all nodes storing identical data. It does not work in a sharded environment, yet most large-scale databases are sharded. It may be non-trivial to apply the same approach to sharded systems. After all, running a distributed transaction is harder, and may require some coordination between the shards. At the same time, it may still be possible to separate transactions according to their types and conflict domains with the help of SE to increase parallelism and make transaction execution more independent. Again, real systems based on Calvin’s ideas have cross-shard transactions. A big problem with a sharded setup in Prognosticator involves DTs: the system expects to perform the reconnaissance read locally, which means that all data for the transaction must be available at each node. This is not possible in the sharded environment. And making reads non-local will make the system much closer to Calvin with a longer distributed reconnaissance phase and negative performance impact. So, the non-sharded nature of Prognosticator is a huge performance benefit when compared with the more general Calvin.

Reading Group Special Session: Distributed Transactions in YugabyteDB

When: May 11th at 12:00 pm EST

Who: Karthik Ranganathan.

Karthik Ranganathan is a founder and CTO of YugabyteDB, a globally distributed, strongly consistent database. Prior to Yugabyte, Karthik was at Facebook, where he built the Cassandra database. In this talk, Karthik will discuss Yugabyte’s use of time synchronization and Raft protocol along with some optimizations that enable high-performance distributed transactions.


ACID transactions are a fundamental building block when developing business-critical, user-facing applications. They simplify the complex task of ensuring data integrity while supporting highly concurrent operations. While they are taken for granted in monolithic SQL databases, most distributed DBs would forsake them completely.

Fortunately, this is no longer the case. The trend started with Google Spanner, which offered distributed transactions using GPS based atomic clocks – unheard of in the database world before. Now, distributed transactions – without requiring atomic clocks – are offered by distributed SQL databases. One such example of a fully open source database offering this is YugabyteDB. Using the example of YugabyteDB, this talk will explain how distributed ACID transactions can be achieved without atomic clocks – without compromising on performance.

Reading Group. Performance-Optimal Read-Only Transactions

Last meeting we looked at “Performance-Optimal Read-Only Transactions” from OSDI’20. This paper covers important topics of transactional reads in database/data-management systems. In particular, the paper discusses “one-shot” read-only transactions that complete in 1 network round-trip-time (RTT) without blocking and without bloated, expensive messages. If this sounds too good to be true, it is. Before presenting these types of transactions, the authors discuss why it is impossible to have Non-blocking, One round-trip, Constant size messaging, Strictly Serializable (NOCS) read-only transactions. This becomes a “pick 3 out of 4” kind of deal.

The Performance-Optimal Read-Only Transaction (PORT) system shows how to get away with NOC and try to get as close as possible to S. For the One round-trip constraint, the paper makes the clients coordinate their own read-transactions and control the ordering of reads, all in one round of message exchange. This requires the clients to send some recency/progress metadata over to servers. In the case of PORT, the metadata is a Version Clock, a type of logical clock. It is just a number, so it is Constant-size. Finally, the servers can use the metadata to return the latest value that satisfies the recency constraint imposed by the Version Clock in a Non-blocking manner. The servers also avoid coordination to again satisfy the One round-trip requirement. To make sure the reads do not block, PORT never considers the in-progress operations. PORT separates the in-progress operations from the completed ones with a stable frontier time/version. Clients must request reads at what they know to be the latest stable, immutable state of the system and never try to request the state from the in-progress operations. Since different clients may have different and stale knowledge of the stable frontier, the system needs to support reading different versions of data, hence PORT relies on a multi-version store.

PORT also does some clever trickery to improve the consistency. For example, a promotion mechanism is used to block out a range of versions for writing in some cases. If some data was written with a version v=10 and then a read transaction has requested a value at version v=15, the v=10 value will be promoted to occupy the entire range of versions [10, 15], and servers will be disallowed to write anything in that range. This, however, does not cause the write in that version range to abort, and instead, it will be written at version v=16.
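The promotion example can be played out with a tiny single-key model. This is only my reading of the mechanism as summarized above, assuming no concurrency and that writes always land after the latest promoted range; `PromotingKey` is a hypothetical name, not something from the PORT implementation.

```python
class PromotingKey:
    """Toy model of one key whose versions can be promoted to a range by reads."""

    def __init__(self):
        self.versions = []  # list of [start, end, value], sorted by start

    def write(self, version, value):
        # A write that falls into a promoted range is not aborted;
        # it is pushed just past the end of that range instead.
        if self.versions and version <= self.versions[-1][1]:
            version = self.versions[-1][1] + 1
        self.versions.append([version, version, value])
        return version

    def read(self, version):
        # Find the newest value at or below the requested version and
        # promote it so that it covers the range up to that version.
        for entry in reversed(self.versions):
            if entry[0] <= version:
                entry[1] = max(entry[1], version)
                return entry[2]
        return None


key = PromotingKey()
key.write(10, "old")               # written at v=10
seen = key.read(15)                # promotes "old" to the range [10, 15]
landed_at = key.write(12, "new")   # requested v=12, actually written at v=16
```

A later read at v=15 still returns the old value, so the earlier read stays valid without blocking the writer.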

The paper implements PORT in ScyllaDB and Eiger and shows nearly identical throughput in read-heavy workloads to that of non-transactional reads while also beating Eiger’s transactions. There are quite a few important details and nuances on implementing PORT. The implementation on top of Eiger is full of surprises as the promotion mechanism described above no longer works for transactional writes, and PORT uses another clever trick. 

The presentation by Alex Miller goes into a bit more detail than my summary:


1) SNOW theorem. The NOCS theorem the authors discuss sounds similar to the SNOW theorem. Well, it is by the same first author, so this makes some sense. Both are about read-only transactions, both concern the trade-offs between performance and latency. NOCS focuses on performance-optimal transactions, while SNOW talks about latency-optimal. Both talk about the impossibility of having the highest consistency and being “x-optimal” (fill in the “x”). Moreover, the NOC (non-blocking, one round trip, constant metadata) implies that performance here largely means latency as well. It just happens that if we stop doing all the extra work, then the throughput improves as well. In a sense, it appears that NOCS is a rebranding of SNOW to some extent. We can even map letters in both abbreviations to similar concepts. S = (strict) serializability in both cases. N = Non-blocking. O = one round trip (in SNOW it is coordinate/retry, which is pretty much whether we add more messages or not). So three letters are the same, W & C are the difference, but even there we can find some similarities. W in SNOW stands for write conflict avoidance, and one way to do so may require violating C in constant metadata. The paper itself mentions that NOCS is similar to SNOW.

2) Other causal systems. Occult and PaRiS were brought up in the discussion briefly. We did not spend too much time on this, though. Occult is a causal system that avoids “slowdown cascades” due to dependencies and the need to enforce causality. PORT with its one-RTT non-blocking mechanism seems to be similar in this regard, so a comparison would be interesting.

3) HLC for the logical clock? HLCs are used in the transactions in MongoDB and Cockroach. HLCs are logical clocks, constant in size, and do help identify consistent cuts/snapshots for transactions. MongoDB uses HLCs for cross-partition causal transactions, and it seems to fit well within the NOC. CockroachDB is more involved, but it also uses HLC. Another important part of HLC is that it can provide a single serial order, but this is something PORT actually avoids in Eiger-PORT since it needs to provide a different serial order to different clients to enforce read-your-write property without blocking.

4) On the importance of a stable frontier. A stable frontier is the time in the system’s execution that separates what is safe to read and what is not. Everything before the stable frontier is committed/executed and safe, any operation after the frontier may not have been fully written/committed yet, and is not safe. This separation is clear in Scylla-PORT, but gets blurred in Eiger-PORT and its read-your-write reordering. 

5) Replication. The paper does not address replication issues at all, so one has to wonder about how it handles replication and associated failures. For example, in Cassandra/Scylla, a read succeeds after being completed by some read-quorum that may be smaller than all replicas for the object. This means that you can promote the value on a subset of replicas, and then do a write on some quorum containing the un-promoted replicas and end up with the same write recorded under different versions on different replicas. This may or may not be a huge problem, but a conversation on replication/failures would be very useful. The code (which is open source) may help to shed light on this, but we have not had a chance to look at it during the discussion.

6) Eiger-PORT. This one is very different from the ScyllaDB version. It is different from how the paper described PORT all along since Eiger-PORT cannot promote the operations because now writes are in a transaction, and all writes from one transaction must be promoted to a higher version atomically. If you do that, you need to coordinate between servers and add messages and lose the O part of NOC. Authors go into more details describing the Eiger-PORT protocol, which is not the easiest thing to grasp from the first read. It is also mind-twisting when you start reordering operations for different clients. Actually, as of the time of this writing, we were still discussing some aspects of Eiger-PORT in our group’s slack channel.

7) Evaluation. We liked the choice and rationale for picking the baseline systems to evaluate against. PORT indeed showed low overhead in ScyllaDB while improving the database’s consistency semantics.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

Reading Group. Aria: A Fast and Practical Deterministic OLTP Database.

In our 33rd reading group meeting, we discussed “Aria: A Fast and Practical Deterministic OLTP Database.” by Yi Lu, Xiangyao Yu, Lei Cao, Samuel Madden. We had a very nice presentation by Alex Miller:

Quick Summary

Aria is a transaction protocol, heavily influenced by Calvin, and it largely adopts Calvin’s transaction model, with one big difference. In Calvin, read and write sets of a transaction must be known beforehand, but Aria has no such strict requirement, allowing for generally more flexible transactions. Aria’s main goal is parallelizing transactions as much as possible to maximize the throughput. To that end, Aria adopts batching and processes multiple transactions in each batch concurrently.

Each batch operates in two phases: execution and commit. All transactions in a batch start the execution phase from the same snapshot (the result of committing and applying the previous batch). In the execution phase, each transaction concurrently executes and produces the new state, which is stored temporarily. Once all transactions in a batch have finished executing, the protocol moves into the commit phase, where Aria aborts any transaction that has a Write-after-Write (WAW) conflict or Read-after-Write (RAW) conflict. Aria uses unique and sortable transaction IDs to determine the order of transactions within the batch to find these conflicts. Any aborted transaction goes to a future batch, while the remaining successful transactions commit and apply their temporary execution results. Once the commit has finished, a new batch can start, and the protocol keeps moving in these lock-steps, synchronizing after each step. Every replica can run the protocol without synchronizing/communicating with other replicas unless the system is sharded/partitioned and one replica needs to read data from another shard. An important optimization of Aria is deterministic reordering, where transactions within a batch can be reordered (i.e. not follow the order of their TIDs) to reduce the number of aborts, and consequently, reduce the amount of wasted work in the execution phase.
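To make the two phases concrete, here is a minimal single-threaded sketch in Python. It is a simplification under stated assumptions, not Aria's implementation: the function name run_batch, modeling a transaction as a function that takes a read callback and returns its writes, and the first_writer table (the smallest TID that writes each key) are all made up for illustration, and deterministic reordering is left out.

```python
def run_batch(snapshot, batch):
    """Run one batch. `batch` is a list of (tid, txn) pairs, where
    txn(read) returns a dict of writes (hypothetical interface)."""
    # Execution phase: every transaction reads from the same snapshot and
    # buffers its writes; read sets are recorded as a side effect.
    results = {}
    for tid, txn in batch:
        reads = set()

        def read(key, _reads=reads):
            _reads.add(key)
            return snapshot.get(key)

        results[tid] = (reads, txn(read))

    # For each key, remember the smallest TID that wants to write it.
    first_writer = {}
    for tid, (_, writes) in results.items():
        for key in writes:
            first_writer[key] = min(tid, first_writer.get(key, tid))

    # Commit phase: abort on WAW or RAW conflicts with a smaller TID.
    new_state, committed, aborted = dict(snapshot), [], []
    for tid in sorted(results):
        reads, writes = results[tid]
        waw = any(first_writer[k] < tid for k in writes)
        raw = any(first_writer.get(k, tid) < tid for k in reads)
        if waw or raw:
            aborted.append(tid)       # would be retried in a future batch
        else:
            committed.append(tid)
            new_state.update(writes)
    return new_state, committed, aborted


batch = [
    (1, lambda read: {"y": read("x") + 10}),  # reads x, writes y
    (2, lambda read: {"x": read("y") + 10}),  # reads y, which T1 writes: RAW
    (3, lambda read: {"z": 7}),               # independent of the others
]
state, committed, aborted = run_batch({"x": 1, "y": 1}, batch)
```

In this run T2 aborts because it read y from the snapshot while the earlier T1 writes y; T1 and T3 commit.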

Discussion Points

We had good participation and touched on quite a few things in the discussion, but to the credit of the paper, we were able to find lots of answers there just by reading more carefully.

1) Transaction Serializability. The paper claims serializability, but it applies a bunch of transactions to the same snapshot. How does it compare to snapshot isolation, and what is the difference that allows serializability?

We think the batch processing and the addition of the RAW conflict check within the batch make the difference. Snapshot Isolation checks for WAW conflicts only and allows some artifacts. By also disallowing RAW conflicts, we can eliminate the problem. However, there are nuances with how a read set can be defined for conflict resolution. For example, what is a read set in a transaction running UPDATE items SET x=0 WHERE x=1? It can be either all items, or only items with x=1 if some indexing is used, and this difference may result in serializability issues. In the paper’s defense, it does not explicitly consider a SQL model, and also mentions that things like the above are up to the users to decide, with a fall-back to a Calvin-like approach.
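A classic artifact of this kind is write skew: two transactions read the same pair of items and each writes a different one. The sketch below compares a WAW-only check with a WAW+RAW check on explicit read and write sets. It is a simplified model for illustration; the helper aborted_tids and the way snapshot isolation is reduced to a per-batch WAW check are assumptions, not the paper's code.

```python
def aborted_tids(txns, check_raw):
    """txns maps tid -> (read_set, write_set); returns the TIDs that abort."""
    # Smallest TID writing each key within the batch.
    first_writer = {}
    for tid in sorted(txns):
        for key in txns[tid][1]:
            first_writer.setdefault(key, tid)

    out = []
    for tid in sorted(txns):
        reads, writes = txns[tid]
        waw = any(first_writer[k] < tid for k in writes)
        raw = check_raw and any(first_writer.get(k, tid) < tid for k in reads)
        if waw or raw:
            out.append(tid)
    return out


# Write skew: both transactions read x and y, T1 writes x, T2 writes y.
write_skew = {
    1: ({"x", "y"}, {"x"}),
    2: ({"x", "y"}, {"y"}),
}
waw_only = aborted_tids(write_skew, check_raw=False)      # SI-like check
waw_and_raw = aborted_tids(write_skew, check_raw=True)    # Aria-like check
```

With the WAW-only check both transactions commit, since their write sets are disjoint. Adding the RAW check aborts T2, because it read x from the snapshot while the earlier T1 writes x.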

2) Batch size. This is a batched protocol, so the batch size may play a role in the performance. Batches that are too small will have more frequent barriers between batches and phases. Batches that are too large have a higher chance of having a super long transaction that stalls the entire batch, impacting the performance. The paper mentions that the transactions should take about the same time to execute for best performance to avoid the case when one slow transaction stalls the entire batch.

3) Paper readability. The overall consensus in the group was that this was one of the easier transaction papers to read.

4) Comparison with other transaction systems. SLOG paper we discussed a few months ago also uses determinism on each node to run transactions. Unlike Aria though, SLOG uses determinism for execution order, while Aria executes unordered (from the same state) and deterministically aborts the conflicting transactions. Overall it seems that Aria’s use of determinism is more extensive – the same snapshot, transactions are deterministic themselves, deterministic conflict search and abort, etc.

There was also a mention of CockroachDB’s transactions, since they also use snapshot, and produce temporary results, but Cockroach is more optimistic than deterministic. Also, Cockroach is more interested in low-latency transactions, while Aria is all about high throughput even at the expense of latency.

5) Deterministic reordering. The paper mentions that the reordering is a best-effort algorithm and not the most optimal one. It also seems that reordering plays a big role in having high throughput, so can we squeeze more performance with a better reordering algorithm? Obviously, it is not efficient to brute-force all possible permutations, but maybe some better heuristic approach?


6) Performance variance under conflict. Aria works best for workloads with not a lot of conflict between transactions. Reordering helps, but not in all cases, as evidenced in the evaluation.


Join our reading group on slack for more discussions, paper schedule, and zoom participation.

Ocean Vista: Gossip-Based Visibility Control for Speedy Geo-Distributed Transactions

Ocean Vista

On Wednesday we had a presentation and discussion of the Ocean Vista (OV) replication and distributed transaction protocol. OV works in WANs, where each region has all data-partitions, and transactions can originate in any region. OV separates replication from transaction execution by making replication conflict-free with a FastPaxos-inspired protocol. For the transaction execution, OV maintains visibility watermarks, such that any transaction still undergoing replication is not yet visible, and all transactions below the watermark are visible. The protocol computes the watermark via a hierarchical gossiping protocol by taking the minimum watermark from each region. The regional watermark, in its turn, is a minimum of the server watermarks in the region, and a server watermark is the minimal timestamp among the ongoing transactions on that server. A few additional optimizations exist to allow reading from a single server, but these require an additional watermark to designate the full replication of a transaction.
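The hierarchical minimum can be written down directly. The Python sketch below is a toy calculation with no gossip or networking; the function names, the (ongoing timestamps, clock) representation of a server, and the fallback to the server's clock when it has no ongoing transactions are assumptions made for illustration.

```python
def server_watermark(ongoing_timestamps, clock_now):
    # Smallest timestamp among ongoing transactions; with none ongoing we
    # assume the server reports its current clock value instead.
    return min(ongoing_timestamps, default=clock_now)


def region_watermark(servers):
    # `servers` is a list of (ongoing_timestamps, clock_now) pairs.
    return min(server_watermark(ts, now) for ts, now in servers)


def global_watermark(regions):
    # `regions` maps a region name to its list of servers.
    return min(region_watermark(servers) for servers in regions.values())


def is_visible(txn_timestamp, watermark):
    # Transactions strictly below the watermark are visible.
    return txn_timestamp < watermark


regions = {
    "us": [([105, 120], 130), ([], 128)],  # server watermarks: 105 and 128
    "eu": [([110], 125)],                  # server watermark: 110
}
wm = global_watermark(regions)
```

Here the slowest ongoing transaction (timestamp 105) holds the global watermark back, so a transaction at timestamp 100 is visible while one at 105 or later is not.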

Below you can find a presentation from our reading group by Balaji Arun



Our discussion focused on a few points/questions:

(1) Why does OV protocol use the FastPaxos-like algorithm for replication? FastPaxos requires the use of larger “super quorums”, however, OV replication is not client-driven, and a server specifically picks a unique timestamp for the replication. In FastPaxos, multiple commands may be tried on the same instance (i.e. timestamp), requiring a larger quorum for recovery in phase-1 of Paxos with a smaller majority quorum. However, in OV we do not see such conflicts: the timestamps for transactions are unique and are assigned by one node that coordinates replication. The only possible conflict that we saw happening is when a transaction first tries to replicate the command and then issues an abort on the same timestamp. This arguably creates a write-write conflict on the same instance (timestamp), but we think it can be resolved by establishing fixed precedence to make aborts always win over the regular writes. With precedence order established, writing abort to a majority of nodes should be sufficient to make abort persistent and recoverable. We have not reached a definite conclusion on why OV uses larger fast quorums, so we may still be missing something in our understanding of the protocol.

(2) Another discussion point was concerning the evaluation and comparison. A recent SLOG paper solves a similar problem, so we were wondering how it may behave compared to the OV. The group’s conclusion was a definite “it depends.” From one perspective, OV is more decentralized, so it may be able to achieve higher throughput when there are many multi-partition transactions. SLOG is more centralized, having both a dedicated master per partition and a dedicated ordering layer for the transactions involving many partitions. Both systems, however, do not abort transactions in most of the cases (unless there are significant failures), so their performance may be close to each other. 

(3) Related to (2). Is the comparison with TAPIR a fair one? The two protocols operate quite differently. We thought that it would have been nice to see a comparison with Calvin and SLOG.

(4) Performance for geo-sharded setup could suffer greatly. If we do not have all partitions/shards in every region, we will need to do a cross-region replication. In the protocol, the visibility watermark gets updated after the replication is complete, and having WAN replication will delay that process. Moreover, this may delay the visibility of other transactions that do not have geo-replication but have a timestamp after the geo-replicated transaction. 

Review: Implementing Linearizability at Large Scale and Low Latency

In this post I will talk about the “Implementing Linearizability at Large Scale and Low Latency” SOSP 2015 paper.

Linearizability, the strongest form of consistency, can be very important in large scale data storage systems, although many such systems either do not implement linearizability or do not fully expose linearizable operations to the clients. The latter type of system can maintain linearizability for internal operations that occur between servers, but does not provide the same consistency to the clients.

The authors of the paper provide a linearizability framework, called RIFL, suitable for use in existing non-linearizable RPC-based distributed systems. The framework allows converting existing RPCs into linearizable ones in just a few lines of code with minimal impact on the overall performance. The paper only discusses RPC-based systems, since according to the paper, linearizability requires a request-response protocol to operate. I think it may be possible to use a RIFL-like system for message passing approaches as long as receiving each message eventually produces an ack to the sender.


In order to better understand RIFL and how it is beneficial in a data-store system, we need to talk about linearizability. According to the paper, linearizable operations appear to happen instantaneously and only once at some point in the execution of a system. It is important to understand that in a real system an operation can take some time to execute and can potentially fail midway through its execution. A linearizable system must make it appear to all its clients as if the operation happened right away. The ability to execute operations only once is another important point, as many existing systems retry execution of an operation upon failure. Authors say such operations follow at-least-once semantics, whereas linearizable operations have exactly-once semantics. In order to achieve certain consistency guarantees, many existing systems use idempotent operations which produce the same outcome regardless of how many times such operations have been executed. Authors show an example in which running such an operation more than once can break linearizability after a certain failure.

Example of at-least-once semantic breaking linearizability.

In this case we have two clients interacting with a single server. Client B writes 2 to the server but it crashes before the server has a chance to respond. When client A reads the data, it will get the value written by B. Later client A can write a different value, while client B is recovering from the failure. Once client B is back up, it does not know that its previous operation has succeeded, so it retries it and overwrites the later value written by A. Authors do not mention how likely such an example is to occur in practice, but given the large scale of such systems, with thousands or even millions of clients, it would be unwise to discount the possibility of such a failure. Nothing is mentioned about whether any of the existing data-store systems address the issue.


The RIFL framework allows the conversion of a system relying on at-least-once RPC operations into one with linearizable exactly-once operations. The main idea behind RIFL is storing the results of the RPC execution, so that in case of a retry of an RPC call the system can use the already known result without having to re-run the procedure. The results of the RPC invocation are stored in completion records, and each such record is associated with a unique RPC. This ensures the exactly-once operation of the RPCs in the system, but also opens up a number of problems that had to be solved.


High level representation of RIFL logic

In order to operate properly, the system must be able to detect retry calls. In order to make such detection easy, each RPC is assigned a 128-bit ID number consisting of a 64-bit client ID and a 64-bit sequence number assigned by that client. If an operation is to be retried by the client, it must use the same ID. Before execution of an RPC, the server will check if it is aware of a completion record for that RPC. If no record exists, the RPC continues, but if a completion record is present then the server will return the results stored in the completion record instead of running the RPC.
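The retry check can be illustrated with a small in-memory model that replays the two-client example from earlier in the post. This Python sketch is not RIFL's or RAMCloud's code: the names rpc_id and LinearizableServer, the dictionary of completion records, and the "ok" result are assumptions for illustration, and durability, migration, and garbage collection are ignored.

```python
def rpc_id(client_id, sequence):
    # Pack a 64-bit client ID and a 64-bit sequence number into one 128-bit ID.
    mask = (1 << 64) - 1
    return ((client_id & mask) << 64) | (sequence & mask)


class LinearizableServer:
    """Toy single-value server that keeps completion records (hypothetical)."""

    def __init__(self):
        self.value = None
        self.completions = {}  # rpc id -> stored result

    def write(self, rid, value):
        # A retry carries the same ID, so the stored result is returned
        # instead of running the write a second time.
        if rid in self.completions:
            return self.completions[rid]
        self.value = value
        self.completions[rid] = "ok"
        return "ok"

    def read(self):
        return self.value


CLIENT_A, CLIENT_B = 1, 2
server = LinearizableServer()

server.write(rpc_id(CLIENT_B, 1), 2)          # B writes 2; the reply is lost
seen_by_a = server.read()                     # A observes B's write
server.write(rpc_id(CLIENT_A, 1), 3)          # A overwrites with 3
retry = server.write(rpc_id(CLIENT_B, 1), 2)  # B recovers and retries
final_value = server.read()
```

Without the completion record, B's retry would have overwritten A's later value; with it, the retry only returns the stored result and the value stays at 3.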

Migrating completion records is essential in the event of a failure, as the system relies on the presence of such records to make a decision on whether an RPC needs to run. From time to time, data can be migrated from one server to another, especially in case of a server crash. The new server must have the completion records available to it after the migration, so each completion record is attached to one of the data-objects being modified by an RPC, so that moving the object will also move all the completion records for RPCs acting on the object. Unfortunately, authors do not explain in detail how the migration is made, as this part is probably left to the underlying system. It is very likely that completion records also get replicated with the objects they belong to for durability reasons, although no mechanism for such replication is described either, so it is reasonable to assume that the completion record replication is also left to the underlying system.

Over time many completion records are going to be created for each object, increasing the storage requirements and the bandwidth used for replication and migration of objects. In order to improve resource utilization, a garbage collection mechanism for old completion records was devised. In RIFL a completion record can be removed from the system if a client acknowledges that it knows of a successful RPC execution and will not retry it in the future. Such acknowledgments are piggybacked on new RPC requests and as a result incur minimal overhead. In case of a client failure, no acknowledgement will be sent to the server, causing certain completion records to persist. In order to deal with this problem, RIFL uses a lease manager to grant leases to all the clients. In case a client lease is not renewed, all completion records for the client will be purged. It is not clear how a centralized lease system can impact the overall performance of a system implementing RIFL. Time synchronization between the client, lease manager and server is used to reduce the need to communicate: the server will contact the lease manager only when it estimates that the client lease will expire soon. This portion of the lease protocol raises some questions about the reliability of the lease sub-system. What is going to happen if the time skew is greater than the server accounts for? If the server time is ahead of the lease manager time, the server will start issuing more check requests to the lease manager, but if it is lagging behind the lease manager time, then the server may think the lease is still good while the client may already be dead. I think the worst case scenario is that GC does not collect all dead completion records, which may not be a big immediate problem, but may eventually lead to excess memory consumption by the server applications.
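The two cleanup paths, piggybacked acknowledgments and lease expiry, can be sketched as follows. This Python model is an illustration under assumptions: the class CompletionLog, the acked_below parameter (a single number meaning the client will never retry any sequence number below it), and the expire_lease call standing in for the lease manager's decision are all hypothetical, and no clocks or time skew are modeled.

```python
class CompletionLog:
    """Toy per-client store of completion records (hypothetical)."""

    def __init__(self):
        self.records = {}  # client_id -> {sequence: result}

    def record(self, client_id, sequence, result, acked_below):
        per_client = self.records.setdefault(client_id, {})
        # Piggybacked ack: drop records the client promises never to retry.
        for seq in [s for s in per_client if s < acked_below]:
            del per_client[seq]
        per_client[sequence] = result

    def expire_lease(self, client_id):
        # The client's lease was not renewed: purge all of its records.
        self.records.pop(client_id, None)

    def count(self, client_id):
        return len(self.records.get(client_id, {}))


log = CompletionLog()
log.record("A", 1, "ok", acked_below=1)
log.record("A", 2, "ok", acked_below=1)  # nothing acknowledged yet
before_ack = log.count("A")
log.record("A", 3, "ok", acked_below=3)  # acks 1 and 2 while sending RPC 3
after_ack = log.count("A")

log.record("B", 1, "ok", acked_below=1)  # B crashes and never acks
log.expire_lease("B")                    # lease expiry cleans up after B
after_expiry = log.count("B")
```

The ack path costs no extra messages because it rides on the next request, while the lease path only matters for clients that disappear without acknowledging.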

Transactions with RIFL

Authors implemented a transaction system using RIFL for linearizability on top of RAMCloud, a distributed, in-memory key-value datastore. A two-phase commit protocol similar to Sinfonia is used to implement transactions. In the first phase of the protocol, usually called a prepare phase, a set of read, write or delete commands is sent to servers and each server upon receiving prepare determines if it can proceed with the commit. If all servers can commit, then a second phase finalizes the transaction. RIFL makes crash recovery simpler compared to Sinfonia. Since each prepare operation is linearizable, retries of the prepare will not cause any adverse effects. Upon a more serious crash, the recovery manager can learn the results of the prepare operations without knowledge of the original commit commands, and if all prepares have succeeded, it can finalize the transaction; in case of some prepare failures, the transaction is simply aborted.
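A rough sketch of why a linearizable prepare helps is shown below. It is a toy model in Python, not the paper's protocol: the Participant class, the lock-based voting rule, the vote strings, and the decide helper are assumptions for illustration, and the second phase, durability, and the recovery manager itself are omitted.

```python
class Participant:
    """Toy participant whose prepare RPC is made exactly-once (hypothetical)."""

    def __init__(self):
        self.locked = set()     # keys locked by prepared transactions
        self.completions = {}   # rpc id -> vote already returned

    def prepare(self, rid, keys):
        # A retried prepare returns the stored vote instead of re-running.
        if rid in self.completions:
            return self.completions[rid]
        if self.locked & set(keys):
            vote = "abort"              # some key is held by another prepare
        else:
            self.locked |= set(keys)
            vote = "prepared"
        self.completions[rid] = vote
        return vote


def decide(votes):
    # Commit only if every participant voted "prepared".
    return "commit" if votes and all(v == "prepared" for v in votes) else "abort"


p = Participant()
vote = p.prepare(("client-1", 1), ["x"])
retried = p.prepare(("client-1", 1), ["x"])    # same ID: same answer
other = p.prepare(("client-2", 1), ["x"])      # conflicting transaction
outcome = decide([vote, retried])
conflicting_outcome = decide([vote, other])
```

In this model, a retried prepare without the completion record would run into its own lock and vote to abort, which is the kind of special case the exactly-once layer removes.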

One important point authors make in the paper is about the traditional way of implementing linearizability on datastores and how it differs from their implementation. In existing systems, linearizability is implemented on top of a transaction system, and according to the authors this approach creates a more cumbersome transaction mechanism. With RIFL, transactions were implemented on top of a linearizability layer, which authors claim is a better approach.



RIFL was implemented and evaluated in RAMCloud. Overall, authors claim only a 5% increase in latency for RIFL linearizable write RPCs compared to the original writes. No significant difference in throughput was observed when using RIFL.


Added overhead of RIFL to the RAMCloud system. Left is latency, right is throughput.

Transaction performance was evaluated with the TPC-C benchmark typically used for performance evaluation of Online Transaction Processing (OLTP) systems. RIFL RAMCloud was compared against the H-store database. Both systems are in-memory databases but they differ significantly in their purpose and typical use cases. As a matter of fact, the RIFL RAMCloud solution had to be specifically implemented for the TPC-C benchmark. When comparing the two systems, authors found that RAMCloud with RIFL significantly outperforms H-store in all tests. I am a bit skeptical about these results, at least without more knowledge about how RAMCloud was used to make the TPC-C benchmark work with it and whether the implementation of the RIFL & RAMCloud interface for the TPC-C benchmark was specifically tailored for the tests performed by TPC-C. It may have been a good idea to compare the system against other transaction protocols implemented in RAMCloud, such as the ones based on consensus.

Overall Thoughts

When reading the paper I thought that the idea of caching the results of RPC calls is very straightforward and simple, and I am surprised it has not been exploited before. Yes, storing such a cache presents a few challenges, mainly in memory management of the overall system, as the cache size can grow large, but as shown in RIFL, these are not very big challenges and can be solved with simple protocols and existing tools, such as ZooKeeper. Authors claim that implementing transactions on top of a linearizability layer is a better and faster approach. Transactions (mini-transactions?) implementation became easier with RIFL, but I am not sure the performance benefit is obvious. In my opinion the performance comparison with H-store seems somewhat unfair.