Reading Group. FoundationDB: A Distributed Unbundled Transactional Key Value Store

Last week we discussed the “FoundationDB: A Distributed Unbundled Transactional Key Value Store” SIGMOD’21 paper. We had a rather detailed presentation by Moustafa Maher.

FoundationDB is a transactional distributed key-value store meant to serve as the “foundation” or lower layer for more comprehensive solutions. FoundationDB supports point and ranged access to keys. This is a common and decently flexible API to allow building more sophisticated data interfaces on top of it. 

FoundationDB is distributed and sharded, so the bigger part of the system is transaction management. The system has a clear separation between a Paxos-based control plane and the data plane. The control plane is essentially a configuration box to manage the data plane. On the data plane, we have a transaction system, log system, and storage system. The storage system is the simplest component, representing sharded storage. Each node is backed by a persistent storage engine and an in-memory buffer to keep 5 seconds of past data for MVCC purposes. The storage layer is supported by sharded log servers that maintain the sequence of updates storage servers must apply.

FoundationDB Architecture

The interesting part is the transaction system (TS) and how clients interact with all the components on the data plane. The client may run transactions that read and/or update the state of the system. It does so with some help from the transaction system, which also orchestrates the transaction commit. When a client reads some data in a transaction, it will go to the transaction system and request a read timestamp or version. On the TS side, one of the proxies will pick up the client’s request, contact the sequencer to obtain the version and return it to the client. This version timestamp is the latest committed version known to the sequencer to guarantee recency. Thanks to MVCC, the client can then reach out directly to the storage servers and retrieve the data at the corresponding version. Of course, the client may need to consult the system to learn which nodes are responsible for storing particular keys/shards, but the sharding info does not change often and can be cached.
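To make the versioned read path a bit more concrete, here is a minimal sketch of a multi-version store that answers reads at a given version. The `VersionedStore` class and its methods are my own illustration, not FoundationDB code, and `trim` only approximates the idea of dropping versions that fall outside the MVCC window.

```python
import bisect


class VersionedStore:
    """Toy multi-version store (hypothetical, for illustration only)."""

    def __init__(self):
        self._versions = {}  # key -> sorted list of versions
        self._values = {}    # key -> values aligned with the versions list

    def apply(self, version, key, value):
        # Updates are assumed to arrive in increasing version order,
        # as they would when replayed from a log.
        self._versions.setdefault(key, []).append(version)
        self._values.setdefault(key, []).append(value)

    def read(self, key, read_version):
        """Return the newest value written at or before read_version."""
        versions = self._versions.get(key, [])
        idx = bisect.bisect_right(versions, read_version)
        if idx == 0:
            return None  # nothing visible at this version
        return self._values[key][idx - 1]

    def trim(self, oldest_readable_version):
        """Drop versions older than the window, keeping one baseline per key."""
        for key, versions in self._versions.items():
            idx = bisect.bisect_right(versions, oldest_readable_version)
            keep_from = max(idx - 1, 0)
            del versions[:keep_from]
            del self._values[key][:keep_from]
```

A reader holding version 15 sees the value written at version 10 even after a newer write at version 20 lands, which is what lets clients read from storage servers without coordinating with writers.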

Writes/updates and the transaction commit procedure are driven by the TS. The client submits the write operations and the read-set to the proxy, and the proxy will attempt to commit and either return an ack or an abort message. To commit, the proxy again uses the sequencer to obtain a commit version higher than any of the previous read and commit versions. The proxy will then send the read and write set along with the versions to the conflict resolver component. The resolver detects the conflicts; if no conflict is detected, the transaction can proceed, otherwise it is aborted. Successful transactions proceed to persist to the log servers and will commit once all responsible log servers commit. At this point, the sequencer is updated with the latest committed version so it can continue issuing correct timestamps. Each transaction must complete well within the 5 seconds of the MVCC in-memory window. Needless to say, read-only transactions do not go through the write portion of the transaction path since they do not update any data, making reads lightweight.
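The conflict check at the heart of this commit path can be sketched in a few lines. This is a simplified model under my own assumptions (a single resolver, exact keys instead of key ranges, and a hypothetical `Resolver` class), not the paper's implementation.

```python
class Resolver:
    """Toy conflict resolver: remembers the last commit version of each key."""

    def __init__(self):
        self._last_commit = {}  # key -> version of the most recent committed write

    def try_commit(self, read_version, commit_version, read_set, write_set):
        # A transaction conflicts if any key it read was overwritten
        # after the snapshot (read_version) it read from.
        for key in read_set:
            if self._last_commit.get(key, 0) > read_version:
                return False  # abort
        # No conflict: record the writes at the new commit version.
        for key in write_set:
            self._last_commit[key] = commit_version
        return True
```

In this sketch, a transaction with an empty read set never aborts, since there is nothing it could have read stale.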

Failure handling and recovery are an important point in any distributed system. FoundationDB takes a fail-fast approach that may at times sound a bit drastic. The main premise of failure handling on the transaction system is to rebuild the entire transaction system quickly instead of trying to mask failure or recover individual components. The committed but not executed transactions can be recovered from the log servers and persisted to storage; in-progress transactions that have not made it to the log servers are effectively timed out and aborted. Transactions that partly made it to the log servers are also aborted, and new log servers are built from a safe point to not include the partial transactions. Here I just scratched the surface on the recovery, and the paper (and our group’s presentation) is way more accurate and detailed.

Another important point in the paper is the testing and development of FoundationDB. The paper talks about simulator testing. In a sense, the simulator is an isolated environment for development and testing the full stack on just one machine. It comes with a handful of mock components, such as networking and a clock. All sources of non-determinism must be mocked and made deterministic for reproducibility. The paper claims that the simulator is very useful for catching all kinds of bugs with a few exceptions, such as performance bugs. 
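The core idea of mocking every source of non-determinism can be shown with a toy example. The sketch below is not the FoundationDB simulator; it is a hypothetical `run_simulation` function where a seeded random generator drives both the simulated network delay and the injected message drops, and the clock is a plain variable rather than the wall clock.

```python
import random


def run_simulation(seed, steps=20, drop_probability=0.3):
    """Run a toy message exchange under a seeded RNG and a simulated clock."""
    rng = random.Random(seed)  # the only source of randomness in the run
    clock = 0.0                # simulated time, never the wall clock
    trace = []
    for msg_id in range(steps):
        clock += rng.uniform(0.001, 0.050)  # simulated network delay
        if rng.random() < drop_probability:
            trace.append((round(clock, 6), msg_id, "dropped"))  # injected fault
        else:
            trace.append((round(clock, 6), msg_id, "delivered"))
    return trace
```

Running it twice with the same seed produces the same trace, so a failing seed is enough to reproduce a failing run.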

FoundationDB Simulator.


1) Flexibility of FoundationDB. Our previous paper was on RocksDB, a single-server key-value store. It is meant as the building block for more complex systems and applications. This is very similar in spirit to FoundationDB, which is meant as a “foundation” for many more complex systems. However, FoundationDB is way more complex, as it implements the data distribution/replication and transactions. This can potentially limit the use cases for FoundationDB, but obviously, this is done by design. With replication and transactions taken care of, it may be easier to build higher-up levels of the software stack.

2) Use cases. So what are the use cases of FoundationDB then? It is used extensively at Apple. Snowflake drives its metadata management through FoundationDB. In general, it seems like use cases are shaped by the design and limitations. For example, a 5-second MVCC buffer precludes very long-running transactions. The limit on key and value size constrains the system from storing large blobs of data. Arguably, these are rather rare use cases for a database. One limitation is of particular interest to me, and this is geo-replication.

Geo-replication in FoundationDB. Only one region has TS with a sequencer.

3) Geo-replication. The paper touches on geo-replication a bit, but it seems like FoundationDB uses geo-replication mainly for disaster tolerance. The culprit here is the sequencer. It is a single machine, and this means that geo-transactions have to cross the WAN boundary at least a few times to get the timestamps for transactions. This increases the latency. In addition to simply slower transactions, numerous WAN RTTs to the sequencer can push the transaction time closer to the 5-second limit. So it is reasonable to assume that the system is not designed for planetary-scale deployment.

4) Simulator. We discussed the simulator quite extensively since it is a cool tool to have. One point raised was how a simulator is different from just setting up some testing/development local environment. The big plus for a simulator is its ability to control determinism and control fault injections in various components. There are systems like Jepsen to do fault injection and test certain aspects of operation, but these tend to have more specific use cases. Another simulator question was regarding the development of the simulator. It appears that the simulator was developed first, and the database was essentially built using the simulator environment.

We were also curious about the possibility of a simulator to capture error traces or do checking similar to systems like Stateright. It appears, however, that this is outside of the simulator capabilities, and it cannot capture specific execution traces or replay them. It is capable of controlling non-deterministic choices made in mock components, making a failure easier to reproduce. One somewhat related point mentioned was eidetic systems that remember all non-deterministic choices made in the OS along with all inputs to be able to replay past execution, but this seems like overkill to try to build into a simulator.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Evolution of Development Priorities in Key-value Stores Serving Large-scale Applications: The RocksDB Experience

On Wednesday, we had our 26th reading group meeting, discussing RocksDB with the help of a recent experience paper: “Evolution of Development Priorities in Key-value Stores Serving Large-scale Applications: The RocksDB Experience.” Single-server key-value storage systems are crucial for so many distributed systems and databases. For distributed folks like myself, these often remain black-boxes that you pick up and use. That is until something in your system starts to crumble peculiarly, and you dive in to investigate…

Anyway, what I want to say is that KV-stores are important. Their performance matters a lot in the world of fast CPUs and fast networks, where every millisecond of slowdown at storage can no longer be “masked” by other “slow” components. This is where this paper takes us — improving the performance, reliability, and feature-set of RocksDB over the years as technology and demands have evolved. 

To understand the experiences and lessons of the paper, we first need to look at the underlying technology behind RocksDB. In a nutshell, RocksDB is an LSM-tree (Log-Structured Merge tree) key-value storage. LSM trees have been used in storage for quite some time, as they are really good for write-intensive workloads. The basic idea of the LSM tree is that data are stored in files sorted by key. These sorted files are called sorted-strings tables or SSTables for short.

Sorted Strings Table with key-value pairs.

Now, maintaining SSTables requires that data are written to storage sequentially in such a sorted state. Of course, the data does not come pre-sorted to a database, so the system needs to do something else before writing these sorted files to disk. The storage system will keep an in-memory buffer, called a memtable, of some relatively large number of updates. This memtable can be represented as some tree structure to allow for efficient insertions. Before each operation is added to a memtable, it is written to a write-ahead-log (WAL) for durability. The WAL is used to reconstruct the memtable in the event of a failure. Once the memtable reaches a certain size, it is flushed to disk in a sorted manner. At this time a new empty memtable can start. An important aspect of writing these sorted files is keeping track of their recency order.

Memtable and SSTable files/segments.
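The write path described above can be sketched as a toy in-memory model. Everything here is an assumption made for illustration: `TinyLSM` is a hypothetical class, Python lists stand in for the WAL and SSTable files, and a dict stands in for the tree-based memtable. It is not how RocksDB is implemented.

```python
import json


class TinyLSM:
    """Toy LSM write path: WAL first, then memtable, then flush to an SSTable."""

    def __init__(self, memtable_limit=3):
        self.memtable_limit = memtable_limit
        self.wal = []       # stand-in for an append-only log file
        self.memtable = {}  # stand-in for a balanced tree or skip list
        self.sstables = []  # oldest first; each is a sorted list of (key, value)

    def put(self, key, value):
        self.wal.append(json.dumps([key, value]))  # log first, for durability
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def flush(self):
        # Write the memtable out in sorted order and start a fresh one.
        self.sstables.append(sorted(self.memtable.items()))
        self.memtable = {}
        self.wal = []  # logged entries are now covered by the SSTable

    def recover(self):
        """Rebuild the memtable from the WAL after a simulated crash."""
        self.memtable = {}
        for record in self.wal:
            key, value = json.loads(record)
            self.memtable[key] = value

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for table in reversed(self.sstables):  # most recent segment first
            for k, v in table:
                if k == key:
                    return v
        return None
```

The linear scan in `get` is deliberately naive; it is the part that indexes and filters are meant to speed up.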

When a read request for a key arrives, the system first looks at the memtable to see if data is there. Memory lookup is relatively cheap since no disk access is needed. However, if the requested key is not in the memtable, then we must search on disk, starting from the most recent SSTable segment. Looking up data on a disk can be slow since the system needs to scan a good chunk of a file to find the spot where the key might exist in the sorted list. Naturally, we want to take advantage of the sorted nature of the file. For this, a system maintains a sparse index for each file with the offsets to narrow down the search. Then the system only needs to scan a portion of a file between the two offsets where the key may exist. If the data is missing in the most recent file, then a search continues in the next most recent one and so on. This process results in some peculiar behaviors. For instance, it generally takes less time to find more frequently used data. But it also takes a lot of time to find out that the data is missing entirely. Fishing for non-existent data is a waste of time, so an additional index, a bloom filter, can be used to tell whether the key is guaranteed to be missing.

Index points to some file offset. To look up key ‘city’, find where ‘city’ fits in the index (between ‘blog’ and ‘food’) and search in that part of the file.
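Here is a rough sketch of how a sparse index and a Bloom filter can work together on one sorted file. The `SSTable` class, the choice of indexing every second key, and the two-hash 64-bit filter are all assumptions I picked to keep the example small; real systems size these very differently.

```python
import bisect
import hashlib


class SSTable:
    """Toy sorted file with a sparse index and a tiny Bloom filter."""

    def __init__(self, records, index_every=2, bloom_bits=64):
        self.records = sorted(records)  # (key, value) pairs sorted by key
        # Sparse index: every index_every-th key and its offset in the file.
        self.index_keys = [k for k, _ in self.records[::index_every]]
        self.index_offsets = list(range(0, len(self.records), index_every))
        self.bloom_bits = bloom_bits
        self.bloom = 0
        for key, _ in self.records:
            for bit in self._bit_positions(key):
                self.bloom |= 1 << bit

    def _bit_positions(self, key):
        # Two bit positions derived from one SHA-256 digest.
        digest = hashlib.sha256(key.encode()).digest()
        return [int.from_bytes(digest[i:i + 4], "big") % self.bloom_bits
                for i in (0, 4)]

    def might_contain(self, key):
        # False means definitely absent; True means "possibly present".
        return all((self.bloom >> bit) & 1 for bit in self._bit_positions(key))

    def get(self, key):
        if not self.might_contain(key):
            return None  # skip the scan entirely
        slot = bisect.bisect_right(self.index_keys, key) - 1
        if slot < 0:
            return None  # key sorts before the first record
        start = self.index_offsets[slot]
        if slot + 1 < len(self.index_offsets):
            end = self.index_offsets[slot + 1]
        else:
            end = len(self.records)
        for k, v in self.records[start:end]:  # scan only between two offsets
            if k == key:
                return v
        return None
```

Note that the filter can only rule keys out. A positive answer still requires the index lookup and the short scan.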

Another caveat the sequential writes create for us is dealing with old versions of data. See, when we write an SSTable to disk, it is immutable, and when an update or delete to a key comes in, this update will eventually flush to a more recent file. This influx of new data creates a situation where old data that is no longer needed keeps occupying space and potentially increases search time. So the system needs to clean up old data frequently. A compaction procedure mitigates the space amplification by cleaning up old data. It essentially takes multiple files and merges them into one bigger file.

Compaction removed old value of ‘city’. Old files are removed and replaced with a new compacted one.
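A simplified version of compaction fits in a few lines. This sketch assumes all input files fit in memory and treats a `None` value as a delete marker, both of which are my simplifications; a real implementation would stream a k-way merge and can only drop delete markers when no older file may still hold the key.

```python
def compact(sstables):
    """Merge SSTables (oldest first) into one, keeping the newest value per key.

    A value of None is treated as a delete marker and dropped, which is
    only safe here because every existing file takes part in the merge.
    """
    merged = {}
    for table in sstables:  # later (newer) tables overwrite earlier ones
        for key, value in table:
            merged[key] = value
    return sorted((k, v) for k, v in merged.items() if v is not None)
```

For example, merging an old file holding `city=Buffalo` with a newer one holding `city=NYC` leaves a single `city=NYC` entry in the output.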

So, my oversimplified description of LSM storage is not necessarily how RocksDB operates, but it should give enough intuition for us to proceed and dive into the lessons and experiences of Facebook engineers working with RocksDB.

Resources: IOPS vs Space vs CPU

The paper starts by exploring resource efficiency and how optimization priorities were changing over time. RocksDB runs best on SSDs, and these storage devices have a limited lifespan bound by the number of write cycles. Naturally, engineers focused on issues of write amplification (the same data rewritten multiple times) to make sure SSDs do not die prematurely. Interestingly enough, the paper almost makes it sound like write-amplification mitigation efforts were largely wasted. The authors state that the workloads used at Facebook are not too IOPS-heavy (does it mean they are not very write-heavy for write-optimized storage?), and storage space was a more pressing concern. Because of this, the engineers have shifted their efforts to the space-amplification problem (a key occupies more space than it needs to, for example, due to having multiple old versions of it).

Another issue brought up is the CPU utilization. Here, again, the paper states that CPUs are rarely a bottleneck. However, to me, it seems like these represent a delicate resource trade-off. For example, to reduce space consumption, we may need to use more aggressive compression that uses more CPU cycles and more aggressive compaction that needs both CPU and IOPS (and increases write-amplification). So I am not sure it is correct to single out any one resource as a bottleneck or not. They all can be a problem, and it seems more about the ability to reach some balance for a given workload and infrastructure. I believe the need to find such balance in different applications is part of the reason behind the multiple compaction strategies mentioned in the paper.

A significant portion of the paper then focuses on dealing with resources at scale. For example, many instances of RocksDB may coexist on one server, requiring resource management to prevent one instance from hogging all the resources. Other resource-related aspects involve the treatment of write-ahead logs (WALs). For example, it is possible to completely turn off RocksDB’s internal WAL to conserve resources. Of course, this leaves the system vulnerable to data loss in the event of a crash, but this may not be a problem if an application using RocksDB has its own WAL for things like transactions or replication. An interesting mention for resource management is rate-limiting file deletion. This issue seems a bit specific, but the authors explain how file deletion can be costly and impact other tenants using the same SSD.


The paper also extensively talks about new features and their significance. Similar to how the authors have approached resource efficiency, these features largely stem from operating at scale. Many of the points simply make sense when I read them, but I suspect that these realizations were not as easy in practice and carry some production pain points. For example, we usually expect backward compatibility, but designing forward compatibility, where an older version should be compatible with a newer one, is definitely a result of sleepless nights after rolling back from some newer but buggy version and realizing that data files changed to the point that the old version no longer understands them.

The flexibility of RocksDB is another theme woven into the paper. Since the storage system is used in a variety of applications with a variety of needs, this again makes total sense. It appears that the main goal of many features is to make the system more extensible and fit into many different contexts without creating any roadblocks on purpose. One such example is improvements to configuration management that went from “in-code” configuration to having configuration files. However, one big configuration problem directly stems from the flexibility goal: there are too many different parameters to tune, and it seems like there is no good solution for this.

The paper presents a few other examples of flexibility features meant to help build apps on top of RocksDB. If implemented, native versioned storage can greatly help systems relying on multi-version concurrency control (MVCC). This, however, may come with a performance penalty. At the same time, MVCC systems have already been relying on RocksDB for storage, since the “no roadblocks” principle provides great flexibility in how keys and values are encoded, allowing versioning information to be a part of the key. 
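One common way to put versioning information into the key is to append an inverted version number so that newer versions sort first under plain bytewise ordering. The sketch below shows that general trick. The `encode_key`, `decode_key`, and `read_at` helpers are hypothetical names of my own, the encoding assumes user keys contain no zero bytes, and it is not taken from the paper or from any particular system.

```python
import bisect
import struct

MAX_VERSION = 2**64 - 1


def encode_key(user_key: bytes, version: int) -> bytes:
    # Inverted big-endian version: newer versions sort before older ones
    # within the same user key. Assumes user_key contains no 0x00 byte.
    return user_key + b"\x00" + struct.pack(">Q", MAX_VERSION - version)


def decode_key(encoded: bytes):
    user_key = encoded[:-9]  # strip the separator and the 8-byte version
    inverted = struct.unpack(">Q", encoded[-8:])[0]
    return user_key, MAX_VERSION - inverted


def read_at(sorted_keys, values, user_key, read_version):
    """Return the newest value of user_key with version <= read_version."""
    target = encode_key(user_key, read_version)
    idx = bisect.bisect_left(sorted_keys, target)  # models a "seek" in the store
    if idx < len(sorted_keys):
        found_key, _ = decode_key(sorted_keys[idx])
        if found_key == user_key:
            return values[sorted_keys[idx]]
    return None
```

With this layout, a snapshot read becomes a single seek followed by reading one entry, which is the kind of thing a plain ordered key-value store does well.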

Replication and backup support got their own subsection in the paper, but this is nothing but a trivial “you can copy the files to another machine to start a new replica” approach. This is hardly a feature, but again, it plays nicely with the idea of designing a system with as few roadblocks as possible and letting users/engineers be creative with using it.


Reliability is a big topic in the paper. We want the data stored in the database to remain correct and intact. Luckily, there is a very concise summary for this — use checksums! The authors point out that their checksum procedures only work for data already in storage and that they are still working on checking the integrity of data in memtables. This memory corruption may not be that big of a problem though. Thankfully, unlike our personal computers, servers rely on ECC memory that can handle some memory issues all by itself. 

I will finish my summary with a large table of features and changes to RocksDB straight from the paper. 

And as always, we have our group’s presentation by Rohan Puri available on YouTube:


We had a very long discussion after the presentation. I think it lasted almost an hour, just talking about KV-stores in general and RocksDB in particular. There is no way I can possibly summarize every discussion point, but I will try to pick the important ones (by my judgment of their importance).

1) Scratching the surface. This point started in our pre-presentation discussion. While the paper talks about many different features and issues and tries to explain the reasons for the decisions taken, some explanations barely scratch the surface. Of course, it would be rather difficult to talk about eight years of development and go into deep technical discussions. However, what interested the group the most are some rather odd talking points throughout the paper. For example, talking about rate-limiting file deletions is oddly specific. Why not have rate-limiting for all tasks that may have a high impact on IOPS? These oddly specific examples scream about rather interesting back-stories that are obviously missing from the paper. 

2) Checksums. The checksum discussion was rather interesting. There are multiple layers of checksums. For example, block checksums make a lot of sense, as they are written when an SSTable block flushes to disk. One observation made in the reading group is that the file checksums were added late in the RocksDB lifecycle. A plausible explanation for this is that file checksums are rarely needed, as they would come in handy when, for example, copying the entire SSTable file from one machine to another to start a new replica. And on this hopefully rare occasion, we can check the integrity of the data the long way: open the file, go block by block, and check the block checksums.
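The difference between per-block and whole-file checksums is easy to see in a small sketch. Here `zlib.crc32` is only a stand-in for whatever checksum function a real store uses, and the 16-byte block size and the helper names are assumptions chosen to keep the example tiny.

```python
import zlib

BLOCK_SIZE = 16  # bytes; tiny on purpose so the example has several blocks


def write_blocks(data: bytes):
    """Split data into blocks, each stored alongside its CRC32 checksum."""
    blocks = []
    for start in range(0, len(data), BLOCK_SIZE):
        block = data[start:start + BLOCK_SIZE]
        blocks.append((block, zlib.crc32(block)))
    return blocks


def verify_blocks(blocks):
    """The long way: recheck every block and return the indexes that fail."""
    return [i for i, (block, crc) in enumerate(blocks)
            if zlib.crc32(block) != crc]


def file_checksum(blocks):
    """A whole-file checksum computed incrementally over all blocks."""
    crc = 0
    for block, _ in blocks:
        crc = zlib.crc32(block, crc)
    return crc
```

The block-level check tells us which block is bad, while the file-level checksum gives a single value to compare after copying a file to another machine.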

3) Replication. Obviously, RocksDB is a single-server system, but it serves as a store for many replicated systems. In the group, we found it interesting that the paper still talks about replication. However, the replication discussions in the paper boil down to designing a permissive system that allows building replicated solutions on top.

4) Too flexible? One of the bigger goals of RocksDB is its flexibility to fit into different applications with different requirements. This creates a system that has too many features, with any application only using a handful of them. However, this ability to tune and have all these features complicates the configuration and management of the system. One notable example is CockroachDB that developed its own in-house replacement for RocksDB with fewer features, and having fewer features seems to be a big bragging point for Cockroach folks. 

5) Impact of Facebook hardware infrastructure. One concern raised during the discussion was the impact of hardware infrastructure at Facebook on the overall design trajectory described in the paper. Of course, it is true that Facebook deploys RocksDB in their systems and their own infrastructure. But it also means that other non-Facebook users have to adjust to decisions made with Facebook-grade infrastructure in mind.

One such example is the write-amplification vs space-amplification discussion in the paper. While Facebook engineers have concluded that on their SSDs (and their workloads), write-amplification does not pose a serious risk of premature SSD failures, the same may not be the case for other users who may have lesser quality SSDs or more write-demanding workloads. It is a serious enough concern that authors acknowledge the existence of LSM-tree solutions with better write-amplification mitigation strategies. Moreover, at least some of these solutions have been put into production use already. 


Reading Group. Unifying Timestamp with Transaction Ordering for MVCC with Decentralized Scalar Timestamp

Unlike many of my recent summaries, I will make this one short, I promise. “Unifying Timestamp with Transaction Ordering for MVCC with Decentralized Scalar Timestamp” NSDI’21 paper proposes a mechanism to order transactions in multi-version distributed data-stores. One of the problems with distributed transactions is the ordering required to achieve consistency. In particular, we often want to have some serial order of transactions to have an illusion that they could have executed one by one. This is hard to do in a scalable manner. One approach is to try to rely on the physical clocks of the machines, but this is unreliable due to clock skew and clock synchronization issues. Clock skew can introduce causality violations. For example, if transaction TXa happened-before TXb, but due to clock skew, transaction TXa got a larger timestamp than TXb, then we have a causality problem: the cause and effect are reversed if we follow the timestamp order. One way to avoid such causality issues is to rely on a centralized oracle to prescribe the transaction order. Quite a few systems do that, but for obvious reasons, a centralized approach may create scalability and reliability problems. There are a few other ordering mechanisms, such as vector clocks/version vectors and hybrid time.

The authors of the paper take the hybrid time approach that they call Decentralized Scalar Timestamp (DST). DST is a single number that represents the progression of history. It is decentralized, thus avoiding the problems with a single timestamp oracle, and it is smart enough to avoid causality problems. The authors marry the timestamp generation/progression with the concurrency control (CC) mechanism, such as 2PL, allowing CC to adjust the timestamp to match the execution order. Consider two write-conflicting transactions TXa and TXb. Both transactions have some initial timestamps ta and tb respectively. These initial timestamps are based on the timestamps of the last transaction known to the coordinator or client (and ultimately based on the loosely synchronized physical time). The authors then propose to use CC to bump up the timestamps as needed to ensure that the timestamps follow the execution order managed by the concurrency control mechanism. So for example, if initially ta < tb, but TXb happened-before TXa, then we bump ta := tb + 1. The actual “bumping-up” is a bit more complicated since the time is stored as two components, a physical time and a logical one, but the result is that the new timestamps are ordered the same way as the transaction execution.
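The bump rule from the example can be sketched with a plain integer timestamp. This ignores the two-component (physical plus logical) encoding mentioned above, and the `Record` class and `commit_write` function are hypothetical names of mine rather than anything from the paper.

```python
class Record:
    """A data item that remembers the timestamp of its last writer."""

    def __init__(self):
        self.last_ts = 0


def commit_write(record, proposed_ts):
    """Pick the final timestamp for a write; assumed to run under the record's lock.

    The concurrency control decides the execution order, and the timestamp
    is bumped so that it follows that order.
    """
    final_ts = max(proposed_ts, record.last_ts + 1)
    record.last_ts = final_ts
    return final_ts
```

If TXb commits first with tb = 8 and TXa follows with an initial ta = 5, the second call returns 9, matching the ta := tb + 1 bump in the text.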

Since the system operates against a multi-version store, the versions from both transactions are preserved. This is important for performing reads that are based on consistent snapshots, so the latest state may advance forward, and a multi-version store ensures that the snapshot remains available for reads. The read-only transactions (ROTs) bypass some concurrency control and try to avoid locking in the paper. Read-only transactions, however, still need to ensure isolation, as it would be unacceptable to see a result of a partial write. To that end, ROTs have to actively write their read version on all objects and wait for locked objects to get unlocked before reading. This enables the consistent cut read and allows new writes to proceed without blocking but with a higher version than the read.

As far as evaluation, the paper presents multiple different environments and a few different benchmarks along with a pretty good breakdown of comparisons between different approaches. 

We have had our own presentation by David Correa, the recording is available on the reading group’s YouTube:


1) HLC. The biggest discussion topic for this paper was a relation to the Hybrid Logical Clock (HLC). See, HLC is a well-known approach combining physical time and logical time. It keeps the affinity to physical time and maintains the causality just as Lamport’s logical clocks do. The authors discuss that their DST approach is a combination of physical time and a logical one, just like HLC. However, the HLC work is never cited. It seems like the internal operation of the timestamp/clock is very similar. Moreover, there are well-described transaction protocols relying on HLC and MVCC described in the literature in great detail. It would be very interesting to see more about the DST clock operation and see its comparison with HLC. Similarly, it would be interesting to compare with the transaction schemes of CockroachDB or YugabyteDB.

One bigger difference from a typical HLC implementation is that DST increments/ticks only at significant events, such as transaction execution. HLC often ticks at message passing in addition to significant events. This is a more general way to ensure clocks causally update on each communication; however, this is not a requirement for protocols with simple communication exchange patterns. In fact, Mongo updates HLC only at significant events.
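For readers who have not seen HLC before, here is a compact sketch of its update rules as they are commonly described in the HLC literature. It is not DST and not code from any of the systems mentioned; the `HLC` class and the injected `physical_clock` callable are my own framing.

```python
class HLC:
    """Hybrid Logical Clock: (l, c) where l tracks physical time and c breaks ties."""

    def __init__(self, physical_clock):
        self.physical_clock = physical_clock  # callable returning physical time
        self.l = 0
        self.c = 0

    def tick(self):
        """Local or send event."""
        prev = self.l
        self.l = max(prev, self.physical_clock())
        self.c = self.c + 1 if self.l == prev else 0
        return (self.l, self.c)

    def receive(self, msg_l, msg_c):
        """Receive event carrying the sender's (l, c) timestamp."""
        prev = self.l
        self.l = max(prev, msg_l, self.physical_clock())
        if self.l == prev and self.l == msg_l:
            self.c = max(self.c, msg_c) + 1
        elif self.l == prev:
            self.c += 1
        elif self.l == msg_l:
            self.c = msg_c + 1
        else:
            self.c = 0
        return (self.l, self.c)
```

Even when the receiver's physical clock is behind the sender's, the timestamp produced by `receive` compares greater than the one carried by the message, so causality is preserved.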

2) Performance of Read-only Transactions. ROTs require a write of a timestamp on each object touched by the transaction. This is important for safety to make sure all new writes are ordered after the read-only transaction, so this version update of an object must be durable. We think that this may have a negative impact on performance, as each read includes a disk write in its path.

3) Evaluation. The group had some questions about the evaluation. In particular, the TPC-C benchmark is scaled with respect to districts. A typical TPC-C benchmark tries a different number of warehouses, not districts. That being said, districts and warehouses are linked: “Each warehouse in the TPC-C model must supply ten sales districts.” This, however, raises additional questions, as 20 districts translate to just 2 warehouses, whereas many transactional papers go into tens or even hundreds of warehouses.


Scalable but Wasteful or Why Fast Replication Protocols are Actually Slow

In the last decade or so, quite a few new state machine replication protocols emerged in the literature and the internet. I am “guilty” of this myself, with the PigPaxos appearing in this year’s SIGMOD and the PQR paper at HotStorage’19. There are better-known examples as well: EPaxos inspired a lot of development in this area. However, there seems to be a big disconnect between literature and production. While many sophisticated protocols, such as EPaxos, Atlas, SDPaxos, Compartmentalized Paxos, appear in the literature, and some of them are rather popular in the academic community, the production systems tend to rely on more conservative approaches based on Raft, Multi-Paxos, or primary-backup.

There are probably a few reasons for this disconnect. For instance, as we discussed in Paxos vs Raft reading group meeting, explaining protocols in terminology closer to actual code and having good reliable reference implementations help with adoption. However, the difficulty of implementing some technology should not be a roadblock when this technology offers substantial improvements. 

Well, it appears that there are substantial quantitative improvements. Consider EPaxos. The protocol presents a leader-less solution, where any node can become an opportunistic coordinator for an operation. The exciting part is that EPaxos can detect conflicts, and when operations do not conflict with each other, both of them can succeed in just one round trip time to a fast quorum. The original paper claims better throughput than Multi-Paxos, and this is not surprising, as EPaxos avoids a single-leader bottleneck of Multi-Paxos and allows non-conflicting operations to proceed concurrently. This is a big deal! 

Throughput of Multi-Paxos and EPaxos in a cluster of 5 dedicated nodes under low conflict.

I wanted to find the truth here, so a couple of colleagues and I worked on experimentally testing Multi-Paxos and EPaxos. We performed a simple experiment, shown in the figure here, to confirm EPaxos’ performance advantage, and we achieved somewhere between 15-25% better throughput in 5-node EPaxos than Multi-Paxos when running on identical VMs (EC2 m5a.large with 2 vCPUs and 8 GiB of RAM). The original EPaxos paper has a bit larger performance difference in some situations, but we have spent significantly more time on our Multi-Paxos implementation and optimizations than EPaxos. Anyway, we see a substantial improvement that was stable, reproducible, and large enough to make a real-world difference. Not to mention that EPaxos has a few other advantages, such as lower latency in geo-distributed deployments. So what gives?

I have a theory here. I think many of these “faster” protocols are often slower in real life. This has to do with how the protocols were designed and evaluated in the first place. So bear with me and let me explain. 

Most evaluations of these consensus-based replication protocol papers are conducted in some sort of dedicated environment, be it bare-metal servers in the lab or VMs in the cloud. These environments have fixed allocated resources, and to improve the performance we ideally want to maximize the resource usage of these dedicated machines. Consider Multi-Paxos or Raft. These protocols are skewed towards the leader, causing the leader to do disproportionately more work than the followers. So if we deploy Multi-Paxos in five identical VMs, one will be used a lot more than the remaining four, essentially leaving unused resources on the table. EPaxos, by design, avoids the leader bottleneck and harvests all the resources at all nodes. So naturally, EPaxos outperforms Multi-Paxos by using the resources Multi-Paxos cannot get a hold of due to its design. Such uniform resource usage across nodes is rather desirable, as long as avoiding the leader bottleneck and allowing each node to participate equally comes cheaply.

Aggregate CPU utilization for Multi-Paxos and EPaxos in 5 nodes

Unfortunately, it is not the case, and EPaxos’ ability to use resources that would have been left idle by Multi-Paxos comes at a steep penalty! Intuitively, EPaxos needs to do a lot more work to maintain safety and remain leader-less — it needs to communicate the dependencies, compute dependency graphs, check for conflicts in these graphs, and resolve the conflicts as needed. This added complexity contrasts with Multi-Paxos that provides most of its safety via a few simple comparisons both at the leader and followers. To try to estimate the cost of EPaxos’ role uniformity, we need to look at resource usage. In the same experimental run as in the absolute performance figure earlier, we have measured CPU usage across all VMs. Expectedly, we were able to observe that EPaxos is very good at using all available resources, and Multi-Paxos consumes the entire CPU only at one node – the leader. However, it does not take long to spot something odd if we start looking at the aggregate resource usage, shown in the figure here. It turns out that Multi-Paxos achieved nearly 14k ops/s of throughput consuming roughly 200% CPU (and leaving 300% unused), while EPaxos did a bit over 18k ops/s with all 500% of aggregate CPU utilization. 

Throughput normalized by the CPU usage

This suggests that EPaxos needs overall more CPU cycles across all nodes to finish each operation. We can see this better if we normalize the throughput per the amount of CPU consumed. As seen in the figure, this normalized throughput presents a big gap in the efficiency of protocols. Of course, this gap may vary and will depend on the implementation. It may even shrink if some better engineers implement EPaxos, but I also think there is a fundamental issue here. EPaxos and many other protocols were designed to take advantage of all allocated resources in the cluster since allocated but unused resources are wasted. This resource usage paradigm is common when we have dedicated servers or VMs. However, technology has moved on, and we rarely deploy replicated services or systems all by themselves in isolation. With the advances in virtualization and containerization, we increasingly deploy replicated systems in resource-shared environments that are designed to pack applications across clusters of servers in a way to avoid having idle physical resources. 
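The normalization itself is simple arithmetic. Below is a minimal sketch that uses the rounded numbers quoted above ("nearly 14k" and "a bit over 18k" ops/s), so the outputs are only approximations of what the figure shows. The function name and the choice of "100% of CPU" as the unit are my own illustration, not something taken from the measurement scripts.

```python
# Rounded figures quoted in the text; the exact measurements are in the figure.
runs = {
    "Multi-Paxos": {"ops_per_sec": 14_000, "cpu_percent": 200},
    "EPaxos": {"ops_per_sec": 18_000, "cpu_percent": 500},
}


def ops_per_cpu_unit(ops_per_sec, cpu_percent):
    """Throughput per 100% of CPU consumed (hypothetical helper)."""
    return ops_per_sec / (cpu_percent / 100)


normalized = {name: ops_per_cpu_unit(**run) for name, run in runs.items()}

for name, value in normalized.items():
    print(f"{name}: {value:.0f} ops/s per 100% CPU")
```

With these rounded inputs, Multi-Paxos comes out at roughly twice the per-CPU throughput of EPaxos, even though EPaxos wins on absolute throughput.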

Aggregate throughput of 5 Multi-Paxos and EPaxos instances in a task-packed 5 nodes cluster.

Moreover, most replicated systems are sharded, so we normally have multiple instances of the replication protocol supporting shards of one larger system. This makes task-packing simpler as we have many relatively uniform tasks to schedule between machines. Consider YugabyteDB, for example. Karthik Ranganathan in his talk described how the system schedules Raft groups across a cluster such that each server has a few resource-heavy Raft leaders and plenty more resource-light Raft followers. This type of packing allows YugabyteDB (and CockroachDB, Spanner, Cosmos DB, and so on) to achieve uniform resource usage on VMs or dedicated servers while having non-uniform, but efficient replication protocols. Here I show how 5 instances of Multi-Paxos compare with 5 instances of EPaxos when deployed on larger servers (32 GiB RAM and 8 vCPUs). This drastic difference compared to the dedicated environment is due to the fact that we were able to pack multiple instances of Multi-Paxos to consume all the resources of the cluster, and clock-for-clock, EPaxos has no chance of winning due to its lower efficiency.

Ok, so I think there are quite a few things to unpack here. 

  • Absolute performance as measured on dedicated VMs or bare-metal servers is not necessarily a good measure of performance in real life, especially in resource-shared settings (aka the cloud).
  • We need to consider efficiency when evaluating protocols to have a better understanding of how well the protocol may behave in resource-shared, task-packed settings.
  • The efficiency of protocols is largely under-studied, as most evaluations from academic literature simply focus on absolute performance.
  • The efficiency (or the lack of it) may be the reason why protocols popular in academia stay in academia and do not get wide adoption in the industry. 

In conclusion, I want to stress that I am not picking on EPaxos. It is a great protocol that has inspired a lot of innovation in the area of distributed consensus. Moreover, I am sure there are use cases for EPaxos (or more recent extensions, such as Atlas), especially in the WAN setting, where trading off some efficiency for better latency may be acceptable. Many other protocols (mine included) may have similar efficiency problems, as they approach the performance issue similarly: find the bottlenecked node and move its work elsewhere. The fundamental issues at play here are that (1) moving the work elsewhere comes at an efficiency penalty, and (2) in many modern environments there may not be any resource available for such “moved” work due to resource budgets and tight task-packing.

I, Abutalib Aghayev, and Venkata Swaroop Matte discuss the efficiency issues in a bit more detail in our upcoming HotStorage’21 paper: “Scalable but Wasteful: Current State of Replication in the Cloud.”

Reading Group. Strong and Efficient Consistency with Consistency-Aware Durability

In the 62nd reading group session, we covered the “Strong and Efficient Consistency with Consistency-Aware Durability” paper from FAST’20. Jesse did an excellent presentation for the group that explains the core of the paper rather well:

This paper describes a problem with many leader-based replication protocols. It specifically focuses on ZooKeeper and Zab, but similar issues arise in Multi-Paxos and Raft systems as well. Linearizable reads are expensive, as they need to go through a lease-protected leader (or carry out a full consensus round in consensus-based replication if leases are to be avoided). This puts a single leader node in the middle of everything, which is bad for throughput.

But what if we allow reading from followers? Systems like ZooKeeper allow such reads, but these have a few problems. For one, followers may be stale, so such reads do not have the same recency guarantees as the linearizable solutions. Secondly, reading from followers violates monotonic reads if the client jumps between the followers it reads from without having a special session/recency token. The third problem, the one the paper is specifically solving, is that there is no monotonicity between clients: when one client reads a value of version n, nothing prevents another client from reading version n-1 from a different node at a later time.

So, the goal of the paper is to allow reading from followers while sacrificing as little consistency as possible. The system introduced in the paper, called ORCA, solves two consistency-related problems. First, it ensures that local followers return to clients only durable data that cannot be lost in the face of failures. Second, ORCA enforces some recency guarantees in the form of cross-client monotonic reads.

To achieve both goals, the system introduces a globally replicated durable-index. The purpose of this index is to let the nodes know up to which point in the log the data has been durably replicated to some quorum of nodes (here durability means persistence to non-volatile storage, and the quorum is traditionally a majority). This durability marker allows the followers to avoid returning a value that can be lost due to a crash. For example, consider some object k with the initial value k1 and a corresponding durable-index=1. Another value k2 is being replicated in log position 2. If a local node returns k2 before it is durable, there is a chance that k2 was not replicated and persisted at the majority of nodes yet, and a minority partition or failure can erase k2. To prevent this, the leader maintains a durable-index that gets updated once a quorum of replicas has acknowledged the persistence of data to storage. In the example, once k2 is durable and the followers have acked, the leader updates its durable-index:=2 and sends it to the followers. A follower can only return the value k2 if it has received a durable-index>=2 (i.e., an index that has moved to or past the value k2). Naturally, there are plenty of situations when the follower may be aware of k2 but has not received the durable-index=2 update just yet. In this case, the follower cannot answer the read locally. It cannot return k2, since it is not known to be durable yet, but it also cannot return k1, as some other follower may have already returned k2 to a client. One solution would be to wait and see whether the durable-index eventually catches up. A more proactive approach would forward the read to the leader, and let the leader synchronously replicate k2 (and tell followers to flush it to disk) before returning the value to the client. The paper favors the proactive approach, as it acts somewhat as “read-repair” applicable to a variety of leader-based replication schemes.
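The follower-side rule from the k1/k2 example can be sketched in a few lines. This is only my reading of the rule for a single object, not ORCA's actual code. The `Follower` class, its fields, and the `"forward_to_leader"` marker are hypothetical names I made up for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Follower:
    # log[i] holds the value written at log position i + 1 (single object k).
    log: list = field(default_factory=list)
    # Last durable-index this follower has received from the leader.
    durable_index: int = 0

    def read(self):
        """Answer locally only if the newest known value is known to be durable."""
        if len(self.log) <= self.durable_index:
            return ("local", self.log[-1] if self.log else None)
        # A newer, not-yet-durable value is known: returning it is unsafe,
        # and returning the older value could break cross-client monotonicity.
        return ("forward_to_leader", None)


follower = Follower(log=["k1"], durable_index=1)
print(follower.read())        # k1 is durable, so it can be served locally

follower.log.append("k2")     # k2 arrives, but durable-index is still 1
print(follower.read())        # the read has to go to the leader

follower.durable_index = 2    # the leader announces durable-index=2
print(follower.read())        # k2 can now be served locally
```

Note that this sketch says nothing about a follower that is simply behind and has not even seen k2; that case is the monotonicity problem discussed further down.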

The paper presents this durable-index as a big contribution, although I am a bit skeptical about this. See, existing systems have been doing the whole durable-index business for a very long time. I will focus on just one example, however: Raft. In Raft, the leader must communicate its commitIndex to the followers after a quorum has been reached so that all nodes can mark all log items or updates up to commitIndex as committed. If we assume that Raft persists operations to disk at each node upon receiving them, then the leader’s commitIndex in the “commit” message (i.e., the next AppendEntries RPC) is the same as the durable-index from this paper. A similar global commit progress marker can be implemented in Paxos-style systems despite a few challenges with out-of-order commits in Multi-Paxos.

In the paper’s defense, it makes the durable-index a bit more general than Raft’s approach. For example, the paper allows for more relaxed primary backup implementations, asynchronous replication, and implementations with delayed disk persistence. Consider adding durable-index to an asynchronous leader-driven system to allow durable reads. The system can check whether the value to be returned is durable, allowing it to do synchronous “read-repair” only when needed, and preserving the benefits of async replication as often as possible.

Ok, so we have a durable-index that marks the global progress of the system, and the leader distributes the durable-index to all nodes. This is enough to ensure that data read once won’t ever get lost due to a failure, but it is still not enough to ensure monotonicity. If one server in a cluster of 3 is slow, it may not have received some updates and the new durable-index, while the 2 other, faster nodes have successfully applied some more operations. This can lead to the slow node returning older data than the other two nodes may have already returned. Something similar can happen when one replica is network partitioned. For a single client, this monotonicity problem can be solved with some sort of session token that tracks the recency of data observed by the client, but there is no elegant solution for the cross-client case.

The naive solution to the monotonicity problem, which Jesse describes in great detail in the presentation, expands the durable-index requirement to all nodes. In this case, a follower can only return a value when its state matches the durable state agreed upon by all nodes, so there can be no slow or partitioned replicas. For instance, we may have 3 nodes, all having a persisted history of k1, k2, k3. Since all nodes have the value k3, it is clearly durable. A leader node can then send the durable-index=3 to signal the rest of the cluster that the 3rd item (i.e., k3) is durable. Let’s say one follower f1 has received this durable-index. We can allow this replica to return k3 locally. At the same time, another follower, f2, may not have received the durable-index update just yet, so it is aware of k3 but does not think it is durable. The node f2 won’t return k2 or k3 and instead will forward the read to the leader, which will ensure that k3 is durable and no other operation is in progress before returning k3.

Requiring durability/replication to all nodes has been explored before. If I continue to draw parallels with Raft and take into account the flexible quorums result, then configuring Raft with a replication phase requiring a quorum of all nodes will achieve this durable-index of all nodes approach. Needless to say, Raft will then provide linearizability and not just monotonic reads (as long as followers block in case of a dirty read until they receive an updated commitIndex from the leader). There is an obvious problem with the quorum of all nodes crippling availability when one node goes down. However, there are also examples of production systems that make this work to some degree.

Anyway, ORCA does not require durability on all nodes due to availability issues. Instead, it introduces the “active-set,” a set of nodes that must achieve durability for the durable-index to progress. Reads are possible against the replicas in the active-set, while reads against nodes outside of the active-set are prohibited (forwarded to leader?). The active-set must be at least the majority of nodes for fault tolerance reasons. A lease protects the active-set to allow failed or partitioned nodes to fall out of the set safely. This idea is largely described in the Paxos Quorum Leases paper, which, again, provides linearizability.


1) Linearizable Alternatives

Flexible Quorums in Write All Read One configuration. I already touched on this point in the summary, but I also raised it in our group’s discussion. One thing that is different between just running Raft with write quorum of all nodes and ORCA is that ORCA can do “fast writes” since it is not doing linearizability. These fast writes are just uncommitted writes when clients receive a write acknowledgment before an operation has been quorum-committed. Some systems, like MongoDB, also have them, and they seem to cause more confusion and problems than benefits. So, we do not think this is really a good feature to have in a system with stronger consistency guarantees. In fact, this consistency relaxation seems to be the only big difference between Flexible-Paxos (Raft) approach and the naive ZooKeeper-based ORCA. 

Paxos Quorum Leases. The Paxos Quorum Leases (PQL) paper was brought up during the discussion too. At a high level, PQL is similar to the lease-protected active-sets of ORCA. However, we were surprised to see a complete lack of comparison between ORCA’s active-set approach and PQL.

Paxos Quorum Read. I have explored the problem of scaling read throughput in strongly consistent systems before. The Paxos Quorum Read (PQR) approach allows client-driven reads from a quorum of followers. This is not as scalable as reading from just one node, but PQR avoids any leases and sticks to linearizability.

2) Safety of durable-index

Lease safety. As with all systems that rely on leases, proper timeouts are essential for safety. Lots of care must be put into establishing proper timeouts. For example, it is essential for partitioned nodes in the active-set to time out and “kill themselves” before the leader has a chance to time out and pick a new active-set. If a leader creates a new active-set before the bad node in the old one dies, we may have a situation with two nodes returning different data. Timeouts also need properly working clocks. If one clock ticks too quickly or too slowly, this may also be a problem.

Replication safety. An interesting point was brought up regarding the safety of durable-index. Since the durable-index update may get delivered faster to some nodes and slower (or be dropped altogether) to others, this creates a situation when a client can read from a more up-to-date node and receive some newer data, let’s say value k3. A client may then contact a different node that has not received the new durable-index and still thinks k2 is durable. This node cannot return k2, as it would be a violation of monotonic reads! The node must wait for k3 to become durable, which in ORCA means that the read gets forwarded to the leader. So in other words, if a node knows of some non-durable update to an object, it is pretty much blocked, and needs to forward to the leader to avoid safety problems. This leads to the next discussion point.

3) Performance cost. The safety of the protocol relies on fixing the durability while doing a read operation. So every time a read is done against the node that knows of some newer update that is not durable yet, the protocol forwards to the leader and potentially re-replicates. Even the forwarding part alone can be costly. It will cost not only extra latency but also the resources needed to support these multi-hop operations. 

ORCA is banking on the fact that we have a system with multiple keys/objects and that the access patterns are such that it is unlikely for us to both have an outstanding non-durable update and a read operation for the same key at the same time. This is generally an ok assumption, and many systems do that. The problem is that the opposite case is a complete disaster for the system. Having just a handful of “hot keys” will cause frequent forwards to the leader and all the extra work and extra latency. At the very best this will create bad latency, but because this creates a lot more work for the system, it actually lowers the maximum throughput achievable in the system. Can this translate to a metastable failure?

4) Practicality of cross-client monotonic reads. The final discussion topic I want to mention is the practicality of such cross-client monotonic read consistency. The paper goes into a few examples. However, monotonic reads are a far cry from more stringent consistency models. ORCA does not explicitly enforce read-your-write property, which, intuitively, is desirable in a system claiming strong consistency. For instance, Facebook goes out of its way to provide read-your-write consistency across its stack of weakly consistent systems to make development easier and prevent user confusion. 

As a last remark, I want to add that consistency properties with the paper’s approach will depend on the underlying leader-based protocol and implementation. As I mentioned a few times, Raft already has durable-index baked in, and solutions like PQL can provide something similar to the active-set described here for a completely linearizable system. However, the durable-index and active-set approach can be applied to weaker-consistency leader-based replication, and in such cases, this can improve consistency, hopefully at a low-enough cost.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Conflict-free Replicated Data Types

We kicked off a new set of papers in the reading group with some fundamental reading – “Conflict-free Replicated Data Types.” Although not very old (and not the first one to suggest something similar to CRDTs), the paper we discussed presents a proper definition of Conflict-free Replicated Data Types (CRDTs) and the consistency framework around them. Needless to say, lots of research followed after this paper in the area of CRDTs. 

It is impossible to discuss Conflict-free Replicated Data Types without mentioning the consistency in distributed replicated systems a bit. In super high-level terms, consistency describes how well a replicated system mimics a single copy illusion of data. On one side of the consistency spectrum, we have strong consistency (i.e. linearizability), that appears to an outside observer as if there is exactly one copy of the data. On the other side of the spectrum, we have eventual consistency, which allows many kinds of data artifacts, such as reading uncommitted data, accessing stale data, and more. 

Strong consistency, however, comes at a significant performance cost that eventual systems do not have. For strong consistency, the order of operations is crucial, as clients must observe a single history of state changes in the system. Most often this means that all replicas must apply the same sequence of commands in the same order to progress through the same states of their state machines. This requires a sequencer node which often is a bottleneck. There are a few exceptions to this, for example, protocols like ABD do not need to have a single sequencer, and there may be even gaps/variations in history on individual nodes, but these protocols have other severe limitations. In addition to following the same history of operation, linearizability also imposes strict recency requirements — clients must observe the most recent state of the system. This prescribes synchronous replication to make sure enough nodes progress in lock-step for fault tolerance reasons. These challenges limit scalability — despite having multiple replicas, a replicated linearizable system will be slower than a single server it tries to mimic. 

Eventually-consistent systems do not have such strong performance constraints, because there is no need to order operations, enforce recency, and even keep a single history of updates. This gives a lot of freedom to explore parallelism and push the boundaries of performance. Unfortunately, these systems are hard to program against, since the application built on top of an eventual consistency store needs to account/anticipate all kinds of data artifacts and deal with them. 

All these differences between strong and eventual consistency also mean that they land on different sides (vertices?) of the CAP triangle. With the recency requirements and lock-step execution, linearizable systems are CP, meaning that they sacrifice the Availability in the face of network Partitions and remain Consistent. Eventual systems… well, they do not promise Consistency at all, so they remain Available.  

Anyway, the drastic differences between the two extremes of the consistency spectrum coupled with the scary CAP theorem have sparked a lot of research in consistency models that lie between strong and eventual. These intermediate models were supposed to provide a compromise between the safety of strongly consistent systems and the performance/availability of eventual ones. This is where CRDTs come to play, as they often drive the Strong Eventual Consistency (SEC) model. The paper presents SEC as the “solution to CAP”, and this makes me cringe a bit. First of all, Strong Eventual Consistency is a strongly confusing name. Secondly, having a solution to CAP sounds super definitive, whereas SEC is merely one of many compromises developed over the years. 

Now we are getting to the meat of the paper that excites me. See, aside from a cringy name and a claim to solve CAP problems, SEC is pretty clever. A big problem with eventual consistency is that it does not define any convergence rules. Without such rules, the system may converge to an arbitrary state. Moreover, the convergence itself becomes unpredictable and impossible to reason about. SEC addresses the convergence problem by imposing some rules on the eventual consistency model. This enables engineers to reason about both the intermediate and final states of the system.

More specifically, SEC requires that any two nodes that have applied the same set of operations arrive at identical states. This sounds similar to how strongly consistent systems apply operations at nodes. The difference is that in strongly consistent systems we reason about sequences that have some order to them. In SEC, we work with sets of commands, which are order-less. I think this is a pretty cool thing: throwing away the order, yet still ensuring that the convergence is predictable and depends only on the operations we have.

Completely throwing away the operation ordering and working with operation sets instead of sequences is tricky though. Consider some variable x, initially at x:=2. If we have two operations: (1) x:=x+2 and (2) x:=x*2, we can clearly see the difference if these operations are applied in a different order — by doing the operation (2) first, we will get a final state of 6 instead of 8. This presents a convergence problem and a violation of SEC if different nodes apply these operations in a different order. In a sense, these two operations, if issued concurrently, conflict with each other and require ordering. So clearly we need to be smart to avoid such conflicts and make SEC work. 
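The x:=2 example is easy to check mechanically. The sketch below applies the two operations in both possible orders and then does the same for two additions, which do commute. The function names and the extra `add_three` operation are mine, added purely for illustration.

```python
from itertools import permutations


def add_two(x):
    return x + 2


def double(x):
    return x * 2


def add_three(x):
    return x + 3


def apply_all(initial, ops):
    """Apply the operations to the initial state in the given order."""
    state = initial
    for op in ops:
        state = op(state)
    return state


def final_states(initial, ops):
    """Map every possible ordering of ops to the final state it produces."""
    return {
        tuple(op.__name__ for op in order): apply_all(initial, order)
        for order in permutations(ops)
    }


conflicting = final_states(2, [add_two, double])
commuting = final_states(2, [add_two, add_three])

print(conflicting)  # the two orders end in different states
print(commuting)    # both orders end in the same state
```

Whenever `final_states` yields more than one distinct value, replicas that see the same set of operations in different orders can diverge, which is exactly what SEC has to rule out.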

There is no generic solution to avoid such conflicts, but we can design specific data structures, known as Conflict-free Replicated Data Types or CRDTs, that solve this ordering problem for some use cases. As the name suggests, CRDTs are built to avoid the conflicts between different updates or different versions of the same data object. In a sense, CRDTs provide a data structure for a specific use case with some defined and restricted set of operations. For instance, we can have a CRDT to implement a distributed counter that can only increment the counter’s value or a CRDT for an add-only set. The paper presents two broad types of CRDTs: state-based and operation-based. Both types are meant for replicated systems and differ in terms of communicating the updates between nodes and reconstructing the final state.

State-based CRDTs transfer the entire state of the object between nodes, so they can be a bit heavier on bandwidth usage. The actual state of a data structure is not directly visible/accessible to the user, as this state may be different than the logical meaning of the data structure. For example, going back to the counter CRDT, logically we have a single counter, but we may need to represent its value as consisting of multiple components in order to ensure conflict-free operation. Assume we have n nodes, and so to design a state-based counter CRDT we break down the counter value to registers <c1, c2, c3,…, cn>, each representing the increments recorded at a particular node. The logical state of CRDT counter is the sum of all registers \(c=\sum_{i=1}^nc_i\), which must be exposed through a query function. In addition to the query function, there must be an update function to properly change the underlying state. For the counter, the update function will increment the register corresponding to its node id. 

The most important part, however, is still missing. If some node receives concurrent increments, how can it reconcile them? Let’s say we have 3 nodes {n1, n2, n3}, each starting in some initial counter state <4,5,2>. These nodes receive some updates and increment their respective registers locally: n1:<6,5,2>, n2:<4,6,2>, n3:<4,5,4>. The nodes then send out their now divergent copies of the counter CRDT to each other. Let’s say node n2 received an update from n3, and now it needs to merge two versions of the CRDT together. It does so with the help of the merge function, which merges the two copies and essentially enforces the convergence rules of SEC. There are some specific requirements regarding the merge function, but they essentially boil down to making sure that the order in which any two CRDTs merge does not matter at all. In the case of our counter, the merge function can be as simple as a pairwise comparison of registers between two versions of the CRDT and picking the maximum value for each register. So for merge(n2:<4,6,2>, n3:<4,5,4>), we will see the updated value of <4,6,4> on node n2. If at this point n2 sends its update to n1, then n1 will have to do merge(n1:<6,5,2>, n2:<4,6,4>), and get the final version of <6,6,4>. Note that if n1 now also receives n3’s update, it will not change the state of n1, since that update was already learned indirectly. This scheme works pretty neatly. It tolerates duplicate messages and receiving stale updates. However, we can also see some problems: we carry a lot more state than just a simple integer to represent the counter, and our counter’s merge function has restricted the counter to only allow increments. If we try to decrement a value at some node, it will be ignored, since the merge function selects the max value of a register. The latter problem can be fixed, but this will require essentially doubling the number of registers we keep for each node, exacerbating the state-size problem.

Example of state-based counter with some sample message exchange.
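The counter walkthrough above maps almost directly to code. Here is a minimal sketch of such a state-based, increment-only counter that replays the same n1/n2/n3 exchange. The class and method names are my own, and node ids are zero-based list indexes, so n1 is index 0.

```python
class GCounter:
    """State-based, increment-only counter with one register per node."""

    def __init__(self, node_id, registers):
        self.node_id = node_id            # index of this node's own register
        self.registers = list(registers)  # full state, shipped on every sync

    def increment(self, amount=1):
        """Update function: only the local node's register may grow."""
        if amount < 0:
            raise ValueError("this counter only supports increments")
        self.registers[self.node_id] += amount

    def query(self):
        """Logical counter value: the sum of all registers."""
        return sum(self.registers)

    def merge(self, other_registers):
        """Pairwise maximum; the order and repetition of merges do not matter."""
        self.registers = [max(a, b) for a, b in zip(self.registers, other_registers)]


initial = [4, 5, 2]
n1, n2, n3 = (GCounter(i, initial) for i in range(3))

n1.increment(2)  # n1: <6,5,2>
n2.increment(1)  # n2: <4,6,2>
n3.increment(2)  # n3: <4,5,4>

n2.merge(n3.registers)  # n2: <4,6,4>
n1.merge(n2.registers)  # n1: <6,6,4>
n1.merge(n3.registers)  # no change, n3's update was already learned via n2

print(n1.registers, n1.query())
```

Because `merge` is a pairwise maximum, replaying the same state twice or receiving it late leaves the registers unchanged, which is where the tolerance to duplicate and stale messages comes from.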

Operation-based CRDTs somewhat solve the above problems. Instead of transferring the full state of the object, op-based CRDTs move around the operations required to transform from one state to the next. This can be very economical, as operations may use significantly less bandwidth or space than the full CRDT state. For instance, in our counter CRDT example, the operation may be the addition of a number to the counter. Of course, as the operation may propagate at different speeds, and potentially get reordered, op-based CRDT requires that all concurrent operations are commutative. In other words, we again set the rules to ensure that the order of updates (i.e. operations) does not matter. In the counter use-case, we know that all additions commute, making it easy to implement an op-based counter. Unlike the state-based version, we do not even need to have multiple registers and sum them up to get the actual value of the counter. However, there are some important caveats with op-based CRDTs. They are susceptible to problems when a message or operation gets duplicated or resent multiple times. This creates a significant challenge, as either the operations themselves must be designed to be idempotent, or the operation delivery layer (communication component of the application) must be able to detect duplicates and remove them, essentially ensuring idempotence as well.
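To make the duplicate-delivery caveat concrete, here is a minimal sketch of an op-based counter in which the replica remembers the ids of operations it has already applied. The unique operation ids (such as "n1-1") and the class itself are assumptions made for this example. A real system might push the deduplication into the delivery layer instead.

```python
class OpCounter:
    """Op-based counter: replicas exchange increments, not full state."""

    def __init__(self):
        self.value = 0
        self.seen = set()  # ids of operations already applied (grows forever here)

    def apply(self, op_id, amount):
        """Apply an increment once; redelivered operations are ignored."""
        if op_id in self.seen:
            return False
        self.seen.add(op_id)
        self.value += amount
        return True


# Hypothetical operations, each tagged with a unique id by its origin node.
ops = [("n1-1", 2), ("n2-1", 1), ("n3-1", 2)]

replica_a = OpCounter()
for op_id, amount in ops:
    replica_a.apply(op_id, amount)

replica_b = OpCounter()
# Same operations, reordered, with one of them delivered twice.
for op_id, amount in [ops[2], ops[0], ops[2], ops[1]]:
    replica_b.apply(op_id, amount)

print(replica_a.value, replica_b.value)
```

Reordering is harmless because additions commute. The duplicate is harmless only because of the `seen` set, and without it replica_b would overcount.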

The paper goes into more details and more examples of each type of CRDT, as well as explaining how the two types are roughly equivalent in terms of their expressivity. Intuitively, one can think of the merge function as calculating a diff between two CRDT versions and applying it to one of the versions. Operations are like these diffs to start with, so it makes sense how the two types can be brought together. We have our presentation of the paper available here:


1) Challenges designing CRDTs. As mentioned in the summary, CRDTs are special-purpose data structures, so designing them to fit a use case takes some time. I spent some time a few years ago working on CRDTs at Cosmos DB, and it was a very fun thing to do, but also a bit challenging. A good example of the problem is a set CRDT. It is easy to make an add-only set, where items can be added and not removed. All set additions commute, so the problem is trivial. But to make sets more practical, we want to remove items too. A simple solution is to internally implement a removed set, so the CRDT tracks all items added and removed separately. This way we can hardcode the precedence of adds and removes and say removes always come after adds for an item. But this works only as long as we do not ever need to re-add items back into the set…
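The add-set plus removed-set design described here is usually called a two-phase set in the CRDT literature. A minimal state-based sketch, with class and method names of my own choosing, shows both why it converges and why re-adding an item does not work:

```python
class TwoPhaseSet:
    """Set CRDT that tracks added and removed items in two grow-only sets."""

    def __init__(self):
        self.added = set()
        self.removed = set()  # tombstones; an item here is gone for good

    def add(self, item):
        self.added.add(item)

    def remove(self, item):
        # Only items this replica has seen added can be removed.
        if item in self.added:
            self.removed.add(item)

    def contains(self, item):
        # Removes always win over adds for the same item.
        return item in self.added and item not in self.removed

    def merge(self, other):
        """Union of both underlying sets; unions commute and are idempotent."""
        self.added |= other.added
        self.removed |= other.removed


left, right = TwoPhaseSet(), TwoPhaseSet()
left.add("a")
right.add("a")
right.add("b")
right.remove("a")

left.merge(right)
print(left.contains("a"), left.contains("b"))

left.add("a")             # attempt to re-add a removed item
print(left.contains("a"))  # still absent because the tombstone wins
```

The last two lines are the limitation mentioned above: once an item lands in the removed set, no later add can bring it back.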

2) Modeling. Due to their concurrent nature, it is a good idea to model and model-check CRDTs. I used TLA+ for this purpose. During the discussion, a question was raised on the best tools for CRDT model-checking, but unfortunately, nobody knew anything better than TLA+/TLC. I’d suspect that other tools used for verifying distributed systems, such as Alloy, could work as well.

3) Applications. Quite a bit of discussion was focused on applications that use CRDTs. We talked quite a bit about near-real-time collaborative tools, such as collaborative document editing. I mentioned the Google Docs style of application quite a few times, but it was brought up that Google actually uses Operational Transformation (OT) instead of CRDTs. In particular, it uses server-based OT, which requires a server to sync each client against. Regardless, collaborative tools seem to be the prime field for CRDTs. For instance, the Automerge library provides a good start for a JSON-like CRDT to serve as the basis for these types of applications.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Exploiting Symbolic Execution to Accelerate Deterministic Databases

We have covered 60 papers in our reading group so far! The 60th paper we explored was “Exploiting Symbolic Execution to Accelerate Deterministic Databases” from ICDCS’20. I enjoyed the paper quite a lot, even though there are some claims I do not necessarily agree with.

The paper solves the problem of executing transactions in deterministic databases. We can imagine a replicated state machine, backed by Paxos or Raft. Trivially, in such a machine, each replica node needs to execute the transactions following the exact order prescribed by the leader to guarantee that all replicas progress through the same states of the machine. The good thing here is that nodes run each transaction independently of each other after the execution order has been established by the replication leader. The bad thing is that this naive approach is sequential, so each node cannot take advantage of multiple processing cores it may have.

Naturally, we want to parallelize the transaction execution. This, however, is easier said than done. To allow for more parallelism, we want to identify the situations when it is ok to run some transactions concurrently without impacting the final state of the state machine. For example, if we know what objects or keys the transaction reads and writes (i.e. the transaction’s read-and-write set), we can group independent transactions that operate on disjoint read-write sets together for parallel execution. For instance, a transaction accessing keys “x, y, and z” is independent of a transaction accessing keys “a, b, and c,” and the two can execute at the same time without impacting each other. 
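To make the grouping idea concrete, here is a small Python sketch. It is only an illustration of the general idea and not how any particular system does it: `Txn`, `conflicts`, and `parallel_groups` are hypothetical names, and I assume every transaction's read and write sets are already known. Two transactions conflict if one writes a key the other reads or writes. Each transaction goes into the earliest group that comes after all earlier conflicting transactions, so groups run one after another while transactions inside a group can run in parallel.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Txn:
    name: str
    reads: frozenset = frozenset()
    writes: frozenset = frozenset()


def conflicts(t1, t2):
    """True if one transaction writes a key the other reads or writes."""
    return bool(t1.writes & (t2.reads | t2.writes) or t2.writes & t1.reads)


def parallel_groups(ordered_txns):
    """Split an ordered list of transactions into sequential groups.

    Conflicting transactions keep their original relative order because
    the later one always lands in a later group.
    """
    groups = []
    for txn in ordered_txns:
        level = 0
        for i, group in enumerate(groups):
            if any(conflicts(txn, other) for other in group):
                level = i + 1  # must run after this group
        if level == len(groups):
            groups.append([])
        groups[level].append(txn)
    return groups


batch = [
    Txn("T1", reads=frozenset("xyz"), writes=frozenset("xyz")),
    Txn("T2", reads=frozenset("abc"), writes=frozenset("abc")),
    Txn("T3", writes=frozenset("x")),
]
groups = parallel_groups(batch)
group_names = [[t.name for t in g] for g in groups]
print(group_names)
```

Here T1 and T2 touch disjoint keys and end up in the same group, while T3 writes x and has to wait for T1.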

Of course, this requires us to know what objects/keys each transaction needs before running them, and this is a bit of a problem. In some situations, it may be easy to figure out the read and write sets of a transaction, but this is not always the case. Many systems, like Calvin, do a “pretend run” of a transaction (this is sometimes referred to as a reconnaissance transaction) to figure out the read-and-write set if the set isn’t obvious or annotated in the transaction. This has a few obvious downsides. Obviously, the pretend/reconnaissance phase uses the system’s resources. The reconnaissance run also increases the transaction’s latency. And finally, the reconnaissance is not perfect, and by the time of the “real” run, the read and write sets may have changed due to other transactions impacting the state of the system.  

So, the above description is somewhat generic behavior for many systems out there. And this is where Prognosticator, a system discussed in the paper, comes in. Prognosticator uses Symbolic Execution (SE) to profile transactions and help predict each transaction’s read-and-write set. The system does not need experts to annotate transactions’ read-and-write sets, but it can still avoid the reconnaissance runs in many situations. Sometimes, however, the reconnaissance must still happen, but Prognosticator uses a few tricks to reduce the possibility of the reconnaissance becoming stale.

Let’s look at the issues with figuring out the read-write set of a transaction. Many transactions are not primitive read and write commands, and involve quite a bit of logic with loops and conditional statements. This means that a state of a client/application may impact both the values written and the keys accessed. For example, consider a transaction that takes some input i:

input i;
if i > 10 then write x:=i;
          else write y:=i;

The read-and-write set of the above example depends on the input value already known to the client. The paper calls this transaction an Independent Transaction (IT), as it does not have an internal dependence on the read values. 

Some transactions can be a bit more complicated and have the read-write set depending on the value read by the transaction:

read a;
if a > 10 then write x:=a;
          else write y:=a;

Here the transaction does not know its write set (i.e. writing x or y) until it acquires the value of a. The Prognosticator paper refers to these transactions as Dependent Transactions (DT), as the write set has an internal dependence on the read values. 

Obviously, for both types of transactions, we can do the reconnaissance phase to figure out all the logic and branching to learn all the required keys. But we do not really need full reconnaissance for the ITs, as their read-and-write sets only depend on some client input and not the transaction itself. In fact, we can just play out the transaction’s code to figure out the read-write set without actually retrieving any data from the store (i.e. using some dummy values). However, we still somehow need to know whether a transaction is IT, as using dummy values in DT will clearly not work. Moreover, such a “dry run” with dummy values for every IT we encounter is still wasteful, as we do it every time.

Example of Symbolic Execution (SE) – symbolic solution α, path constraint φ.

This is where Prognosticator’s Symbolic Execution (SE) approach shines. With SE, Prognosticator “unwraps” each transaction for all the possible execution branches, leading to a symbolic transaction solution for all possible code paths. If all code paths access the same keys then we have a static read-and-write set. If certain execution branches access different keys, but branching conditions involve only transaction input, then we are dealing with an Independent Transaction (IT). We can easily compute IT’s read-and-write set from the symbolic solution once the input is known. Finally, if SE yields some branches with different access keys, and these branches are conditioned on a transaction’s reads, then we have a Dependent Transaction (DT) and will require a reconnaissance read. 
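The classification rule above can be sketched in a few lines of Python. This is my own simplification and not the paper's implementation: I assume the symbolic profile has already been reduced to a list of branches, where each hypothetical `Branch` records the keys it accesses and whether its path condition involves a value read from the store.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Branch:
    keys: frozenset          # keys read or written on this code path
    depends_on_reads: bool   # does the path condition use a read value?


def classify(branches):
    """Classify a transaction profile as 'static', 'IT', or 'DT'."""
    key_sets = {b.keys for b in branches}
    if len(key_sets) == 1:
        return "static"  # every code path touches the same keys
    if any(b.depends_on_reads for b in branches):
        return "DT"      # key choice depends on data in the store
    return "IT"          # key choice depends on client input only


# The two example transactions from earlier in the post.
input_txn = [
    Branch(frozenset({"x"}), depends_on_reads=False),  # i > 10
    Branch(frozenset({"y"}), depends_on_reads=False),  # otherwise
]
read_txn = [
    Branch(frozenset({"a", "x"}), depends_on_reads=True),  # a > 10
    Branch(frozenset({"a", "y"}), depends_on_reads=True),  # otherwise
]
same_keys_txn = [
    Branch(frozenset({"a", "x"}), depends_on_reads=True),
    Branch(frozenset({"a", "x"}), depends_on_reads=True),
]

print(classify(input_txn), classify(read_txn), classify(same_keys_txn))
```

Note that the last profile branches on a read value but still counts as static, because both paths access the same keys.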

The Prognosticator “unwraps” each new transaction type only once at the client-side to create such a symbolic execution profile with all possible code branches. There are quite a few optimizations mentioned in the paper. The important gist of these optimizations is the fact that we do not care so much about the actual symbolic solutions, and care only about what keys show up in the read set and write sets. So if two execution branches produce different symbolic solutions, but access the same keys, these branches can be “merged” for the purposes of predicting the read-and-write set. 

The rest of the Prognosticator’s magic depends on a batching technique that allows for a careful deterministic reordering of transactions. With the help of SE, the system identifies all read-only transactions (ROTs) and executes them concurrently at the beginning of the batch. This leaves us with a batch containing only ITs and DTs. The system then reorders DTs to the beginning of the batch. This allows it to run reconnaissance reads on all DTs while also working on ROTs. Since all DTs are now at the beginning of the batch, the reconnaissance reads cannot become stale due to any IT. Reconnaissance may still become stale due to the dependencies between DTs, and in this case, a DT is aborted during the execution phase and is placed in the abort batch to run after the main batch completes and before the next batch.

Lock table with per-key queues on the right. ROTs are not in the table, DTs and ITs are ordered in the table. Only transactions with all keys at the head of the queues can execute.

To execute the ITs and DTs in parallel, Prognosticator uses a lock table. The lock table is a collection of per-key queues, such that for every key in the batch there is an entry in the table with a queue of transactions. A transaction can be executed when it is at the head of the queues for all its keys. Obviously, executing a transaction removes it from all these queues.
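Here is a minimal Python sketch of such a lock table, simulated in rounds on a single thread. The function names are hypothetical, and the sketch makes assumptions the text above does not state: reads and writes are treated the same, and every transaction's key set is known up front. The paper describes a more efficient implementation.

```python
from collections import deque


def build_lock_table(batch):
    """batch: ordered list of (txn_name, set_of_keys) pairs."""
    table = {}
    for name, keys in batch:
        for key in keys:
            table.setdefault(key, deque()).append(name)
    return table


def execute_in_waves(batch):
    """Return lists of transactions that could run in parallel, in order."""
    table = build_lock_table(batch)
    done, waves = set(), []
    while len(done) < len(batch):
        # A transaction is runnable when it heads the queue of all its keys.
        wave = [name for name, keys in batch
                if name not in done
                and all(table[key][0] == name for key in keys)]
        # "Executing" a transaction removes it from all of its queues.
        for name, keys in batch:
            if name in wave:
                for key in keys:
                    table[key].popleft()
        done.update(wave)
        waves.append(wave)
    return waves


batch = [
    ("T1", {"x", "y"}),
    ("T2", {"y", "z"}),
    ("T3", {"a"}),
    ("T4", {"z"}),
]
waves = execute_in_waves(batch)
print(waves)
```

In this example T1 and T3 can start right away, T2 waits for T1 to release y, and T4 waits for T2 to release z. The earliest unfinished transaction in the batch is always at the head of all its queues, so every round makes progress.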

The whole transaction execution process runs independently on each node. It is safe because we actually do not need to stick to the leader-prescribed order in the batch, as long as we keep the correct order of batches, and deterministically reorder the transactions in the same way on all replica nodes. With the batch execution, we have all the clients waiting for their transactions to finish, and this deterministic reordering does not cause any problems as all waiting transactions are concurrent. This is a common trick used in many systems. 

The whole package with SE and batching provides a significant boost to the throughput compared to Calvin. It is not entirely clear though how much of the boost was enabled by the SE, but I will come back to this point in the discussion summary. 

The paper goes into more detail on many aspects of the system, including symbolic execution, more efficient implementation of the lock table, resource usage, etc. It was definitely a good read for me. As always, we had a presentation in our reading group, and I had to cover for a missing presenter:


1) Performance gains due to SE vs Batching. The paper claims a great speedup compared to Calvin. However, one question we had is just how much of the improvement is due to symbolic execution and how much of it is because of clever batching techniques. Let me elaborate. Some benefit comes from the careful reordering of operations. Running ROTs first helps a lot. Running reconnaissance reads at the node (compared to running a reconnaissance transaction at the client) is a lot faster too, and it reduces the possibility of reconnaissance becoming stale. Some of these techniques may be applicable in simpler systems without SE. For example, if we have transactions with annotated read-write sets, these reorderings within the batch become possible. Of course, SE definitely helps find ROTs and separate ITs from DTs to improve/enable the reordering within the batch without the need for annotated transactions. 

2) Slow ROTs. Deterministic transactions are susceptible to clogged pipelines when some big long-running transaction gets in the way and delays subsequent transactions. Parallel/concurrent execution helps here by essentially having multiple processing pipelines. However, Prognosticator has one issue with the read-only transactions (ROTs), making them more susceptible to the clogging problem. The paper mentions that ROTs get executed at the beginning of the batch from a stable snapshot. All workers must complete their ROTs before the system moves to DTs and ITs to make sure that no DT or IT changes that stable snapshot while ROTs are still running. This means that there is a barrier at the end of the ROT phase, allowing a single long-running ROT to screw up the performance by delaying all DTs and ITs in the batch. However, this may be just an artifact of an academic prototype: taking a separate snapshot and running all ROTs from that snapshot should allow ROTs to execute concurrently with writes. 

3) Cost of SE. The paper mentions the cost of doing symbolic execution. There are a few problems here. The first is processing time – more complicated transactions need a lot of time to pretty much exhaustively explore all branching. This also requires putting a limit on the number of allowed loop iterations. The limit can create a situation where a transaction is not fully profiled if it actually goes above the limit, requiring a reconnaissance. The limit, being a configurable parameter, may also require some expert knowledge of the workload to properly configure, and this is something the paper strives to avoid. Another big cost is the memory footprint of transaction profiles. The authors mention that the TPC-C benchmark required 960 MB for transaction profiles, which is not a small cost for a simple benchmark with relatively few transaction types. In the real world, the memory cost of having transaction profiles may be much higher.

4) Extension to sharded systems? Prognosticator works in a replicated system with all nodes storing identical data. It does not work in a sharded environment, yet most large-scale databases are sharded. It may be non-trivial to apply the same approach to sharded systems. After all, running a distributed transaction is harder, and may require some coordination between the shards. At the same time, it may still be possible to separate transactions according to their types and conflict domains with the help of SE to increase parallelism and make transaction execution more independent. Again, real systems based on Calvin’s ideas have cross-shard transactions. A big problem with a sharded setup in Prognosticator involves DTs: the system expects to perform the reconnaissance read locally, which means that all data for the transaction must be available at each node. This is not possible in the sharded environment. And making reads non-local will make the system much closer to Calvin with a longer distributed reconnaissance phase and negative performance impact. So, the non-sharded nature of Prognosticator is a huge performance benefit when compared with the more general Calvin.

Reading Group. Multitenancy for Fast and Programmable Networks in the Cloud.

We discussed “Multitenancy for Fast and Programmable Networks in the Cloud” in the 59th DistSys Reading Group meeting. In a sense, this was a continuation of a previous discussion we had a few months ago when covering the Pegasus paper. 

Pegasus and many other new protocols rely on specialized programmable network hardware that is not yet available to regular cloud users. This is because such hardware has no support for multitenancy and cannot be efficiently shared and virtualized to allow cloud users to get a virtual slice of the hardware pie. This paper discusses enabling multitenancy on today’s smart network hardware, such as smart switches. 

It is not an easy feat to pull off. These switches (in the paper, the authors used a switch based on Barefoot Tofino logic) have limited programmability, limited resources, and can run just one program. Of course, if you can have only one running program, it is hard to run code from different users/tenants. The paper works around this by compiling the code from all tenants together into one “jumbo-program” that can run on the switch. This tenant packing also solves an issue with resource sharing and isolation as the switch’s hardware resources must be defined at the compile time. Of course, there are also issues isolating tenants’ code. This is done at compile time too by renaming the tenant program’s fields, such as tenant-defined headers and table names, to tenant-unique names. A few restrictions are placed on tenants for the sake of isolation, and the paper goes into more detail. One important assumption is that the tenants are expected to be isolated based on their virtual networks (VLANs).

In addition to being the glue that holds all the tenants together, the “jumbo-program” incorporates some “system” code to provide common functionality. For instance, the “system” program can process common packet headers. This allows it to read the VLAN ID and then invoke the proper tenant’s code so that the tenant can do its own header matching/parsing.

The “system” program of a “jumbo-program” also helps us deal with one tricky part here. See, the program on a switch needs to run at the network line speed, and cannot delay any packets, so there is no way to delay any processing. This means that the program runs as a pipeline in multiple stages with no way of returning to a previous one. The “system” program “sandwiches” the tenant program and in a sense establishes some rules and bounds for the tenants.

I already mentioned that the resources are defined/allocated ahead of time. This is also true for memory, as programmers must hardcode the array sizes of stateful memory they want to use. However, the paper works around this by kind of building a virtual memory in the system program of the “jumbo-program”. The “jumbo-program” just allocates a huge array of memory in each pipeline stage and uses a configuration stage to store the memory boundaries for each tenant. Tenants use the boundaries as index offsets into the big array, essentially implementing virtual memory within the “jumbo-program”, and allowing for dynamic memory reallocation by adjusting these memory offsets.
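The offset trick can be illustrated with a toy Python model. This is only a sketch of the idea and not the paper's switch code: a real implementation would live in the switch program itself, and `StageMemory` with all of its methods is hypothetical. I also assume each tenant gets one contiguous, non-overlapping region, and that out-of-range accesses are simply rejected.

```python
class StageMemory:
    """One big array shared by all tenants in a pipeline stage."""

    def __init__(self, size):
        self.cells = [0] * size
        self.bounds = {}  # tenant -> (base offset, region length)

    def assign(self, tenant, base, length):
        """Set or change a tenant's region by updating its offsets."""
        if base < 0 or length < 0 or base + length > len(self.cells):
            raise ValueError("region does not fit in the array")
        for other, (b, n) in self.bounds.items():
            if other != tenant and base < b + n and b < base + length:
                raise ValueError("region overlaps another tenant")
        self.bounds[tenant] = (base, length)

    def _translate(self, tenant, index):
        # A tenant-local index becomes base + index in the shared array.
        base, length = self.bounds[tenant]
        if not 0 <= index < length:
            raise IndexError("access outside the tenant's region")
        return base + index

    def write(self, tenant, index, value):
        self.cells[self._translate(tenant, index)] = value

    def read(self, tenant, index):
        return self.cells[self._translate(tenant, index)]


mem = StageMemory(8)
mem.assign("tenant_a", base=0, length=4)
mem.assign("tenant_b", base=4, length=4)
mem.write("tenant_a", 1, 7)
mem.write("tenant_b", 0, 9)
print(mem.cells)
```

Both tenants address their memory starting from index 0, but their writes land in different parts of the shared array. Changing a tenant's share is then a matter of calling `assign` with new offsets rather than recompiling the program. The sketch ignores what happens to data already stored in a region that moves.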

The paper has its HotCloud presentation. And as usual, we had our own presentation, this time by David Correa:


1) Other approaches? The paper mentions some alternatives to the switches they have used. For example, software switches are a lot easier to program. FPGAs can be used for handling some network functions as well. But, according to the paper, these are all slower. ASICs, like Barefoot (now owned by Intel) Tofino, are a lot faster, but with such speed, they are more limited. The programmability constraints, the pipeline processing nature, memory limitations (22 MB of SRAM), etc. are big limiting factors though.   

2) Hardware-specific solution. So the solution presented here is rather specific for the hardware available now. This is cool, but I’d want to imagine what can be done in the near future. Can we improve the hardware to reduce these limitations? The authors conjecture that the pipelines will stay: “we expect ASIC-based pipelines—both for switches and NICs—to be more effective than other programmable architectures in terms of both cost and power consumption.” However, maybe somehow better support for multi-tenancy can be baked into a new generation of devices on the hardware level, along with greater flexibility of adding/removing tenants.

3) Not quite cloud-ready? The major pain point of the solution seems to be the need to “mush together” all the tenants before compiling the program and deploying it on the switch. If tenants change (or tenant’s requirements for some hardware resources) the whole thing needs to be repackaged and re-deployed. This sounds like a cumbersome process that may result in some downtime while the new “jumbo-program” is deployed on the network. Exposing this process as some kind of cloud service seems quite infeasible due to these issues. It would have been nice if more resources could have been reallocated between tenants dynamically (of course, we are not experts here, but there may be some real hardware/performance limitation for this very static approach). It would also be nice to have the ability to “side-load” new tenant programs into the “jumbo-program” runtime. 

4) Incentives for the cloud providers? Smart network devices can greatly benefit those who have access to them. We are starting to see many systems relying on these. However, right now, cloud providers are those who have access to such hardware, and it may provide some advantage to these providers and their native services. As such, there may be little incentive to offer these resources in the cloud. That being said, this may also play the other way around. If this becomes a very desired feature, the first cloud provider to offer these virtualized multi-tenant switches stands to benefit, and other providers will follow in the chain reaction of chasing the market. 

Reading Group Paper List. Papers ##61-70

We have gone through our current list of papers, covering 10 interesting projects over the past 10 weeks. Now it is time to move on with a new set of papers that will carry the reading group all the way through the summer term.

Reading Group. Cerebro: A Layered Data Platform for Scalable Deep Learning

In the 58th reading group session, we covered “Cerebro: A Layered Data Platform for Scalable Deep Learning.” This was a short meeting, as our original presenter, unfortunately, had some other important commitments. There are actually two Cerebro papers, one appeared at CIDR and another one at VLDB’20. The main premise behind the system is that ML models are rarely trained in isolation. Instead, we often train multiple models concurrently, as we try to find optimal parameters for the best results. This hyper-parameter tuning is costly as it relies on an exhaustive search for the best parameter configuration. Also, as the authors claim, most systems do not take this multi-model/config-same-data parallelism into account. 

A trivial way to deal with running multiple jobs on the same data is to simply run multiple models/configurations as if they are totally independent. Of course, this can lead to data amplification, as we may need to copy the same data to different workers or even systems. Or we can run these models one at a time: as one model/parameter configuration is done, the next one can start. Cerebro proposes a different approach that allows models to run concurrently, avoids data amplification, and reduces communication while preserving good model convergence. The approach shards the data to workers and allows different models to start on these different shards at the same time. Once a worker-full of data has been processed, the entire model (all the weights, etc.) can migrate to a worker with another shard. The paper claims that this reduces the data-transfer demand in the cluster compared to approaches that must continuously update model weights (i.e. in the parameter servers).
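As a rough illustration of this model-hopping idea, here is a tiny Python sketch of a round-robin schedule. It is my own simplification and not Cerebro's actual scheduler: the function `hop_schedule` is hypothetical, and I assume the number of models equals the number of workers, with one data shard per worker.

```python
def hop_schedule(n):
    """schedule[step][worker] is the model trained on that worker's shard.

    Model m starts on worker m and moves to the next worker after each
    step, so the data stays put and only the models move.
    """
    return [[(worker - step) % n for worker in range(n)]
            for step in range(n)]


schedule = hop_schedule(3)
for step, row in enumerate(schedule):
    print(f"step {step}: models on workers 0..2 = {row}")
```

In every step each worker trains exactly one model, and after n steps every model has visited every shard exactly once, which amounts to one full pass over the data per model.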

I am not an ML expert, and my improvised presentation was rather bad, so instead, I will link to a YouTube presentation by the authors, which explains the whole thing much better than I can do. 


1) Still a money problem. Cerebro aims to improve the efficiency of hyper-parameter tuning, but it still seems like a money problem, as we need to throw more resources to exhaustively find the best model. Are there better approaches than the exhaustive search like that? Some heuristics? Or past experience on similar problems/data to take into account?

2) Pruning. The paper(s) seem to train all models/configurations all the way to the end. Again, this is probably wasteful — if we see that some model/configuration does not converge fast enough, it makes sense to kill it early. The system seems to support this and is built around this idea as well, but it is not as explicit in the paper. In Cerebro, scheduling is done on the per-epoch level, so there is a chance to evaluate each model/parameter-config after each training epoch and adjust. 
