Reading Group. UniStore: A fault-tolerant marriage of causal and strong consistency

For the 80th paper in the reading group, we picked “UniStore: A fault-tolerant marriage of causal and strong consistency” by Manuel Bravo, Alexey Gotsman, Borja de Régil, and Hengfeng Wei. This ATC’21 paper adapts the Partial Order-Restrictions consistency (PoR) into a transactional model. UniStore uses PoR to reduce coordination efforts and execute as many transactions as possible under the causal consistency model while resorting to strong consistency in cases that require ordering concurrent conflicting transactions. The PoR consistency itself is an extension of RedBlue consistency that allows mixing eventually consistent and strongly consistent operations. 

UniStore operates in a geo-replicated model, where each region/data center (I use region and data center interchangeably in this post) stores the entirety of the database. The regions are complete replicas of each other. Naturally, requiring strong consistency is expensive due to cross-region synchronization. Instead, UniStore allows the developers to choose whether to run each transaction as causally or strongly consistent. Causal consistency preserves the cause-and-effect notion between events: if some event e1 has resulted in event e2, then an observer seeing e2 must see e1 as well. Naturally, if two events are concurrent, then they are not causally dependent. This independence gives us the freedom to apply these events (i.e., execute against the store) in any order. For instance, this approach provides good performance for events that may touch disjoint data concurrently. However, if these two events are concurrent and operate on the same data, then the events either need to be commutative or be ordered with respect to each other. UniStore does both of these: it implements CRDTs to ensure commutativity, but it also allows declaring a transaction as strongly consistent for cases that require ordering. As such, strong consistency becomes handy when commutativity alone is not sufficient for the safety of the application. For example, when a transaction does a compare-and-set operation, the system must ensure that all replicas execute these compare-and-set operations in the same order.
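To make the commutativity point concrete, here is a minimal sketch that contrasts a grow-only counter CRDT with a plain compare-and-set. It is not UniStore code; the class `GCounter`, the helper `cas`, and the region names are hypothetical and only illustrate the idea.

```python
class GCounter:
    """Grow-only counter CRDT: one slot per replica, merged by element-wise max."""

    def __init__(self, replica_ids):
        self.counts = {r: 0 for r in replica_ids}

    def increment(self, replica_id, amount=1):
        # Each replica only ever bumps its own slot.
        self.counts[replica_id] += amount

    def merge(self, other):
        # Element-wise max is commutative, associative, and idempotent.
        for r, c in other.counts.items():
            self.counts[r] = max(self.counts.get(r, 0), c)

    def value(self):
        return sum(self.counts.values())


def cas(register, expected, new):
    """Compare-and-set on a dict with a single 'value' key; True on success."""
    if register["value"] == expected:
        register["value"] = new
        return True
    return False


# Two regions (hypothetical names) increment concurrently, then exchange state.
a, b = GCounter(["eu", "us"]), GCounter(["eu", "us"])
a.increment("eu", 2)
b.increment("us", 3)
a.merge(b)
b.merge(a)
converged = (a.value(), b.value())  # both replicas agree regardless of merge order

# Two concurrent compare-and-set operations: the winner depends on the order.
r1 = {"value": 0}
cas(r1, 0, "x")
cas(r1, 0, "y")
r2 = {"value": 0}
cas(r2, 0, "y")
cas(r2, 0, "x")
```

The counter converges no matter which replica merges first, while the two registers end up with different values. That second case is the kind of operation that needs a strongly consistent transaction.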

So, in short, causal consistency is great when an application can execute complex logic in causal steps — event e1 completes, then observing the results of e1 can cause some other change e2, and so on. Strong consistency comes in handy when step-by-step non-atomic logic is not an option, and there is a need to ensure the execution order of conflicting concurrent operations. In both cases, the systems need to keep track of dependencies. For causal transactions, the dependencies are other transactions that have already finished and were made visible. For strongly consistent transactions, the dependencies also include other concurrent, conflicting, strongly consistent transactions.  

So, how does UniStore work? I actually do not want to get too deep into the details. It is a complicated paper (maybe unnecessarily complicated!), and I am not smart enough to understand all of it. But I will try to get the gist of it. 

The system more-or-less runs a two-phase commit protocol with optimistic concurrency control. Causal transactions commit in the local data center before returning to the client. However, these causal transactions are not visible to other transactions (and hence other clients/users) just yet. Remember, this causal transaction has not been replicated to other data centers yet, and a single region failure can cause some problems. In fact, if a strongly consistent transaction somehow takes such a non-fully replicated causal transaction as a dependency, then the whole system can get stuck if the dependent causal transaction gets lost due to some minority regions failing. 

UniStore avoids these issues by making sure the causal transactions are replicated to enough regions before these causal transactions are made visible. This replication happens asynchronously in the background, sparing the cost of synchronization for non-strongly consistent transactions (that is, of course, if clients/users are ok with a remote possibility of losing transactions they thought were committed).

Strongly consistent transactions are a different beast. They still optimistically run in their local data centers but no longer commit in one region to ensure the ordering between other strongly consistent transactions. UniStore uses a two-phase commit here as well, but this time the commitment protocol goes across all healthy regions. First, the coordinator waits for enough data centers to be sufficiently up-to-date. This waiting is crucial for liveness; it ensures that no dependent transaction may get forgotten in the case of data center outages. After the waiting, the actual two-phase commit begins, with all (alive?) regions certifying the transaction. 

To implement this waiting and only expose the durable and geo-replicated state to transactions, UniStore has a complicated system of version tracking using a bunch of vector clocks and version vectors. Each of these vectors has a time component for each data center and an additional “strong” counter for keeping track of strongly consistent transactions. Each transaction has a couple of important version vectors. 

The snapshot vector snapVec describes the consistent snapshot against which the transaction runs. The commit vector commitVec tells the commit version of a transaction used for ordering. 

Each replica keeps two different version vectors representing the version of the most recent transaction known to itself and its data center. Since the system relies on FIFO order of communication and message handling, knowing the version of the most recent transaction implies the knowledge of all lower-versioned transactions as well. This information is then exchanged between data centers to compute yet another version vector to represent the latest transaction replicated to at least some majority of data centers. This, again, implies that all lower-versioned transactions have been replicated as well. This last version vector allows strongly consistent transactions to wait for their dependencies to become globally durable to ensure liveness. 
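Here is a minimal sketch of how such a "replicated widely enough" vector could be computed from the per-data-center progress vectors. This is my simplification and not the paper's algorithm: the function names `dominates` and `uniform_frontier`, the `quorum` parameter, and the sample numbers are all hypothetical, and I leave out the extra "strong" entry.

```python
def dominates(a, b):
    """True if vector a is at least as new as vector b in every component."""
    return all(a[k] >= b[k] for k in b)


def uniform_frontier(known_by_dc, quorum):
    """Per component, the highest version that at least `quorum` data centers report."""
    components = next(iter(known_by_dc.values())).keys()
    frontier = {}
    for c in components:
        reported = sorted((vec[c] for vec in known_by_dc.values()), reverse=True)
        frontier[c] = reported[quorum - 1]  # the quorum-th largest value
    return frontier


# Three data centers report how far they have seen each origin's transactions.
known = {
    "dc1": {"dc1": 10, "dc2": 4, "dc3": 7},
    "dc2": {"dc1": 8, "dc2": 6, "dc3": 7},
    "dc3": {"dc1": 5, "dc2": 6, "dc3": 9},
}
frontier = uniform_frontier(known, quorum=2)

# A strongly consistent transaction waits until its snapshot is covered by the frontier.
snap_vec = {"dc1": 8, "dc2": 5, "dc3": 7}
safe_to_certify = dominates(frontier, snap_vec)
```

Because of the FIFO assumption, taking the quorum-th largest value per component is enough: every lower version is then known to at least that many data centers too.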

So here is where I lose understanding of the paper, so read on with a pinch of salt, as my skepticism may be completely unwarranted. It makes sense to me to use version vectors to keep track of progress and order causal transactions. Each region computes the region’s known progress, exchanges it with other regions, and calculates the global “transaction frontier,” that is, all transactions that have been replicated to a sufficient number of data centers. This exchange of known progress between regions happens asynchronously. I am not entirely sure how these progress vectors help with ordering the conflicting transactions. Somehow the “strong” counter should help, but this counter seems to be based on the regions’ knowledge of progress and not the global one. I suspect that these vectors help identify the concurrent conflicting transactions. The progress known in the data center ends up in a snapVec and represents the snapshot on which the transaction operates. The strongly consistent transactions use a certification procedure (i.e., a two-phase commit) to decide whether to abort or commit. The paper mentions that the certification process assigns the commitVec, which actually prescribes the order. At this point, I hope that conflicting transactions are caught in this Paxos-based transaction certification procedure and ordered there or at least aborted as needed. It is also worth mentioning that the extended technical report may have more details that I was too lazy to follow through.

Now a few words about the evaluation. The authors focus on comparing UniStore against both causal and strongly consistent data stores. Naturally, it sits somewhere in the middle of these two extremes. My bigger concern with their implementation is how well it scales with the number of partitions and number of data centers. The paper provides both of these evaluations, but not at a convincing scale. They go up to 5 data centers and up to 64 partitions. See, with all the vectors and tables of vectors whose size depends on the number of regions, UniStore may have some issues growing to “cloud-scale,” so it would be nice to see how it does at 10 data centers or even 20. Cloud vendors have many regions and even more data centers; Azure, for example, has 60+ regions with multiple availability zones.

Our group’s presentation by Rohan Puri is on YouTube:


1) Novelty. So the paper works with a rather interesting consistency model that combines weaker consistency with strong on a declarative per-operation basis. This model, of course, is not new, and the paper describes and even compares against some predecessors. So we had the question about the novelty of UniStore since it is not the first one to do this kind of consistency mix-and-match. It appears that the transactional nature of UniStore is what separates it from other such solutions. In fact, the bulk of the paper talks about the transactions and ensuring liveness in the face of data center outages, so this is nice. Many real-world databases are transactional, and having a system like this is a step closer to a practical solution.  

2) Liveness in the presence of data center failures. Quite a lot of the paper’s motivation goes around the inability to simply run OCC + 2PC in the PoR consistency model and maintain liveness. One problem occurs when a causal transaction takes a dependency on another transaction that did not make it to the majority of regions. If such a transaction is lost, it may stall the system. Of course, any region that takes a dependency on some transaction must see it first. Anyway, it is hard to see the novelty in “transaction forwarding” when pretty much any system recovers the partly replicated data by “forwarding” it from the nodes that have it. 

However, the bigger motivational issue is with strongly consistent transactions. See, the authors say that the system may lose liveness when a strongly consistent transaction commits with a dependency on a causal one that has not been sufficiently replicated, and that causal transaction gets lost in a void due to a region failure. However, to me, this seems paradoxical — how can all (healthy) regions accept a transaction without having all the dependencies first? It seems like a proper implementation of the commit protocol will abort when some parties cannot process the transaction due to the lack of dependencies. Anyway, this whole liveness thing is not real and appears to be just a way to make the problem look more serious. 

That said, I do think there is a major problem to be solved. Doing this more proper commit protocol may hurt the performance by either having a higher abort rate or replicating dependencies along with the strongly consistent transaction. We’d like to see how much better UniStore is compared to the simpler 2PC-based solution that actually aborts transactions when it cannot run them. 

3) Evaluation. The evaluation largely compares against itself, but in different modes — causal and strong. This is ok, but how about some other competition? Take a handful of other transactional approaches and see what happens? The current evaluation tells us that the PoR model provides better performance than strong and worse than causal consistency. But this was known before. Now we also know that this same behavior translates to a transactional world. But we do not know how the cost of the protocol fares against other transactional systems that do not have PoR and are not based on UniStore with features disabled.

Also interesting to see is how expressive the transactional PoR consistency model is. For example, let’s take MongoDB. It can be strongly consistent within a partition and causal across the partitions (and within the partition, users can manipulate the read and write consistency on a per-operation basis). What kind of applications can we have with Mongo’s simpler model? And what kind of apps need UniStore’s model with on-demand strong consistency across the partitions?

4) Complicated solution??? I have already mentioned this in the summary, but UniStore is complicated and relies on many moving parts. The paper completely omits the within-datacenter replication for “simplicity,” but it does not really make the paper simple. We have vectors that track progress, order, and snapshots, and then we have tables of vectors, and all these multiple kinds of vectors are exchanged back-and-forth to compute more vectors only to find out when it is ok to make some transactions visible or unblock some strongly consistent transactions. How come other systems (MongoDB again?) implement causal consistency with just one number for versioning (HLC) and still allow specifying stronger guarantees when needed? Yeah, they may not implement the PoR consistency model, but UniStore just seems too complicated. As a side question… what happens in Mongo when we start changing consistency between causal and strong on a per-request basis?

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly manage Zoom invites to the paper discussions. Please join the Slack group to get involved!

Reading Group. Scaling Large Production Clusters with Partitioned Synchronization

Our 79th paper was “Scaling Large Production Clusters with Partitioned Synchronization,” an ATC’21 paper by Yihui Feng, Zhi Liu, Yunjian Zhao, Tatiana Jin, Yidi Wu, Yang Zhang, James Cheng, Chao Li, and Tao Guan. This time around, I will not summarize the paper much since A. Jesse Jiryu Davis, who presented the paper, has written a very good summary.

The core idea of this paper is to scale the task schedulers in large clusters. Things are simple when one server performs all the scheduling and maps the tasks to servers. This server always knows the current state of the cluster and the load on each worker. The problem, of course, is that this approach does not scale, as a single scheduling server is an obvious bottleneck. A natural solution is to increase the number of schedulers, but this introduces new problems — to gain benefit from parallel schedulers, these schedulers cannot synchronize every single operation they perform. So, schedulers inherently operate on stale (and different) views of the cluster and, for example, may accidentally assign tasks to workers that are at capacity. The longer the schedulers work independently without synchronizing, the more divergent their cluster views become, and the greater number of mistaken or conflicting assignments they perform. The conflicts due to stale cluster views (i.e., two schedulers assigning two different tasks for the same worker slots) mean that at least one of these assignments has to retry for another worker/slot. As a result, we must have some balance between staleness and synchrony.

Typically, the schedulers would synchronize/learn the entire cluster view in a “one-shot” approach. A master scheduler aggregates the cluster views of all the schedulers and distributes the full aggregated cluster view back. This full “one-shot” synchronization is an expensive process, especially for large clusters, and it again puts the master into a bottleneck position. So, the system can perform this sync only so often. The paper proposes a partitioned synchronization, or ParSync, as opposed to “one-shot” full synchronization. The partitioned approach takes the full cluster view and breaks it down into chunks, synchronizing one chunk at a time in a round-robin manner. The complete sync cycle occurs when the synchronization has rotated through all chunks or partitions. Both one-shot and ParSync approaches provide the same average staleness if both operate on the same full sync interval G. The difference is that the “one-shot” approach synchronizes every G time-units, while ParSync updates some partition every G/N time-units, for a system with N partitions. The partitioned synchronization has the effect of bringing the variance of staleness down. From the authors’ experiments, they conjecture that most scheduling conflicts occur when staleness is high. As a result, bringing down the maximum staleness in the system brings the most benefit. The paper provides a detailed explanation of these findings, while Jesse, in his blog and presentation, gives an excellent visual example of this. 
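A toy model helps me see the "same average, lower peak" effect. The sketch below is my own simplification and not the paper's analysis: time advances in integer ticks, N is assumed to divide G, and partition i is refreshed at ticks i*G/N, i*G/N + G, and so on. The function names are hypothetical.

```python
def cluster_staleness(t, G, N):
    """Average staleness over N partitions at tick t; N=1 models one-shot sync."""
    step = G // N  # assumes N divides G
    # Partition i was last refreshed (t - i*step) mod G ticks ago.
    return sum((t - i * step) % G for i in range(N)) / N


def summarize(G, N):
    """Return (time-averaged, peak) cluster-wide staleness over one full cycle."""
    samples = [cluster_staleness(t, G, N) for t in range(G)]
    return sum(samples) / len(samples), max(samples)


one_shot_mean, one_shot_peak = summarize(G=8, N=1)
parsync_mean, parsync_peak = summarize(G=8, N=4)
```

In this model both schemes average 3.5 ticks of staleness over a cycle, but the cluster-wide staleness peaks at 7 ticks for one-shot and only 4 ticks with four partitions. Any single partition can still be almost G ticks stale under ParSync; what shrinks is how stale the view is as a whole at its worst moment.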

Another thing that I oversimplified is scheduling itself. See, it is not enough to know that some worker server has some capacity to be used for some task. In the real world, the workers have different speeds or other properties that make some workers preferred to others. The paper tries to capture this worker non-uniformity through the quality metric and suggests that a good scheduling mechanism needs to find not just any worker fast but a good quality worker fast. Moreover, the contention for quality workers is one source of scheduling delays. 

The paper provides a much greater discussion of both the ParSync approach and other scheduler architectures, so it is worth checking out.


1) Quality. A good chunk of our discussion centered around the quality metric used in the paper to schedule tasks. ParSync has an approach to prioritize quality scheduling when staleness is OK and reduce quality when it becomes too expensive to ensure quality due to conflicts for high-quality slots. The paper treats the quality as a single uniform score common to all tasks scheduled in the system. This uniform quality score approach is not very realistic in large systems. Different tasks may have varying requirements for hardware, locality to other tasks, and locality to caches among many other things. 

Having quality as a function of a task complicates things a bit from the simplified model. For instance, if a particular set of tasks prefers some set of worker nodes, and all these workers belong to the same partition, then, effectively, we degraded the scheduling to a “one-shot” synchronization approach for that specific set of tasks. So partitioning becomes a more complicated problem, and we need to ensure that each partition has a uniform distribution of quality workers for each set of similar tasks. 

2) Number of Partitions. Another issue we discussed deals with the number of partitions. The main paper does not explore the impact of the number of partitions on performance and scheduling quality. The paper, however, mentions that the results are available in the extended report. In summary, the number of partitions has little effect on performance. This is a bit counter-intuitive since the claim of the paper is that partitioned state sync improves scheduling. The math on reducing max staleness in the cluster explains these results, however: it shows diminishing returns as the number of partitions grows. 
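One way to see the diminishing returns is a back-of-the-envelope model. This is my own simplification, not the derivation from the paper or the extended report. Assume a full sync interval G, N equal partitions, and one partition refreshed every G/N time units in round-robin order. Within a slot, at time t after the latest refresh, the partitions are t, t + G/N, t + 2G/N, ... stale, so the cluster-wide average staleness and its peak are:

```latex
\bar{s}(t) = \frac{1}{N}\sum_{j=0}^{N-1}\left(t + j\,\frac{G}{N}\right)
           = t + \frac{G\,(N-1)}{2N}, \qquad 0 \le t < \frac{G}{N}

\sup_{t}\,\bar{s}(t) = \frac{G\,(N-1)}{2N} + \frac{G}{N}
                     = \frac{G}{2}\left(1 + \frac{1}{N}\right)
```

Under these assumptions, the peak is G for N = 1 (one-shot), 0.75G for N = 2, 0.625G for N = 4, and 0.5625G for N = 8. It can never drop below G/2, so most of the gain comes from the first few partitions.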

3) Evaluations. The paper presents two strategies for scheduling: quality-first and latency-first. In quality-first, schedulers try to assign tasks to the highest quality slots first. As schedulers’ views become staler, they start to conflict more often and try to assign tasks to quality slots that have already been taken. Interestingly enough, the quality-first strategy, while producing on average higher-quality assignments, has more quality variance than the latency-first strategy that optimizes for the scheduling speed. The quality variance of the adaptive policy that can switch between the quality and latency mode is even worse for some reason. 

“We simulate three scenarios that may happen in our production cluster on a typical day as follows. We create two groups of schedulers, A and B, and divide the timeline into three phases: (1) A and B are both operating at 2/3 of their full capacity; (2) A is operating at full capacity while B remains the same; (3) A and B are both operating at full capacity. Each phase is run for 30s”

We again feel that the evaluation in the paper does not do justice to quality assignments. Having quality as a function of a task may reduce contention since not all tasks will compete for the same slots anymore.

4) Relation to ML Systems. It was also mentioned in the discussion that ML systems have been using similar partitioned synchronization tricks to control the staleness of the models.

Reading Group. Characterizing and Optimizing Remote Persistent Memory with RDMA and NVM

We have looked at the “Characterizing and Optimizing Remote Persistent Memory with RDMA and NVM” ATC’21 paper. This paper investigates a combination of two promising technologies: Remote Direct Memory Access (RDMA) and Non-Volatile Memory (NVM). We have discussed both of these in our reading group before.

RDMA allows efficient access to the remote server’s memory, often entirely bypassing the remote server’s CPU. NVM is a new non-volatile storage technology. One of the key features of NVM is its ability to be used like DRAM, but with the added benefit of surviving power outages and reboots. Typically, NVM is also faster than traditional storage and gets closer to the latency of DRAM. However, NVM still significantly lags behind DRAM in throughput, and especially in write throughput. The cool thing about NVM is its “Memory Mode,” which essentially makes Optane NVM appear like a ton of RAM in the machine. And here is the nice part: if it acts like RAM, then we can use RDMA to access it remotely. Of course, this comes with a catch. After all, NVM is not actual DRAM, so what works well to optimize RDMA under normal circumstances may not work as well here. This paper presents a handful of RDMA+NVM optimizations from the literature and augments them with a few of the authors’ own observations.

Below I put the table with optimizations summarized in the paper. The table contains the literature optimizations (H1-H5), along with the authors’ observations (parts of H3, H6-H8). One additional aspect of the table is the applicability of optimization to one-sided or two-sided RDMA. One-sided RDMA does not involve a remote CPU. However, two-sided RDMA uses a remote server’s CPU.

H1: Accessing NVM attached to another socket is slow, so it is better to avoid it. We have discussed NUMA a bit in a previous paper to give a hint at what may be the problem here. The paper raises some concerns on the practicality of avoiding cross-socket access, but it also provides some guidance for implementing it. 

H2: For two-sided RDMA that involves the host’s CPU, it is better to spread the writes to multiple NVM DIMMs.

H3: Data Direct I/O technology (DDIO) transfers data from the network card to the CPU’s cache. In one-sided RDMA, that will cause the sequential writes to NVM to become random, impacting performance, especially for large payloads, so it is better to turn DDIO off. However, there are some implications for touching this option, and the paper discusses them in greater detail. 

H4: ntstore command bypasses the CPU caches and stores data directly to the NVM. So this can make things a bit faster in two-sided RDMA that already touches the CPU. 

H5: This one deals with some hardware specs and suggests using writes that fill an entire 256-byte XPLine (the data storage granularity). I am not going to explain this any further. However, the paper mentions that this optimization is not always great since padding the entire XPLine (and sending it over the network) incurs a lot of overhead.

H6: For one-sided RDMA, it is more efficient to write at the granularity of PCIe data word (64 bytes) for payloads smaller than XPLine. This is the granularity at which NIC operates over the PCIe bus. 

H7: Similar to H6, for two-sided RDMA, write at the granularity of a cache line (64 bytes on x86 architecture). 

H8: Atomic operations, such as read-modify-write, are expensive, so for best performance, it is better to avoid them.

H9: Doorbell batching helps when checking for persistence. In one-sided RDMA, there is no easy way to check if data has persisted, aside from performing a read. This procedure incurs two round-trips and obviously hurts performance. Doorbell batching allows sending a write and a read at once but delays the read on the remote until the write completes, avoiding two separate round-trips. 
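To get a feel for the padding trade-off behind H5-H7, here is a small arithmetic sketch. It only uses the granularities named above (256-byte XPLine, 64-byte PCIe word or cache line); the helper names and the 100-byte payload are hypothetical, and it says nothing about actual NVM or NIC timing.

```python
XPLINE = 256    # bytes, NVM write granularity named in H5
PCIE_WORD = 64  # bytes, granularity named in H6 (same size as an x86 cache line, H7)


def padded_size(payload, granularity):
    """Round the payload size up to the next multiple of the granularity."""
    return -(-payload // granularity) * granularity  # ceiling division


def padding_overhead(payload, granularity):
    """Fraction of the transferred bytes that are padding."""
    total = padded_size(payload, granularity)
    return (total - payload) / total


small = 100  # hypothetical payload size in bytes
to_xpline = (padded_size(small, XPLINE), padding_overhead(small, XPLINE))
to_word = (padded_size(small, PCIE_WORD), padding_overhead(small, PCIE_WORD))
```

For this 100-byte payload, padding to a full XPLine sends 256 bytes, about 61% of which is padding, while padding to 64 bytes sends 128 bytes with about 22% padding. That is the network overhead the paper warns about for small writes.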

The figure below illustrates how the optimization changes the performance when added one at a time. It is worth noting that some optimizations can actually slow the system down, so it is important to understand the specific circumstances of the workload to make the proper decision on using some of the suggestions from this paper.

The paper has a lot more details and discussions of individual optimizations. It is also full of individual evaluations and experiments for different optimizations.

Brian Luger did an awesome presentation of the paper, which is available on YouTube:


1) Only writes. The paper focuses on a bunch of write optimizations, and no read ones. The authors explain this by saying that NVM’s read throughput is very high and exceeds the NIC’s bandwidth, so the network will be a bottleneck for reading. On the write side of things, however, the bottleneck is the NVM itself, and it is not held up by other slower components, making write optimizations very desirable. At the same time, if the network catches up with NVM bandwidth, then we may need to play the same optimizing game for reads. 

One thing to mention here is that read-to-write ratios will play a huge role in perceived improvement from optimizations. The paper has not explored this angle, but if the workload is read-dominated (like many database workloads), then all the write optimizations will go unnoticed in the overall performance. In all fairness, the paper does use realistic workloads with writes and reads for its evaluation. 

2) Vendor-Specific? Currently, there is pretty much one vendor/product out there for NVM — Intel Optane. So a natural question is how many of these optimizations will transfer to other NVM products and vendors.

3) Vendor Benefit. Our final discussion point was about the exposure of RDMA+NVM technology in the cloud. It is still very hard/expensive to get VMs with RDMA support. Having the two technologies available in the public cloud is even less probable now. This means that RDMA+NVM will remain the technology for the “big boys” that run their own data centers. Vendors like Microsoft, AWS, Google, etc. can benefit from this tech in their internal products while making it virtually inaccessible to the general open-source competition.

Reading Group. NrOS: Effective Replication and Sharing in an Operating System

The 77th paper discussion in our reading group was “NrOS: Effective Replication and Sharing in an Operating System” from OSDI’21. While not a distributed systems paper, it borrows high-level distributed systems ideas (namely, state machine replication) to create a new NUMA-optimized sequential kernel.

See, all modern machines have many CPU cores. OS kernels must be able to utilize that multi-core hardware to achieve good performance. Doing so implies synchronization between threads/cores for access to shared kernel data structures. Typically, OSes go for a fine-grained synchronization to unlock as much concurrency as possible, making kernels very complex and prone to concurrency bugs. To complicate things further, servers often feature multiple CPUs in multiple different sockets. Each CPU has its own memory controller and can “talk” faster and more efficiently to its share of memory connected to that controller, while accessing another CPU’s memory requires communication and extra latency. This phenomenon exists in certain single-socket systems as well. The nonuniformity in memory access (hence Non-Uniform Memory Access or NUMA) creates additional problems and slowdowns when managing locks.

NrOS, with its NRkernel, differs from other kernels in the way it manages concurrency. Instead of a fine-grained approach, NrOS relies on more coarse-grained concurrency control, as it is a simple sequential kernel that protects the data structures with a simple coarse lock. Such coarse-grained locking can be rather bad for high-core systems with lots of contention. However, for scalability reasons, not all threads/cores compete for this lock. To start, NRkernel replicates the shared data structures across NUMA nodes. So now cores on different CPUs/NUMA nodes can access a copy of the shared structure from their local memory. There is one little problem, though. We need to make sure all these copies are in sync so that no core/thread can read a stale data structure. 

NRkernel solves this using a shared log. All of the copies share a single log structure from which they get their updates. When some thread needs to update a data structure, it tries to acquire a lock to the log to write an update. To reduce the contention of accessing the log, the NRkernel ensures that only one thread per NUMA node can acquire a lock. The kernel achieves that using a flat combining technique where multiple threads can try to access the shared structure, but only one can get the lock. This “lucky” thread becomes the combiner, and in addition to its own access, carries out the operations on behalf of other “not-so-lucky” threads in need of the shared object. Flat combining reduces the lock contention and acts as a batching mechanism where one thread applies operations of all other threads. 

The log helps order all updates to the shared data structures, but we still need to make sure reading the replicas is linearizable. The flat combining approach helps us here as well. If a combiner exists for a replica, then the read-thread looks at the current tail of the log and waits for the existing combiner to finish updating the replica up to that tail position. In a sense, such waiting ensures that the read operation is carried out with the freshest update at the time of the read initiation. If no combiner currently exists, the reading thread gets a lock and becomes the combiner, ensuring the local replica is up-to-date before performing the read. 
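Here is a minimal single-threaded sketch of the data flow described above: a shared log, per-node replicas, a combiner that appends a batch, and reads that first catch up to the log tail. It is not NrOS code. All class and method names are hypothetical, and real locking, concurrency, and log garbage collection are left out.

```python
class SharedLog:
    """Single ordered log of updates shared by all replicas."""

    def __init__(self):
        self.entries = []

    def append_batch(self, ops):
        self.entries.extend(ops)

    def tail(self):
        return len(self.entries)


class Replica:
    """Per-NUMA-node copy of a dict-like structure, kept in sync through the log."""

    def __init__(self, log):
        self.log = log
        self.state = {}
        self.applied = 0   # index of the next log entry to apply locally
        self.pending = []  # operations queued by threads on this node

    def submit(self, key, value):
        self.pending.append((key, value))

    def combine(self):
        # The combiner appends everyone's pending operations as one batch,
        # then brings the local copy up to the log tail.
        batch, self.pending = self.pending, []
        self.log.append_batch(batch)
        self.catch_up(self.log.tail())

    def catch_up(self, upto):
        while self.applied < upto:
            key, value = self.log.entries[self.applied]
            self.state[key] = value
            self.applied += 1

    def read(self, key):
        # Observe the tail when the read starts and apply up to it before answering.
        self.catch_up(self.log.tail())
        return self.state.get(key)


log = SharedLog()
node0, node1 = Replica(log), Replica(log)
node0.submit("pid", 1)
node0.submit("pid", 2)
node0.combine()  # one batch for both queued operations
node1.submit("fd", 7)
node1.combine()
seen_by_node1 = node1.read("pid")
seen_by_node0 = node0.read("fd")
```

Both nodes end up with the same state because they replay the same log in the same order. This is the state machine replication idea borrowed from distributed systems.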

The paper goes on to explain some optimizations to this basic approach. For instance, in some cases, despite the batching and relatively few threads competing for exclusive access, the log becomes overwhelmed. In an Apache Kafka topic-esque way, the paper proposes to use multiple parallel logs for certain data structures to reduce the contention. NRkernel uses operation commutativity to place operations that can be reordered into the parallel logs and operations that have dependencies in the same log. This is similar to handling conflicts in numerous distributed systems, where we allow non-conflicting operations to run in parallel but ensure the partial order for dependent commands. 
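As a rough illustration of the multi-log idea, the sketch below routes operations to logs by the key they touch. It rests on my own simplifying assumption that operations on different keys commute. The paper's actual commutativity rules are richer, and the function names here are hypothetical.

```python
import zlib


def pick_log(key, num_logs):
    """Route an operation to a log by the key it touches (stable hash)."""
    return zlib.crc32(key.encode()) % num_logs


def route(ops, num_logs):
    """Distribute (key, action) operations over num_logs logs, preserving arrival order."""
    logs = [[] for _ in range(num_logs)]
    for key, action in ops:
        logs[pick_log(key, num_logs)].append((key, action))
    return logs


ops = [("/a", "write 1"), ("/b", "write 2"), ("/a", "write 3")]
logs = route(ops, num_logs=4)
log_of_a = logs[pick_log("/a", 4)]
```

Operations on the same key always land in the same log in their original order, while operations on other keys may go elsewhere and be applied in parallel.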

NrOS uses this style of replication across NUMA nodes to support a variety of subsystems: a virtual memory, an in-memory file system, and a scheduler. The paper evaluates these components against Linux, where it shows better performance, and a handful of other research-level operating systems, like Barrelfish, where it shows comparable or better levels of performance.

This is a second botched presentation in a row where I had to scramble and improvise: 


I will leave this one without a discussion summary. We are not OS experts in this reading group (at least among the people in attendance), and the improvised presentation did not facilitate an in-depth exploration of the paper. However, this seemed like a good paper for distributed systems folks to learn about bits of the OS kernel design and make connections. It was also nice to see some real systems (LevelDB) used in the benchmark, although these were tested against an in-memory filesystem, so it is far from a normal use case. A few natural questions arise in terms of the feasibility of the approach in more mainstream OSes. It seems that a huge “philosophical” gap between the NrOS’s solution and how mainstream kernels are designed means that NRkernel is likely to remain an academic exercise.

Reading Group Paper List. Papers ##81-90

We are continuing the DistSys reading group into the winter term with 10 exciting papers. This list is largely based on SOSP’21 papers. For our foundational paper, we will look at FLP impossibility.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the slack group to get involved!

Reading Group. Avocado: A Secure In-Memory Distributed Storage System

Our 76th reading group meeting covered “Avocado: A Secure In-Memory Distributed Storage System” ATC’21 paper. Unfortunately, the original presenter of the paper could not make it to the discussion, and I had to improvise the presentation on the fly:

So, the Avocado paper builds a distributed in-memory key-value database with a traditional complement of operations: Put, Get, Delete. There is also nothing special on the replication side – the system uses ABD, a very well-known algorithm. What makes this paper special is where in memory this database resides. 

Unlike the traditional systems, Avocado takes advantage of Trusted Execution Environments (TEE) or enclaves. In particular, the system relies on Intel’s version of the TEE. TEEs have some cool tricks up their sleeves: the memory is so protected that even privileged software, such as an OS or a hypervisor, cannot access it. This protection means that both the data and the code inside the enclave are secured from malicious corruption.

The paper suggests that these secure environments were not really designed for distributed systems. The problem lies in trust or, more precisely, ensuring that the replicas can trust each other to be correct. Normally, the attestation method would rely on a CPU-vendor service; in this case, it is the Intel attestation service (IAS). IAS would ensure the integrity of code and data in an enclave to the remote party. IAS, however, has some limitations, such as high latency, but it must be used every time a system creates a secure enclave. So Avocado builds its own trust service called configuration and attestation service (CAS). CAS uses IAS for attesting Avocado’s own local attestation service (LAS), and once it is attested, LAS can be used to attest all other enclaves. This kind of builds a longer chain of trust that uses a slow service very infrequently.

So what do we have so far? We can run code and store some information in secure enclaves. Now we need to ensure our code and our data reside in this secure memory. But things are not so easy here, as the enclaves are relatively small — 128 MB in size and have very slow paging capability if more memory is needed. So our goal is to fit the database in 128 MB of RAM. To make things more complicated, the network is still insecure. Avocado runs most of the code from within the enclave to ensure it is tamper-proof. This code includes CAS, replication protocol (ABD), and the networking stack implemented with kernel bypass, allowing Avocado to pretty much place the encrypted and signed messages straight into the network. 

The final piece is a storage component. Naturally, it is hard to store a lot of stuff in 128 MB. Instead of using secure memory for storage, Avocado uses insecure regular host memory for the actual payloads, allowing it to pack more data into one 128 MB enclave. With paging, the system can store even more data, although at some performance cost. The secure tamper-proof enclave only stores metadata about each value, such as a version, checksum, and the pointer to regular host memory with an encrypted value. Naturally, the encryption algorithm is running from within the enclave. Even if someone tries to mess with the value in host memory (i.e., try to replace it), it will be evident from the checksum in the enclave. 
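To make the metadata/payload split concrete, here is a minimal sketch. It assumes the values handed to `put` are already encrypted, uses SHA-256 as a stand-in checksum, and models the enclave and host memory as ordinary Python objects; `EnclaveIndex` and its fields are hypothetical names, not Avocado's actual data structures.

```python
import hashlib


class EnclaveIndex:
    """Toy model: small trusted metadata, bulky untrusted payloads."""

    def __init__(self):
        self.meta = {}          # "enclave": key -> (version, checksum, host slot)
        self.host_memory = []   # "host": encrypted payloads, untrusted

    def put(self, key, ciphertext):
        version = self.meta[key][0] + 1 if key in self.meta else 1
        self.host_memory.append(ciphertext)
        slot = len(self.host_memory) - 1
        checksum = hashlib.sha256(ciphertext).digest()
        self.meta[key] = (version, checksum, slot)
        return version

    def get(self, key):
        version, checksum, slot = self.meta[key]
        blob = self.host_memory[slot]
        # Any change to the host-side bytes no longer matches the checksum
        # kept in the trusted metadata.
        if hashlib.sha256(blob).digest() != checksum:
            raise ValueError("value in host memory was tampered with")
        return version, blob
```

Only the fixed-size tuple per key lives in the trusted part, which is what lets a small enclave index a much larger amount of data.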

Avocado is a complicated system driven by many limitations of TEEs. Moreover, TEEs significantly hinder the performance, as Avocado running in the secure enclaves is about half as fast as the version running in regular memory. 


1) Attestation service. I think the biggest challenge for our group was understanding the attestation model. The paper lacks some details on this, and the description is a bit confusing at times. For example, when discussing CAS, the authors bring up the LAS abbreviation without first introducing the term. I think it means Local Attestation Service, but I still cannot be sure. Anyway, our group’s confusion may also come from the lack of prior knowledge, and some googling can resolve it.

2) Fault model. The authors motivate and compare Avocado against Byzantine systems. I purposefully did not mention BFT in my summary, as I think the two solve very different problems. While BFT can tolerate several nodes having corrupt data and code, Avocado operates in a different environment. See, Avocado operates in an environment where we do not trust cloud vendors that host our applications. And then, if a cloud vendor can tamper with one node’s data or code, what can prevent them from changing/corrupting all the nodes? If all nodes are corrupt unknown to the user, then the BFT solutions become rather useless. Avocado’s security lies in the trust that enclaves are unhackable even to cloud vendors/hosts. With the enclave, we are not building a BFT system (i.e., a system that operates even when some nodes are “out of spec”), and instead, we have a system that cannot go out of spec as long as we trust the enclaves. Needless to say, we also need to trust our own code.

3) API model. The system operates on a simple key-value store, that can only put, get, and delete data. While key-value stores are useful as building blocks for many larger systems, these stores often have a more sophisticated API that allows ranged scans. Moreover, an ABD-based system cannot support transactional operations, such as compare-and-set, reducing the utility of this simple kv-store. I wonder, what would it take to make this thing run Multi-Paxos or Raft and add range operations? Is the single-node store a limitation? It uses a skip list, which should be sorted, so I’d suspect range queries should be possible. 

Reading Group. Prescient Data Partitioning and Migration for Deterministic Database Systems

In the 75th reading group session, we discussed the transaction locality and dynamic data partitioning through the eyes of a recent OSDI’21 paper – “Don’t Look Back, Look into the Future: Prescient Data Partitioning and Migration for Deterministic Database Systems.” 

This interesting paper solves the transaction locality problem in distributed, sharded deterministic databases. The deterministic databases pre-order transactions so that they can execute without as much coordination after the pre-ordering step. Now, this does not mean that every transaction can run locally. Indeed, if a transaction needs to touch data across multiple partitions or shards, then there is a need to reach out across partitions, which introduces delays. A natural way to solve the problem is to migrate the data based on its access patterns to minimize the number of such cross-partition transactions.

Such dynamic data migration is pretty much the solution in Hermes, the system presented in the paper. The thing is that the “group the data together” approach is not new. The authors cite several papers, such as Clay that do very similar stuff. Now, the biggest difference between Hermes and other approaches is the decision-making process involved in figuring out how and where the data needs to go. Traditionally, dynamic re-partitioning solutions rely on the historical data from the workload. These approaches work great when the workload is decently stable. However, workloads that abruptly change their access patterns present a problem — the system based on historical observation of access locality is reactive and needs time to adjust to the new workload. So, naturally, to tolerate the rapidly changing workload locality characteristics, we need a proactive system that can predict these locality changes and make necessary changes ahead of time. It would be nice to have an oracle that can see into the future. Well, Hermes kind of does this. See, the system requires a batched database, and before executing each batch, it can look at the access patterns within the batch and adjust accordingly. 

Hermes looks at the patterns of transactions to be executed shortly, figures out how many transactions will require cross-partition coordination, and then makes data movements to minimize that number. For example, if some transaction type accesses two objects frequently together, Hermes will move one of these objects, incurring one cross-partition data transfer. If this happens, the transaction itself becomes partition-local, incurring no cross-partition data transfers.
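A tiny sketch of that idea, under heavy assumptions: a batch is a list of key tuples, `placement` maps each key to a partition, and we brute-force the single object move that leaves the fewest cross-partition transactions. This is not Hermes's actual algorithm (which also has to care about load balance); the function names are made up for illustration.

```python
def cross_partition_count(batch, placement):
    """Number of transactions in the batch that touch more than one partition."""
    return sum(1 for txn in batch if len({placement[k] for k in txn}) > 1)


def best_single_move(batch, placement, partitions):
    """Try every (key, partition) move and keep the one with the lowest count."""
    best_cost, best_move = cross_partition_count(batch, placement), None
    for key in placement:
        for p in partitions:
            if p == placement[key]:
                continue
            trial = {**placement, key: p}   # placement with one object moved
            cost = cross_partition_count(batch, trial)
            if cost < best_cost:
                best_cost, best_move = cost, (key, p)
    return best_cost, best_move
```

For a batch where A and B are always accessed together but live on different partitions, one move makes every such transaction partition-local.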

In addition to moving and repartitioning the data around, Hermes also moves the transactions. The movement of compute tasks, again, is not new and makes a ton of sense. If a transaction originating in partition P1 needs some objects A & B located in some partition P2, then it makes a lot of sense to move a relatively small transaction to where the data is.

Unfortunately, the combination of dynamic re-partitioning and transaction movements leads to some unintended consequences over the long run. Consider a batch where objects A & B were used frequently together. The system moves them to one partition for speed. In the next batch, we may have objects B & C used together, so C moves to the same partition as B, again for speed. Now we have colocated objects A, B, and C. If this continues for very long, the system will consolidate more and more data in one place. This consolidation is not ideal for load balancing, so Hermes has to account for this and prevent data from gradually drifting closer together. It does so by “de-optimizing” for locality and allowing more distributed transactions in exchange for a more even load in each batch.

As far as performance, Hermes delivers when it comes to the workloads with frequent changes in locality:

As always, the presentation video from the reading group:


1) Wide Area Networks. Hermes takes advantage of locality in a data center setting. It groups objects used together to allow local transactions as opposed to distributed ones. However, it may not work in a geo-distributed environment. The problem is that Hermes only solves one type of locality puzzle. It accounts for the grouping of objects used together in hopes that these objects will be used together again (spatial locality) and used soon enough (temporal locality). In fact, Hermes optimizes based on spatial criteria. This notion of locality works well in LAN, where the transaction can run in a partition with the data without a performance penalty. However, in WAN, moving transactions between partitions is costly. If a transaction originated in region R1, but data is in region R2, then moving the transaction from R1 to R2 may incur almost as much latency as moving data from R2 to R1. That is a big difference between LAN and WAN: in the WAN setting, transactions incur a significant latency penalty when processed in another region. In a geo-distributed setting, locality means more than just grouping frequently used objects together to reap the benefit shortly. In addition to grouping data based on spatial principles, we need to preserve the geographical affinity and place data close to where it is accessed. In other words, in geo-replication, we do not only care about finding cliques of related data, but also placing these cliques in the best possible geography for a given workload. And needless to say, grouping objects and finding the best geographical location for the data often conflict with each other, making the problem significantly more nuanced and complicated.

2) Workload. A significant motivation for the paper is the existence of workloads with significant and abrupt access pattern changes. The paper refers to Google workload traces for the example of such abrupt workloads. The authors also conduct a significant portion of the evaluation on the workload created from these traces. We are a bit skeptical, at least on the surface, about the validity of this motivation. One reason for skepticism is the traces themselves — they come from Google Borg, which is a cluster management system. While Borg is obviously supported by storage systems, the traces themselves are very far from describing some actual database/transactional workload. It would be nice to see a bit more details on how the authors created the workload from the traces and whether there are other examples of workloads with abrupt access pattern changes. 

Reading Group. Viewstamped Replication Revisited

Our 74th paper was a foundational one: we looked at the Viewstamped Replication protocol through the lens of the “Viewstamped Replication Revisited” paper. Joran Dirk Greef presented the protocol along with bits of his engineering experience using the protocol in practice.

Viewstamped Replication (VR) solves the problem of state machine replication in a crash fault tolerance setting with up to f nodes failing in the cluster of 2f+1 machines. In fact, the protocol (between its two versions, the original and revisited) is super similar to Raft and Multi-Paxos. Of course, the original VR paper was published before both of these other solutions. An important aspect in the narrative of the VR paper is that the basic protocol makes no assumptions about persisting any data to disk.


In the common case when replicating data, VR has a primary node that prescribes the operations and their order to the followers. The primary needs a majority quorum of followers to ack each operation to consider it committed. Once it reaches the quorum, the primary can tell the followers to commit as well. In VR, followers do not accept out-of-order commands and will not ack an operation without receiving its predecessor. With a few differences, this is how typical consensus-based replication works in both Raft and Multi-Paxos. For example, Multi-Paxos complicates things a bit by allowing out-of-order commits (but not execution to the state machine).

View Change and Replicas Recovery

As with everything in distributed systems, the interesting parts begin when things start to crumble and fail. Like other protocols, VR can mask follower crashes, so the real challenge is handling primary failures. The VR protocol orchestrates the primary failover with a view-change procedure. Here I will focus on the “revisited” version of the view-change. 

One interesting piece of the revisited view-change is its deterministic nature, as each view is assigned/computed to belong to a particular node. When the current primary in view v fails, the node for view v+1 will try to become a new primary. This promotion to primary can be started by any node suspecting the current primary’s demise. We do not need to have a node for view v+1 to directly observe the crash of primary for v, allowing for unreliable failure detectors.
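One simple way to get such a deterministic mapping is a round-robin over a fixed, sorted list of replicas. The sketch below assumes exactly that; the function name and the replica identifiers are illustrative only.

```python
def primary_for_view(view, replicas):
    """Map a view number to its primary, round-robin over a sorted replica list."""
    ordered = sorted(replicas)          # every node must agree on this order
    return ordered[view % len(ordered)]
```

Because every node computes the same answer from the view number alone, nobody has to vote on who the next primary is; the nodes only need to agree that the view should change.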

So, any replica can start a view change if it detects a faulty primary. The replica will advance to the next view v+1 and start the view change by sending the StartViewChange message with the new view number to all the nodes. Each replica that receives the StartViewChange will compare the view to its own, and if its view is less than the one in the received StartViewChange message, it will also start the view change and send its own StartViewChange message to every replica. Also note, that two or more replicas may independently start sending the initial StartViewChange messages for the new view v+1 if they independently detect primary failure. The bottom line of this StartViewChange communication pattern is that now any replica that received at least f StartViewChange messages will know that the majority of the cluster is on board to advance with the view change. 

So, after receiving f StartViewChange messages for the new view v+1 from other nodes, each replica sends a DoViewChange message to the primary for that new view v+1. The DoViewChange message contains the replica’s last known view v’ along with the log. The new primary must receive a majority (i.e., f+1) of DoViewChange messages. The new primary then selects the most up-to-date log (i.e., the one with the highest v’ or longest log if multiple DoViewChange messages have the same highest v’). This up-to-date log ensures the new primary can recover any operations that might have been majority-accepted or committed in previous views. Such “learning from the followers and recover” part of the view change procedure is somewhat similar to Multi-Paxos, as the new leader needs to recover any missing commands before proceeding with the new ones. The VR leader recovery only cares about the view and the length of the log because it does not allow out-of-order prepares/commits. In Multi-Paxos, with its out-of-order commitment approach, we kind of need to recover individual log entries instead of the log itself. 
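The log selection rule is compact enough to sketch. Here I assume each DoViewChange carries the sender, its last known view v’, and its whole log, and that the caller has already deduplicated messages per sender; the `DoViewChange` dataclass and `select_log` are hypothetical names.

```python
from dataclasses import dataclass, field


@dataclass
class DoViewChange:
    sender: int
    last_view: int                      # v' in the text
    log: list = field(default_factory=list)


def select_log(messages, f):
    """Return the most up-to-date log, or None without f+1 messages."""
    if len(messages) < f + 1:
        return None                     # no majority yet, keep waiting
    # Highest v' wins; among equal v', the longest log wins.
    best = max(messages, key=lambda m: (m.last_view, len(m.log)))
    return best.log
```

Note that a shorter log from a higher view beats a longer log from a lower view, since the longer one may contain entries that never made it to a majority.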

Anyway, after receiving enough (f+1) DoViewChange messages and learning the most up-to-date log, the new primary can start the new view by sending the StartView message containing the new view v+1 and the recovered log. The followers will treat any uncommitted entries in the recovered log as “prepares” and ack the primary. After all entries have been recovered, the protocol can transition into the regular mode and start replicating new operations.

Schematic depiction of a View Change procedure initiated by one node observing primary failure. Other machines send StartViewChange as a response. Eventually, a majority of nodes receive f StartViewChange messages and proceed to DoViewChange, at which point the new primary recovers the log.

There is a bit more stuff going on in the view change procedure. For example, VR ensures the idempotence of client requests. If some client request times out, the client can resend the operation to the cluster, and the new leader will either perform the operation if it has not been done before or return the outcome if the operation was successful at the old primary. 

Another important part of the paper’s recovery and view change discussion involves practical optimizations. For example, recovering a crashed in-memory replica requires learning an entire log, which can be slow. The proposed solution involves asynchronously persisting log and checkpoint to disk to optimize the recovery process. Similarly, view change requires log exchange, which is not practical. An easy solution would involve sending a small log suffix instead since it is likely that the new primary is mostly up-to-date. 


1) Comparison With Other Protocols. I think the comparison topic is where we spent most of the discussion time. It is easy to draw parallels between Multi-Paxos and VR. Similarly, Raft and VR share many things in common, including the similarity of the original VR’s leader election to that of Raft. 

One aspect of the discussion is whether Multi-Paxos, Raft, and VR are really the same algorithms with slightly different “default” implementations. We can make an argument that all these protocols are the same thing. They operate in two phases (the leader election and replication), require quorum intersection between the phases, and have the leader commit operation first before the followers. The differences are how they elect a leader to ensure that the committed or majority-accepted operations from the prior leader do not get lost. Multi-Paxos and VR revisited recover the leader, while Raft ensures the new leader is up to date to eliminate the need for recovery. 

2) Optimizations. Academia and Industry have churned out quite a substantial number of optimizations for Multi-Paxos and Raft. There has even been a study on whether the optimizations are interchangeable across protocols (the short answer is yes). But that study did not include VR. Interestingly, Joran mentioned that they applied Protocol-Aware-Recovery to VR, and (if I am not mistaken) they also used flexible quorums. Of course, I am not surprised here, given how similar these solutions are.

3) In-memory Protocol. Joran stressed the importance of in-memory protocol description to engineers. In their systems, the in-memory consensus allows reducing the reliance on fault-tolerance/reliability of the storage subsystem. It appears that VR is the only major SMR protocol specified without the need for a durable infallible disk. And while people in academia do a lot of in-memory consensuses (including Multi-Paxos and Raft), the aspects of what is needed to make these systems work without the disk are not clearly described. 

The VR paper briefly describes the problem that the lack of durable log introduces. The issue is that a node can forget its entire state. So, it can accept some value, create a majority and then reboot. Upon rebooting, if the node is allowed to participate right away, we no longer have the forgotten value in the majority quorum. Similarly, a node can vote on something in one view, reboot and forget and then potentially revote in a lesser view by some slow old primary. 

To avoid these problems, the node needs to recover before it can participate. At least a partial recovery is needed to avoid the double voting problem — the node needs to learn the current view before accepting new operations. At the same time, such a partially recovered node must still appear “crashed” for any older operations until it has fully recovered. 

Reading Group. Polyjuice: High-Performance Transactions via Learned Concurrency Control

Our 73rd reading group meeting continued with discussions on transaction execution systems. This time we looked at the “Polyjuice: High-Performance Transactions via Learned Concurrency Control” OSDI’21 paper by Jiachen Wang, Ding Ding, Huan Wang, Conrad Christensen, Zhaoguo Wang, Haibo Chen, and Jinyang Li. 

This paper explores single-server transaction execution. In particular, it looks at concurrency control mechanisms and conjectures that the current approaches have significant limitations for different workloads. For instance, two-phase-locking (2PL) may work better in workloads that have high contention between transactions, while Optimistic Concurrency Control (OCC) works best in low-contention scenarios. A natural way to solve the problem is to create a hybrid solution that can switch between different concurrency control methods depending on the workload. The paper mentions a few such solutions but also states that they are too coarse-grained.

Polyjuice presents a different hybrid strategy that allows a more fine-grained, auto-tunable control over transaction concurrency control depending on the workload. The core idea is to extend the existing concurrency control mechanisms into a set of actions and train the system to take the best actions in response to each transaction type, transaction’s dependencies, and the current step within the transaction. 

Possible actions Polyjuice can take include different locking options, whether to allow dirty reads, and whether to expose dirty writes. 

For the sake of time, I will only talk about locking actions. Each transaction upon accessing some data must pick the concurrency control actions. For example, if some transaction “B” has another transaction “A” in its dependency list for some key, it needs to decide whether or not to wait for the dependent transaction “A.” In fact, “B” has a few options in Polyjuice: be entirely optimistic and not wait for “A” or be more like 2PL and halt until the dependent “A” has finished. In addition to the two extremes, Polyjuice may tweak this lock to wait for partial execution of the dependent transaction.

State & Action Space Policy Table. Example parameters for some transaction t1, step #2 are shown. Image from the authors’ presentation.

Polyjuice system represents the possible concurrency control actions through the state and action space policy table. In the table, each row is a policy for a particular step of a particular transaction. Each column is an action type. Wait actions are specific to dependents of a transaction, so if a transaction has a dependency, we will pick the wait action corresponding to the dependency. The cells then represent the action parameters, such as the wait duration or whether to read dirty. 

So, to operate the concurrency control actions, we need to maintain the dependency lists for transactions and tune the policy table for optimal parameters. The tuning is done with a reinforcement-learning-like approach, where we aim to optimize the policy (our table that maps state to actions) for maximizing the reward (i.e., the throughput) in a given environment (i.e., the workload). The actual optimization is done with an evolutionary algorithm, so it is a bit of a random process of trying different parameters and sticking with the ones that seem to improve the performance. 

It is important to note that the Polyjuice concurrency control policy is not ensuring the safety of transactions. It merely tries to tweak the waits and data visibility for best performance. So when each transaction finishes with all its accesses/steps, Polyjuice runs a final validation before committing. If the validation passes, then everything is ok, and if not, the transaction is aborted. This makes the entire concurrency control process optimistic, despite adding some waiting/locks for dependencies. In my mind, Polyjuice is a “loaded” optimistic concurrency control that tries to improve its chances of passing the validation and committing as quickly as possible. Kind of like a loaded die with a higher chance of getting the value you are betting on. 

Our presentation video is available below. Peter Travers volunteered for this paper about a day before the meeting, saving me from doing it. So big thanks to Peter, who did an awesome job presenting. 


1) Single-server transactions. Polyjuice is a single-server system, which makes some of its transactional aspects a lot simpler. For example, keeping track of transaction dependencies in a distributed system would likely involve some additional coordination. As we have seen in Meerkat, additional coordination is not a good thing. At the same time, there may be some interesting directions for learned CC in distributed space. For example, we can train a separate model or policy table for different coordinators in the system. This can be handy when deploying the system across WAN with non-uniform distances between nodes or coordinators. I was originally excited about this possibility, and it feels like a natural direction for research.

2) Fixed transaction types. The system expects a fixed set of transaction types — transactions that have the same “code” and execute the same logic, but on different data. It is not entirely clear what will happen when a new transaction type arrives at the system before Polyjuice is retrained to include this new type. We would expect some sort of a default fall-back mechanism to be in place.

3) Evolutionary algorithm. So this is an interesting part. Polyjuice uses an evolutionary optimization algorithm to train its policy model. These types of algorithms try to mimic natural evolutionary processes to arrive at the more optimal solution. For example, the system may start with an initial population of policies, then evaluate how this initial population performs, pick the two best policies, and somehow combine them. This combined policy (i.e., the offspring of the two best ones) can then replace the weakest performing policy in the population. The process can repeat.

The paper actually does not use a crossover approach to produce offspring between the best policies. Polyjuice creates “children” by mutating the parameters of the good parents to produce the next population generation. It then evaluates this next generation, prunes the weak policies, and repeats the process. 
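The mutate, evaluate, prune loop can be sketched generically. This is not Polyjuice's training code: I assume a policy is just a list of non-negative integer parameters, mutation nudges each parameter by -1, 0, or +1, and `fitness` is any caller-supplied scoring function (throughput, in the paper's setting). All names and defaults here are illustrative.

```python
import random


def evolve(initial, fitness, generations=50, children_per_parent=4,
           survivors=2, seed=0):
    """Mutation-only evolutionary search that keeps the best policies."""
    rng = random.Random(seed)
    population = [list(p) for p in initial]
    for _ in range(generations):
        children = []
        for parent in population:
            for _child in range(children_per_parent):
                # Mutate every parameter by a small random step, floor at 0.
                children.append([max(0, x + rng.choice((-1, 0, 1)))
                                 for x in parent])
        # Parents compete with their children; the weak ones are pruned.
        ranked = sorted(population + children, key=fitness, reverse=True)
        population = ranked[:survivors]
    return population[0]
```

Since parents stay in the pool, the best score never gets worse between generations, but nothing in this loop prevents it from settling on a local maximum.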

The paper claims this works well, but we were wondering about the convergence of this to an optimal solution. Can the evolutionary algorithm get stuck in some local maximum and not find the best policy? Another concern is the time to converge, and the training impact on the transactional performance. To find the best policies, Polyjuice needs to evaluate the entire population. While this is happening, the performance of the system may stutter and jump back and forth as it tries different policies, which is not ideal in production workloads. At the same time, we must use production workloads to train the policy. And of course, the process requires multiple iterations. 

Reading Group. Meerkat: Multicore-Scalable Replicated Transactions Following the Zero-Coordination Principle

Our 72nd paper was on avoiding coordination as much as possible. We looked at the “Meerkat: Multicore-Scalable Replicated Transactions Following the Zero-Coordination Principle” EuroSys’20 paper by Adriana Szekeres, Michael Whittaker, Jialin Li, Naveen Kr. Sharma, Arvind Krishnamurthy, Dan R. K. Ports, Irene Zhang. As the name suggests, this paper discusses coordination-free distributed transaction execution. In short, the idea is simple — if two transactions do not conflict, then we need to execute them without any kind of coordination. And the authors really mean it when they say “any kind.”  

In distributed transaction processing, we often think about avoiding excessive contention/coordination between distributed components. If two transactions are independent, we want to run them concurrently without any locks. Some systems, such as Calvin, require coordination between all transactions by relying on an ordering service to avoid expensive locks. Meerkat’s approach is way more optimistic and tries to avoid any coordination, including ordering coordination. Meerkat is based on Optimistic Concurrency Control (OCC) with timestamp ordering to enable independent transactions to run without locks and ordering services. Another innovation for avoiding coordination is the replication of transactions from clients straight to servers. Instead of relying on some centralized replication scheme, like Multi-Paxos or Raft, Meerkat lets clients directly write transactions to replicas, avoiding replication coordination and leader bottlenecks. The authors explored this “unordered replication” idea in their previous paper. The clients also act as the transaction coordinators in the common case. 

Meerkat does not stop its coordination avoidance efforts here at the cross-replica coordination. In fact, this is where the most interesting magic starts to happen — the authors designed a good chunk of a system specifically to avoid the cross-core coordination within each server to take advantage of modern multi-core CPUs. The authors call such avoidance of cross-core and cross-replica coordination a Zero Coordination Principle or ZCP for short.

As a motivating example for ZCP and the need for cross-core coordination avoidance, the paper illustrates the contention created by a simple counter shared between threads on one machine. It appears that with the help of modern technologies to alleviate networking bottlenecks (kernel bypass), such a shared counter becomes an issue at just 8 threads. In the example, a simple datastore with a shared counter could not scale past 16 threads, while a similar store without a shared resource had no such problems. 

Let’s talk about the protocol now to see how all the coordination-avoidance efforts actually work. The system tolerates \(f\) failures in the cluster of \(2f+1\) machines. Each transaction can read and write some set of objects supported by the underlying key-value store. These objects represent the transaction’s read- and write-sets. The replicas maintain two data structures to support transaction processing: a trecord and a vstore. 

The trecord is a table containing all transaction information, partitioned by CPU core ID to make each transaction “sticky” to a single core. It manages the transaction state, such as the read- and write-sets, transaction version timestamp, and commit status. The vstore stores versioned key-value pairs. Unlike the trecord, the vstore is shared among all cores at the server. The transaction protocol runs in 3 distinct phases: Execute, Validate, Write. I’m not sure these are the most intuitive names for the phases, but that will do.
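To make the split between the two structures concrete, here is a minimal sketch of how a replica could lay them out. The names `TxnEntry`, `Replica`, and `record` are my own illustrations, not identifiers from the paper, and the real system is of course not a pair of Python dictionaries.

```python
from dataclasses import dataclass


@dataclass
class TxnEntry:
    """One trecord row (hypothetical layout)."""
    txn_id: str
    timestamp: int
    read_set: dict            # key -> version observed in the execute phase
    write_set: dict           # key -> value the transaction wants to write
    status: str = "PENDING"   # later becomes OK / ABORT / COMMITTED


class Replica:
    def __init__(self, num_cores):
        # trecord: one private partition per core, so a transaction's
        # bookkeeping is only ever touched by its sticky core.
        self.trecord = [dict() for _ in range(num_cores)]
        # vstore: key -> (version, value), shared by all cores.
        self.vstore = {}

    def record(self, core_id, entry):
        """Store the entry in the partition owned by core_id."""
        self.trecord[core_id][entry.txn_id] = entry
        return entry
```

The point of the sketch is only the ownership model: each trecord partition has a single owner, while the vstore is the one structure every core may touch.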

In phase-1 (execute), the transaction coordinator (i.e., a client) contacts any replica and reads the keys in its read-set. The replica returns versioned values for each key. The coordinator then buffers any pending writes.

Phase-2 (validation) combines the transaction commit protocol with replication of the transaction outcome. The coordinator starts phase-2 by first selecting a sticky CPU core that will process the transaction. The sticky core ID ties each transaction to a particular CPU core to reduce cross-core coordination. The coordinator then creates a unique transaction id and a unique timestamp version to use for OCC checks. Finally, the coordinator sends all this transaction information to every replica in a validate message. 
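As a rough illustration of how a coordinator could produce these values without talking to anyone, here is a small sketch. Everything in it is an assumption of mine: the function name `new_transaction`, the use of a local clock paired with the client id as a tie-breaker, and the random choice of core. The paper has the actual scheme.

```python
import random
import time
import uuid


def new_transaction(client_id, num_cores):
    """Build the metadata a coordinator attaches to a validate message."""
    return {
        # Globally unique id, generated locally (no shared counter).
        "txn_id": f"{client_id}-{uuid.uuid4().hex}",
        # Local clock reading; the client id breaks ties between
        # clients that happen to read the same clock value.
        "timestamp": (time.time_ns(), client_id),
        # Every replica will process this transaction on this core.
        "core_id": random.randrange(num_cores),
    }
```

Since tuples compare element by element, two timestamps from different clients are always ordered, even when the clock values collide.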

Upon receiving the message, each replica creates an entry in its trecord. The entry maintains the transaction’s state and makes this transaction “stick” to the core associated with the core-partitioned trecord. At this point, the replica can validate the transaction using OCC. I will leave the details to the paper, but this is a somewhat standard OCC check. It ensures both that the data read in phase-1 is still current and that the data in the write-set has not yet been replaced by a newer transaction. At the end of the check, the replica replies to the coordinator with its local state (OK or ABORT). 
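The two conditions of the check can be sketched as follows. This is a simplified version based only on the description above, not the exact check from the paper (which also has to account for other in-flight transactions); `occ_validate` and the `(version, value)` layout of the store are hypothetical.

```python
def occ_validate(vstore, read_set, write_set, ts):
    """Return "OK" or "ABORT" for a transaction with timestamp ts.

    vstore:    key -> (version, value)
    read_set:  key -> version observed during the execute phase
    write_set: key -> new value
    """
    # 1) Every value read in phase-1 must still be the current version.
    for key, version_read in read_set.items():
        current_version, _ = vstore.get(key, (0, None))
        if current_version != version_read:
            return "ABORT"
    # 2) No key we write may already carry a newer version than ours.
    for key in write_set:
        current_version, _ = vstore.get(key, (0, None))
        if current_version > ts:
            return "ABORT"
    return "OK"
```

For example, with `vstore = {"x": (5, "a")}`, a transaction that read `x` at version 5 and writes it with timestamp 7 validates, while one that read `x` at version 4 aborts.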

The coordinator waits for a supermajority (\(f+\lceil\frac{f}{2}\rceil+1\)) fast-quorum of replies. If it receives enough matching replies, then the transaction can finish right away in the fast path. If the supermajority was in the “OK” state, the transaction commits, and otherwise, it aborts.

Sometimes a supermajority fast-quorum does not exist or does not have matching states, forcing the coordinator into a slow path. In a slow path, the coordinator only needs a majority of replicas to actually reply. If the majority has replied with an “OK” state, the coordinator can prescribe the replicas to accept the transaction, and otherwise, it prescribes the abort action. Once the replicas receive the prescribed transaction state, they mark the transaction accordingly in their trecord and reply to the coordinator. Here, the coordinator again waits for a quorum before finalizing the transaction to commit or abort. 
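The quorum arithmetic and the two decision rules are easy to get wrong, so here is a small sketch of both. The function names are mine, and the sketch ignores timeouts, retries, and the final acknowledgment round of the slow path.

```python
import math


def quorum_sizes(f):
    """Return (cluster size, majority quorum, fast quorum) for f failures."""
    n = 2 * f + 1
    majority = f + 1
    fast = f + math.ceil(f / 2) + 1
    return n, majority, fast


def fast_path_outcome(replies, f):
    """COMMIT/ABORT if a fast quorum of matching replies exists, else None."""
    _, _, fast = quorum_sizes(f)
    if replies.count("OK") >= fast:
        return "COMMIT"
    if replies.count("ABORT") >= fast:
        return "ABORT"
    return None


def slow_path_proposal(replies, f):
    """What the coordinator prescribes once a majority has replied."""
    _, majority, _ = quorum_sizes(f)
    if len(replies) < majority:
        return None  # not enough replies yet
    return "COMMIT" if replies.count("OK") >= majority else "ABORT"
```

With `f = 2` this gives a 5-node cluster, a majority of 3, and a fast quorum of 4. Replies of `["OK", "OK", "OK", "ABORT"]` miss the fast path, but the slow path can still propose a commit.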

Finally, in phase-3 (write), replicas mark the transaction as committed or aborted. If the transaction is committed, then each replica can apply the writes against the versioned datastore. 

Phew, there is a lot to unpack here before diving deeper into the corner cases and things like replica and coordinator failures and recovery. The important parts relate to how the coordination is handled/avoided. To start, the coordinator uses a timestamp for the version, circumventing the need for a counter or centralized sequencer. The coordinator (who happens to be the client in the normal case) replicates the transaction, together with its timestamp, directly to the replicas, avoiding the need for a centralized replication leader or primary. At the server level, a transaction never changes its execution core, even as it goes through different phases. All messages get routed to the core assigned to the transaction, and that core has exclusive access to the transaction’s trecord partition. This “core stickiness” avoids coordination between the cores of a server(!) for the same transaction. I speculate here a bit, but this may also be good for cache use, especially if individual transaction records are designed to fit in a cache line. As a result, the only place where coordination happens between transactions is the OCC validation. During the validation, we must fetch current versions of objects from the core-shared vstore, creating the possibility for cross-core contention between transactions accessing the same data.

I do not want to go too deep into the failure recovery; however, there are a few important points to mention. The replica recovery process assumes replicas rejoin with no prior state, so they are in-memory replicas. The recovery process is leader-full, so we are coordinating a lot here. And finally, the recovery leader halts transaction processing in the cluster. As a result, the recovery of one replica blocks the entire system as it needs to reconcile one global state of the trecord that can be pushed to all replicas in a new epoch. I will leave the details of the recovery procedure to the paper. However, intuitively, this over-coordination in recovery is needed because the normal operation avoided the coordination in the first place. For example, state machine replication protocols “pre-coordinate” the order of all operations. When a replica needs to recover, it can learn the current term to avoid double voting before simply grabbing all committed items from the log available at other nodes and replaying them. It can replay the recovered log while also receiving new updates in the proper order. In Meerkat, we have no single history or log to recover, so a pause to reconcile a consistent state may be needed.  

Coordinator recovery is handled by keeping backup coordinators and using a Paxos-like consensus protocol to ensure that only one coordinator is active at a time and that the active coordinator is in a proper state. 

Now we can talk a bit about the performance. Meerkat significantly outperforms TAPIR, which is the previous system from the same group. The performance gap between the systems is huge. This raises a natural question: just how much of Meerkat’s gain is due to a super-optimized implementation and utilization of techniques like kernel-bypass? Meerkat-PB in the figure can shed some light on this, as it represents a version of Meerkat with a dedicated primary for clients to submit transactions. Having a primary adds cross-replica coordination, and despite that, it still significantly outperforms the older systems.

As always, we had a presentation in the group, and it is available on our YouTube channel. This time, Akash Mishra did the presentation:


Quite frankly, I have incorporated quite a bit of discussion into the summary already. 

1) Performance. One of the bigger questions was about performance. How much of the “raw” speed is due to the ZCP, and how much of it is due to the enormous implementation expertise and the use of kernel bypass and fancy NICs to deliver messages to the proper cores/threads? We speculate that a lot of the overall performance is due to these other improvements and not ZCP. That being said, once you have an implementation this efficient/fast, even tiny bits of coordination start to hurt substantially, as evidenced by the motivating examples and even the primary-backup version of Meerkat. 

2) Replica Recovery. The blocking nature of recovery may present a real problem in production systems, especially if the recovery time is substantial. It would have been nice to see the recovery evaluations.

3) Performance States. To continue with performance/recovery topics, it appears that the system can operate in multiple very distinct performance states. In the fast-quorum operation, commits come quickly. In the slow path, a whole new round-trip exchange is added (probably after the timeout). This creates distinct latency profiles for fast and slow paths. This can also create distinct throughput profiles, as a slow path sends and receives more messages, potentially creating more load in the system. 

4) Sharded systems? Many systems use sharding to isolate coordination into smaller buckets of nodes or replica-sets. For example, Spanner and CockroachDB do that. Such sharding allows independent transactions to run in separate “coordination pods” without interfering with each other. To scale, these systems just need to create more such coordination pods. Of course, in systems like Spanner, transactions that span multiple replica-sets add another level of coordination on top, but the chance that any two shards need to coordinate is kept low by making lots of tiny shards. I wonder about the differences between the two philosophies: avoiding coordination vs. embracing it, but in small groups. Are there benefits to “coordination pods”? Should we embrace ZCP? Can ZCP survive the scale of these larger sharded systems?

5) The need for supermajority quorum? Supermajority fast quorums often raise many questions. Just exactly why do we need them? The short answer here is fault-tolerance, or more specifically the ability to recover operations after failures. See, in majority quorum protocols that have a leader, we have at most one operation that can be attempted in the given epoch and log position. This means that if some replicas fail, we can recover the operation if we can guarantee to see the value in at least one replica. Any two majority quorums intersect in at least one replica, making the single operation recoverable as long as it has made it to the majority. Unfortunately, this does not work with leaderless solutions as illustrated by Fast Paxos, as many different values can be attempted in the same epoch and slot position. However, we still need to survive the failures and recover. 

Let’s look at an example to illustrate this. Assume we have 5 nodes, 3 of which have accepted value “A” and 2 have value “B.” Let’s assume that we commit “A” at this time since we clearly have a majority agreeing on “A.” If 2 “A” nodes crash, we will have 3 live nodes remaining: “A, B, B.” By looking at these nodes, we cannot tell that value “A” might have been committed by the coordinator. We need a supermajority quorum to survive the failures and recover. If we commit with a supermajority of size \(f+\lceil\frac{f}{2}\rceil+1\) (4 out of 5 nodes) “A, A, A, A, B”, then failing 2 “A” nodes leaves us with 3 nodes: “A, A, B.” We see more “A” operations here and can recover “A.” In fact, if some value is in a supermajority, then any majority quorum will have the majority of its nodes (i.e. \(\lceil\frac{f}{2}\rceil+1\) nodes) having that value. The converse does not hold: the majority value of a majority quorum does not mean the value was in the supermajority.
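The 5-node example can be checked by brute force. The sketch below enumerates every set of survivors after \(f\) crashes and asks whether the value still shows up on at least \(\lceil\frac{f}{2}\rceil+1\) of them. The helper `always_recoverable` is hypothetical and only mirrors the counting argument above; it is not the recovery protocol itself.

```python
import math
from itertools import combinations


def always_recoverable(states, value, f):
    """True if value stays visible on enough nodes after ANY f crashes.

    states: the value accepted by each of the 2f+1 nodes.
    """
    threshold = math.ceil(f / 2) + 1
    survivors_count = len(states) - f
    # Each combination is one possible set of surviving nodes.
    return all(
        survivors.count(value) >= threshold
        for survivors in combinations(states, survivors_count)
    )


# Plain majority: crashing two "A" nodes can leave "A, B, B".
majority_case = always_recoverable(["A", "A", "A", "B", "B"], "A", f=2)
# Supermajority: any three survivors include at least two "A" nodes.
supermajority_case = always_recoverable(["A", "A", "A", "A", "B"], "A", f=2)
```

Here `majority_case` comes out `False` and `supermajority_case` comes out `True`, matching the two scenarios in the example.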

Now, how does this relate to Meerkat? Each Meerkat transaction has a unique id, so one may think we never have the possibility of committing two or more different transactions for the same id. However, we have to be careful about what Meerkat replicas need to agree upon. It is not the transaction itself, but a transaction status — OK or ABORT. So, we do have two possible values that can exist at different replicas for the same transaction. As a result, Meerkat needs a fast path supermajority quorum to make the transaction status decision recoverable in the replica recovery protocol.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!