# Reading Group. ByShard: Sharding in a Byzantine Environment

Our 93rd paper in the reading group was “ByShard: Sharding in a Byzantine Environment” by Jelle Hellings, Mohammad Sadoghi. This VLDB’21 paper talks about sharded byzantine systems and proposes an approach that can implement 18 different multi-shard transaction algorithms. More specifically, the paper discusses two-phase commit (2PC) and two-phase locking (2PL) in a byzantine environment.

As usual, we had a presentation of the paper. Karolis Petrauskas did an excellent job explaining this work:

The paper states that modern blockchain technology relies on full replication, making the systems slower and harder to scale.

Sharding is a natural way to solve the problem and has been done countless times in the crash fault tolerance setting. Of course, a sharded system often needs to perform transactions that touch data in more than one shard. The usual way to solve this in CFT is to use some version of 2PC coupled with some concurrency control mechanism, like 2PL. ByShard follows this general recipe, but in the BFT setting, which complicates things a bit. The problem is making 2PC and 2PL work in a byzantine, adversarial environment without tightly coupling all shards back together into one “megashard.” So, we need a way to communicate between shards in a reliable way.

Let’s look at a transaction lifecycle. When we have a transaction that spans multiple shards, the first step is to deliver this transaction to each shard and check whether a shard can run it. Then, if everything is ok, we need to run the transaction and provide some isolation from other ongoing transactions. ByShard implements all these actions with shard-steps. Each shard-step is a building block of all ByShard protocols and allows the shard to inspect a transaction, make changes to the local state, and send the message to start another shard-step on another shard. Overall, ByShard uses three distinct types of shard-steps: vote-step, commit-step, and abort-step.

The message sending part is kind of important, as we need this communication to be reliable in the BFT setting. The paper gracefully ignores this problem and points to a few solutions in the literature. In short, ByShard requires a cluster-sending protocol that ensures reliable communication between shards, such that, all correct nodes of the receiver shard get the message, all the correct nodes of the sender shard get an acknowledgment, and that sending requires the sender shard to reach an agreement on what to send. The last point ensures that bad actors do not send malicious stuff, and I suspect on the receiver there needs to be a way to check that the received messages were indeed certified by the sender’s consensus.

Vote-step is used to replicate the transaction between shards. When a shard receives the transaction, it starts the vote-step and checks whether it can proceed with the transaction. The shard may also perform local state changes if needed. At the end of the vote-step, a shard forwards some information to another shard to start a new shard-step. Since we only have three building blocks, the stuff vote-step sends can start another vote-step, commit-step, or abort-step at the receiving end. The purpose of commit-step and abort-step is self-evident from their name. One important thing to note on abort-step is that it needs to undo any local changes that a prior vote-step might have done to ensure that the aborted transaction leaves no side effects.

Now we can look at how ByShard composes these three basics steps. The figure above provides a visual illustration of three different ways ByShard runs 2PC. One aspect of the figure that I fail to understand is why not all shards run vote-step and commit-step, and the text does not really provide an explanation.

In the linear orchestration, the transaction progresses from the coordinator one shard at a time. If any shard decides to abort, it needs to start the abort-step and notify all other shards involved in the transaction (or at least all other shards that voted earlier). If a shard decides to commit, it actually starts a vote-step in the next shard. If the vote-step successfully reaches and passes the last shard involved in the transaction, then that last shard can broadcast the commit-step to everyone. Centralized orchestration looks more like the traditional 2PC, and distributed orchestration cuts down on the number of sequential steps even further. The three strategies represent tradeoffs between the latency and number of communication exchanges and shard-steps.

So with 2PC taken care of, we can briefly discuss the concurrency control. ByShard proposes a few different ways to implement it, starting with no concurrency control, thus allowing observation of partial results. Because of the side effect cleaning ability of abort-step, if some transaction partly executes and then reaches the abort-step, then its execution will be undone or rolled back. This reminds me of the sagas pattern. The other solution is to use locks to control isolation. The paper (or the presentation above) has more details on the nuances of locking and requiring different locks with a different type of orchestration. By combining different ways to orchestrate the transactions with different ways to execute them, ByShard presents 18 BFT transactional protocols with different isolation and consistency properties.

## Discussion

1) Comparison with Basil. An obvious discussion is a comparison with Basil, another transactional sharded BFT system. Basil is a lot closer to the Meerkat solution from the CFT world, while ByShard is a more classical 2PC+2PL approach. In Basil, the degree of fault of tolerance is smaller (i.e, it needs 5f+1 clusters). At the same time, ByShard is a lot underspecified compared to Basil. ByShard relies on existing BFT consensus and BFT cluster-broadcast mechanisms to work, while Basil provides a complete and contained solution. On the performance side of things, ByShard requires a lot of steps and a lot of consensus operations across all the involved shards to do all of the shard-steps. This must have a significant performance toll. While it is not correct to compare numbers straight between papers, Basil can run thousands of transactions per second, while ByShard’s throughput is in single digits. However, it is worth mentioning that ByShard’s experiments included more shards; ByShard’s transactions also involved large number of shards.

2) (Distributed) Sagas Pattern. As I briefly mentioned in the summary above, ByShard, especially with linear orchestration and no isolation reminds me of Sagas patterns. Distributed sagas are used to orchestrate long-running requests in microservice-type applications. If we squint our eyes, we can see each shard as a separate microservice. As vote-steps propagate across shards, they perform local changes. And if an abort is needed, the abort causes these changes to be rolled back. However, when we add isolation to ByShard, the similarities with sagas start to disappear.

3) Performance. An important motivation for sharding is performance, however, it does not seem like ByShard achieves stellar performance here. Of course, sharding is still useful for systems that operate with large amounts of data that otherwise would not fit into a single machine. Nevertheless, without strong performance, a solution like this has very few advantages over not using sharded/partitioned systems at all.

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

# Reading Group. Basil: Breaking up BFT with ACID (transactions)

Our 89th paper in the reading group was “Basil: Breaking up BFT with ACID (transactions)” from SOSP’21 by Florian Suri-Payer, Matthew Burke, Zheng Wang, Yunhao Zhang, Lorenzo Alvisi, and Natacha Crooks. I will make this summary short. We had a quick and improvised presentation as well. Unfortunately, this time around, it was not recorded.

The system presented in the paper, called Basil, proposes a sharded BFT transactional system. Basil is leaderless too, and overall reminds me of the Tapir/Meerkat line of work turned into the BFT setting. In these systems, clients do a bulk of the work, which definitely complicates things for BFT protocols — now we no longer have “dumb” clients who can be byzantine and instead rely on smart clients who can screw stuff up quite substantially.

The paper lists a few common assumptions for their system, such as the inability of Byzantine clients to cause a denial of service attack and having some degree of fault tolerance f in each shard with a shard of 5f+1 nodes. Also important are the definitions of byzantine isolation levels. In short, the paper aims to provide serializability, but only for correct clients. So byzantine participants cannot cause the reorder of operations observed by the well-behaved clients.

As I mentioned above, the actual protocol is very similar to Tapir/Meerkat. The clients run interactive transactions by reading from the involved partitions and buffering the writes locally. Once ready to commit, the clients run a two-phase commit protocol. Just like the CFT counterparts, the transactions are timestamp-ordered by clients, so clients must have synchronized time. Since the clients pick the timestamps, a faulty client can choose one far in the future, causing other dependent transactions to abort/hang. To avoid this, Basil replicas have a window of time in which they accept transactions. Replicas reject any transaction started by the client with a timestamp that is too much off from the replica’s time.

Another BFT issue that arises with running a two-phase commit is preparing a transaction with too many keys and not committing it. Similar to the timestamp problem, this can block any other dependent transactions. To mitigate the issue, the Basil protocol allows other clients to take over the stalled transactions. Such a takeover also solves client crash failure.

The version of the two-phase commit is where most of BFT magic is hiding, as this is not your vanilla 2PC. The prepare-phase consists of 2 stages: ST1 and ST2. In ST1, the protocol collects the votes to commit or abort from each shard, and in ST2, it makes such vote durable. The latter is needed if a new client coordinator needs to take over and reaches a different vote conclusion due to byzantine actors.

Again in the spirit of Tapir/Meerkat, the ST2 is optional if ST1 has completed with a fast unanimous quorum of 5f+1 nodes. The paper contains many interesting details about the stages of prepare phase. One curious part is that ST2 logs the voting results from all shards in just one shard. The aborts also have a fast and slow path, and the quorums are smaller for abort decisions than for commit.

The recovery protocol that allows other clients to take over can succeed by simply learning the decision from ST2 through the quorum of replicas in a shard that stored the vote. It is unclear what happens in the fast-path prepare that does not run ST2. However, if the votes for ST2 are divergent (which may happen due to a faulty behavior or multiple concurrent repairs), Basil falls back to a leader-based view-change protocol. And, of course, it is a bit more complicated to make it BFT.

On the evaluation side, Basil outperforms transactional BFT competitors but remains slower than the CFT counterparts. I also want to point out a low level of fault tolerance — out of six nodes in the cluster, only one node can be byzantine.

The version of the two-phase commit is where most of BFT magic is hiding, as this is not your vanilla 2PC. The prepare-phase consists of 2 stages: ST1 and ST2. In ST1, the protocol collects the votes to commit or abort from each shard, and in ST2, it makes such vote durable. The latter is needed if a new client coordinator needs to take over and reaches a different vote conclusion due to byzantine actors.

Again in the spirit of Tapir/Meerkat, the ST2 is optional if ST1 has completed with a fast unanimous quorum of 5f+1 nodes. The paper contains many interesting details about the stages of prepare phase. One curious part is that ST2 logs the voting results from all shards in just one shard. The aborts also have a fast and slow path, and the quorums are smaller for abort decisions than for commit.

The recovery protocol that allows other clients to take over can succeed by simply learning the decision from ST2 through the quorum of replicas in a shard that stored the vote. It is unclear what happens in the fast-path prepare that does not run ST2. However, if the votes for ST2 are divergent (which may happen due to a faulty behavior or multiple concurrent repairs), Basil falls back to a leader-based view-change protocol. And, of course, it is a bit more complicated to make it BFT.

On the evaluation side, Basil outperforms transactional BFT competitors but remains slower than the CFT counterparts. I also want to point out a low level of fault tolerance — out of six nodes in the cluster, only one node can be byzantine.

The version of the two-phase commit is where most of BFT magic is hiding, as this is not your vanilla 2PC. The prepare-phase consists of 2 stages: ST1 and ST2. In ST1, the protocol collects the votes to commit or abort from each shard, and in ST2, it makes such vote durable. The latter is needed if a new client coordinator needs to take over and reaches a different vote conclusion due to byzantine actors.

Again in the spirit of Tapir/Meerkat, the ST2 is optional if ST1 has completed with a fast unanimous quorum of 5f+1 nodes. The paper contains many interesting details about the stages of prepare phase. One curious part is that ST2 logs the voting results from all shards in just one shard. The aborts also have a fast and slow path, and the quorums are smaller for abort decisions than for commit.

The recovery protocol that allows other clients to take over can succeed by simply learning the decision from ST2 through the quorum of replicas in a shard that stored the vote. It is unclear what happens in the fast-path prepare that does not run ST2. However, if the votes for ST2 are divergent (which may happen due to a faulty behavior or multiple concurrent repairs), Basil falls back to a leader-based view-change protocol. And, of course, it is a bit more complicated to make it BFT.

On the evaluation side, Basil outperforms transactional BFT competitors but remains slower than the CFT counterparts. I also want to point out a low level of fault tolerance — out of six nodes in the cluster, only one node can be byzantine.

## Discussion

1) Low fault-tolerance. Requiring six nodes to tolerate one failure (5f+1 configuration) is a rather fault-tolerance threshold. To double the fault tolerance, we need 11 replicas! For comparison, the HotStuff protocol, which authors use as a baseline, needs a cluster of size 3f+1. Requiring more nodes also raise some questions about efficiency — while the performance is good, the protocol also needs more resource to achieve it.

2) More fault-tolerance? In a few places in the paper, it is mentioned that up to votes can be missed due to asynchrony, and another f due to byzantine behavior: “A unanimous vote ensures that, since correct replicas never change their vote, any client C′ that were to step in for C would be guaranteed to observe at least a Commit Quorum of 3f + 1 Commit votes; C′ may miss at most f votes because of asynchrony, and at most f more may come from equivocating Byzantine replicas.” This suggests that the practical fault tolerance may be better than just f.

3) Unanimous fast quorum. The unanimous fast quorum is another potential problem for performance when things are not going well. The sole faulty client will throw the protocol off the fast-path prepares, requiring more resources to prepare each transaction. Not to mention, waiting for a timeout on a faulty replica does not improve the latency.

4) Questions about recovery. We had some questions about the recovery procedure. It seems like the first step is to try to recover by reading the recorded prepare vote, and if everything is good, simply finish the commit for the transaction. However, it appears that durably recording votes in one place is an optional stage: “If some shards are in the slow set, however, C needs to take an additional step to make its tentative 2PC decision durable in a second phase (ST2).” As a result, under normal conditions, there may not be votes from ST2 to recover from one shard/partition. Does the recovering client then need to contact all partitions of a transaction?

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

# Reading Group Special Session: Fast General Purpose Transactions in Apache Cassandra

Modern distributed databases employ leader-based consensus protocols to achieve consistency, entailing certain trade-offs: typically either a scalability bottleneck or weak isolation. Leaderless protocols have been proposed to address these and other shortcomings of leader-based techniques, but these have not yet materialized into production systems.

This paper outlines compromises entailed by existing leaderless protocols versus leader-based protocols and proposes general techniques for addressing them. A new protocol, called Accord, is proposed with optimal failure tolerance that, under reasonable assumptions, achieves optimal latency of two message delays for transactions initiated by coordinators in any region, under any level of competing transactions and maximal tolerated process failures.

Benedict Elliott Smith will present the Accord protocol in our DistSys Reading Group. Benedict is an Apache Cassandra contributor with an interest in performance, correctness, and algorithm design.

When: February 9th 2022 at 2 PM EST (Check your time zone)

# Reading Group. UniStore: A fault-tolerant marriage of causal and strong consistency

For the 80th paper in the reading group, we picked “UniStore: A fault-tolerant marriage of causal and strong consistency” by Manuel Bravo, Alexey Gotsman, Borja de Régil, and Hengfeng Wei. This ATC’21 paper adapts the Partial Order-Restrictions consistency (PoR) into a transactional model. UniStore uses PoR to reduce coordination efforts and execute as many transactions as possible under the causal consistency model while resorting to strong consistency in cases that require ordering concurrent conflicting transactions. The PoR consistency itself is an extension of RedBlue consistency that allows mixing eventually consistent and strongly consistent operations.

UniStore operates in a geo-replicated model, where each region/data center (I use region and data center interchangeably in this post) stores the entirety of the database. The regions are complete replicas of each other. Naturally, requiring strong consistency is expensive due to cross-region synchronization. Instead, UniStore allows the developers to choose running transactions as causally or strongly consistent. Causal consistency preserves the cause-and-effect notion between events — if some event e1 has resulted in event e2, then an observer seeing e2 must see the e1 as well. Naturally, if two events are concurrent, then they are not causally dependent. This independence gives us the freedom to apply these events (i.e., execute against the store) in any order. For instance, this approach provides good performance for events that may touch disjoint data concurrently. However, if these two events are concurrent and operate on the same data, then the events either need to be commutative or be partly ordered. UniStore does both of these — it implements CRDTs to ensure commutativity, but it also allows to declare a transaction as strongly consistent for cases that require ordering. As such, strong consistency becomes handy when the commutativity alone is not sufficient for the safety of the application. For example, when a transaction does a compare-and-set operation, the system must ensure that all replicas execute these compare-and-set operations in the same order.

So, in short, causal consistency is great when an application can execute complex logic in causal steps — event e1 completes, then observing the results of e1 can cause some other change e2, and so on. Strong consistency comes in handy when step-by-step non-atomic logic is not an option, and there is a need to ensure the execution order of conflicting concurrent operations. In both cases, the systems need to keep track of dependencies. For causal transactions, the dependencies are other transactions that have already finished and were made visible. For strongly consistent transactions, the dependencies also include other concurrent, conflicting, strongly consistent transactions.

So, how does UniStore work? I actually do not want to get into the details too deep. It is a complicated paper (maybe unnecessarily complicated!), and I am not that smart to understand all of it. But I will try to get the gist of it.

The system more-or-less runs a two-phase commit protocol with optimistic concurrency control. Causal transactions commit in the local data center before returning to the client. However, these causal transactions are not visible to other transactions (and hence other clients/users) just yet. Remember, this causal transaction has not been replicated to other data centers yet, and a single region failure can cause some problems. In fact, if a strongly consistent transaction somehow takes such a non-fully replicated causal transaction as a dependency, then the whole system can get stuck if the dependent causal transaction gets lost due to some minority regions failing.

UniStore avoids these issues by making sure the causal transactions are replicated to enough regions before these causal transactions are made visible. This replication happens asynchronously in the background, sparing the cost of synchronization for non-strongly consistent transactions (that is, of course, if clients/users are ok with a remote possibility of losing transactions they thought were committed).

Strongly consistent transactions are a different beast. They still optimistically run in their local data centers but no longer commit in one region to ensure the ordering between other strongly consistent transactions. UniStore uses a two-phase commit here as well, but this time the commitment protocol goes across all healthy regions. First, the coordinator waits for enough data centers to be sufficiently up-to-date. This waiting is crucial for liveness; it ensures that no dependent transaction may get forgotten in the case of data center outages. After the waiting, the actual two-phase commit begins, with all (alive?) regions certifying the transaction.

To implement this waiting and only expose the durable and geo-replicated state to transactions, UniStore has a complicated system of version tracking using a bunch of vector clocks and version vectors. Each of these vectors has a time component for each data center and an additional “strong” counter for keeping track of strongly consistent transactions. Each transaction has a couple of important version vectors.

The snapshot vector snapVec describes the consistent snapshot against which the transaction runs. The commit vector commitVec tells the commit version of a transaction used for ordering.

Each replica keeps two different version vectors representing the version of the most recent transaction known to itself and its data center. Since the system relies on FIFO order of communication and message handling, knowing the version of the most recent transaction implies the knowledge of all lower-versioned transactions as well. This information is then exchanged between data centers to compute yet another version vector to represent the latest transaction replicated to at least some majority of data centers. This, again, implies that all lower-versioned transactions have been replicated as well. This last version vector allows strongly consistent transactions to wait for their dependencies to become globally durable to ensure liveness.

So here is where I lose understanding of the paper, so read on with a pinch of salt, as my skepticism may be completely unwarranted. It makes sense to me to use version vectors to keep track of progress and order causal transactions. Each region computes the region’s known progress, exchanges it with other regions, and calculates the global “transaction frontier” — all transactions that have been replicated to a sufficient number of data centers. This exchange of known progress between regions happens asynchronously. I am not entirely sure how these progress vectors help with ordering the conflicting transactions. Somehow the “strong” counter should help, but this counter seems to be based on the regions’ knowledge of progress and not the global one. I suspect that these vectors help identify the concurrent conflicting transactions. The progress known in the data center ends up in a snapVec and represents the snapshot on which the transaction operates. The strongly consistent transactions use a certification procedure (i.e., a two-phase commit) to decide whether to abort or commit. The paper mentions that the certification process assigns the commitVec, which actually prescribes the order. At this point, I hope that conflicting transactions are caught in this Paxos-based transaction certification procedure and ordered there or at least aborted as needed. Also worth mentioning that the extended technical report may have more details that I was lazy to follow through.

Now a few words about the evaluation. The authors focus on comparing UniStore against both casual and strongly consistent data stores. Naturally, it sits somewhere in the middle of these two extremes. My bigger concern with their implementation is how well it scales with the number of partitions and number of data centers. The paper provides both of these evaluations, but not nearly to the convincing scale. They go up to 5 data centers and up to 64 partitions. See, with all the vectors and tables of vectors whose size depends on the number of regions, UniStore may have some issues growing to “cloud-scale,” so it would be nice to see how it does at 10 data centers or even 20. Cloud vendors have many regions and even more data centers; Azure, for example, has 60+ regions with multiple availability zones.

Our groups persentation by Rohan Puri is on YouTube:

## Discussion

1) Novelty. So the paper works with a rather interesting consistency model that combines weaker consistency with strong on a declarative per-operation basis. This model, of course, is not new, and the paper describes and even compares against some predecessors. So we had the question about the novelty of UniStore since it is not the first one to do this kind of consistency mix-and-match. It appears that the transactional nature of UniStore is what separates it from other such solutions. In fact, the bulk of the paper talks about the transactions and ensuring liveness in the face of data center outages, so this is nice. Many real-world databases are transactional, and having a system like this is a step closer to a practical solution.

2) Liveness in the presence of data center failures. Quite a lot of the paper’s motivation goes around the inability to simply run OCC + 2PC in the PoR consistency model and maintain liveness. One problem occurs when a causal transaction takes a dependency on another transaction that did not make it to the majority of regions. If such a transaction is lost, it may stall the system. Of course, any region that takes a dependency on some transaction must see it first. Anyway, it is hard to see the novelty in “transaction forwarding” when pretty much any system recovers the partly replicated data by “forwarding” it from the nodes that have it.

However, the bigger motivational issue is with strongly consistent transactions. See, the authors say that the system may lose liveness when a strongly consistent transaction commits with a dependency on a causal one that has not been sufficiently replicated, and that causal transaction gets lost in a void due to a region failure. However, to me, this seems paradoxical — how can all (healthy) regions accept a transaction without having all the dependencies first? It seems like a proper implementation of the commit protocol will abort when some parties cannot process the transaction due to the lack of dependencies. Anyway, this whole liveness thing is not real and appears to be just a way to make the problem look more serious.

That said, I do think there is a major problem to be solved. Doing this more proper commit protocol may hurt the performance by either having a higher abort rate or replicating dependencies along with the strongly consistent transaction. We’d like to see how much better UniStore is compared to the simpler 2PC-based solution that actually aborts transactions when it cannot run them.

3) Evaluation. The evaluation largely compares against itself, but in different modes — causal and strong. This is ok, but how about some other competition? Take a handful of other transactional approaches and see what happens? The current evaluation tells us that the PoR model provides better performance than strong and worse than causal consistency. But this was known before. Now we also know that this same behavior translates to a transactional world. But we do not know how the cost of the protocol fares against other transactional systems that do not have PoR and are not based on UniStore with features disabled.

Also interesting to see is how expressive the transactional PoR consistency model is. For example, let’s take MongoDB. It can be strongly consistent within a partition and causal across the partitions (and within the partition, users can manipulate the read and write consistency on a per-operation basis). What kind of applications can we have with Mongo’s simpler model? And what kind of apps need UniStore’s model with on-demand strong consistency across the partitions?

4) Complicated solution??? I have already mentioned this in the summary, but UniStore is complicated and relies on many moving parts. The paper completely omits the within-datacenter replication for “simplicity,” but it does not really make the paper simple. We have vectors that track progress, order, and snapshots, and then we have tables of vectors, and all these multiple kinds of vectors are exchanged back-and-forth to compute more vectors only to find out when it is ok to make some transactions visible or unblock some strongly consistent transactions. How come other systems (MongoDB again?) implement causal consistency with just one number for versioning (HLC) and still allow to specify stronger guarantees when needed? Yeah, they may not implement the PoR consistency model but it just seems too complicated. As a side question… what happens in Mongo when we start changing consistency between causal and strong on a per-request basis?

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

# Reading Group. Prescient Data Partitioning and Migration for Deterministic Database Systems

In the 75th reading group session, we discussed the transaction locality and dynamic data partitioning through the eyes of a recent OSDI’21 paper – “Don’t Look Back, Look into the Future: Prescient Data Partitioning and Migration for Deterministic Database Systems.”

This interesting paper solves the transaction locality problem in distributed, sharded deterministic databases. The deterministic databases pre-order transactions so that they can execute without as much coordination after the pre-ordering step. Now, this does not mean that every transaction can run locally. Indeed, if a transaction needs to touch data across multiple partitions or shards, then there is a need to reach out across partitions, which introduces delays. A natural way to solve the problem is to migrate the data based on its access patterns to minimize the number of such cross-partition transactions.

Such dynamic data migration is pretty much the solution in Hermes, the system presented in the paper. The thing is that the “group the data together” approach is not new. The authors cite several papers, such as Clay that do very similar stuff. Now, the biggest difference between Hermes and other approaches is the decision-making process involved in figuring out how and where the data needs to go. Traditionally, dynamic re-partitioning solutions rely on the historical data from the workload. These approaches work great when the workload is decently stable. However, workloads that abruptly change their access patterns present a problem — the system based on historical observation of access locality is reactive and needs time to adjust to the new workload. So, naturally, to tolerate the rapidly changing workload locality characteristics, we need a proactive system that can predict these locality changes and make necessary changes ahead of time. It would be nice to have an oracle that can see into the future. Well, Hermes kind of does this. See, the system requires a batched database, and before executing each batch, it can look at the access patterns within the batch and adjust accordingly.

Hermes looks at the patterns of transactions to be executed shortly, figures out how many transactions will require cross-partition coordination, and then makes data movements to minimize that number. For example, if some transaction type accesses two objects frequently together, Hermes will move one of these objects, incurring one cross-partition data transfer. If this happens, the transaction itself becomes partition-local, incurring no cross-partition data transfers.

In addition to moving and repartitioning the data around, Hermes also moves the transactions. The movement of compute tasks, again, is not new and makes a ton of sense. If a transaction originating in partition P1 needs some objects A & B located in some partition P2, then it makes a lot of sense to move a relatively small transaction to where the data is.

Unfortunately, the combination of dynamic re-partitioning and transaction movements leads to some unintended consequences over the long run. Consider a batch where objects A & B were used frequently together. The system moves them to one partition for speed. In the next batch, we may have objects B & C used together, so C moves to the same partition as B, again for speed. Now we have colocated objects A, B, and C. If this continues for very long, the system will consolidate more and more data in one place. This consolidation is not ideal for load balancing, so Hermes has to account for this and prevent data from gradually drifting closer together. It does so by “de-optimizing” for locality and allowing more distributed transactions in exchange for a more even load in each batch.

As far as performance, Hermes delivers when it comes to the workloads with frequent changes in locality:

As always, the presentation video from the reading group:

## Discussion

1) Wide Area Networks. Hermes takes advantage of locality in a data center setting. It groups objects used together to allow local transactions as opposed to distributed ones. However, it may not work in a geo-distributed environment. The problem is that Hermes only solves one type of locality puzzle. It accounts for the grouping of objects used together in hopes that these objects will be used together again — spatial locality and used soon enough — temporal locality. In fact, Hermes optimizes based on spatial criteria. This notion of locality works well in LAN, where the transaction can run in a partition with the data without a performance penalty. However, in WAN, moving transactions between partitions is costly. If a transaction originated in region R1, but data is in region R2, then moving the transaction from R1 to R2 may incur almost as much latency as moving data from R2 to R1. That is a big difference between LAN and WAN — in the WAN setting, transactions incur a significant latency penalty when processed in another region. In geo-distributed setting, locality means more than just grouping frequently used objects together to rip the benefit shortly. In addition to grouping data based on spatial principles, we need to preserve the geographical affinity and place data close to where it is accessed. In other words, in geo-replication, we do not only care about finding cliques of related data, but also placing these cliques in the best possible geography for a given workload. And needless to say, grouping objects and finding the best geographical location for the data often conflict with each other, making the problem significantly more nuanced and complicated.

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

# Reading Group. Polyjuice: High-Performance Transactions via Learned Concurrency Control

Our 73rd reading group meeting continued with discussions on transaction execution systems. This time we looked at the “Polyjuice: High-Performance Transactions via Learned Concurrency Control” OSDI’21 paper by Jiachen Wang, Ding Ding, Huan Wang, Conrad Christensen, Zhaoguo Wang, Haibo Chen, and Jinyang Li.

This paper explores single-server transaction execution. In particular, it looks at concurrency control mechanisms and conjectures that the current approaches have significant limitations for different workloads. For instance, two-phase-locking (2PL) may work better in workloads that have high contention between transactions, while Optimistic Concurrency Control (OCC) works best in low-contention scenarios. A natural way to solve the problem is to create a hybrid solution that can switch between different concurrency control methods depending on the workload. A paper mentions a few such solutions but also states that they are too coarse-grained.

Polyjuice presents a different hybrid strategy that allows a more fine-grained, auto-tunable control over transaction concurrency control depending on the workload. The core idea is to extend the existing concurrency control mechanisms into a set of actions and train the system to take the best actions in response to each transaction type, transaction’s dependencies, and the current step within the transaction.

Possible actions Polyjuice can take include different locking options, whether to allow dirty reads, and whether to expose dirty writes.

For the sake of time, I will only talk about locking actions. Each transaction upon accessing some data must pick the concurrency control actions. For example, if some transaction “B” has another transaction “A” in its dependency list for some key, it needs to decide whether to wait or not for the dependent transaction “B.” In fact, “B” has a few options in Polyjuice: be entirely optimistic and not wait for “A” or to be more like 2PL and halt until the dependent “A” has finished. In addition to the two extremes, Polyjuice may tweak this lock to wait for partial execution of the dependent transaction.

Polyjuice system represents the possible concurrency control actions through the state and action space policy table. In the table, each row is a policy for a particular step of a particular transaction. Each column is an action type. Wait actions are specific to dependents of a transaction, so if a transaction has a dependency, we will pick the wait action corresponding to the dependency. The cells then represent the action parameters, such as the wait duration or whether to read dirty.

So, to operate the concurrency control actions, we need to maintain the dependency lists for transactions and tune the policy table for optimal parameters. The tuning is done with a reinforcement-learning-like approach, where we aim to optimize the policy (our table that maps state to actions) for maximizing the reward (i.e., the throughput) in a given environment (i.e., the workload). The actual optimization is done with an evolutionary algorithm, so it is a bit of a random process of trying different parameters and sticking with the ones that seem to improve the performance.

It is important to note that the Polyjuice concurrency control policy is not ensuring the safety of transactions. It merely tries to tweak the waits and data visibility for best performance. So when each transaction finishes with all its accesses/steps, Polyjuice runs a final validation before committing. If the validation passes, then everything is ok, and if not, the transaction is aborted. This makes the entire concurrency control process optimistic, despite adding some waiting/locks for dependencies. In my mind, Polyjuice is a “loaded” optimistic concurrency control that tries to improve its chances of passing the validation and committing as quickly as possible. Kind of like a loaded die with a higher chance of getting the value you are betting on.

Our presentation video is available below. Peter Travers volunteered for this paper about a day before the meeting, saving me from doing it. So big thanks to Peter, who did an awesome job presenting.

## Discussion

1) Single-server transactions. Polujuice is a single-server system, which makes some of its transactional aspects a lot simpler. For example, keeping track of transaction dependencies in a distributed system would likely involve some additional coordination. As we have seen in Meerkat, additional coordination is not a good thing. At the same time, there may be some interesting directions for learned CC in distributed space. For example, we can train a separate model or policy table for different coordinators in the systems. This can be handy when deploying the system across WAN with non-uniform distances between nodes or coordinators. I was originally excited about this possibility, and it feels like a natural direction for research.

2) Fixed transaction types. The system expects a fixed set of transaction types — transactions that have the same “code” and execute the same logic, but on different data. It is not entirely clear what will happen when a new transaction type arrives at the system before Polyjuice is retrained to include this new type. We would expect some sort of a default fall-back mechanism to be in place.

3) Evolutionary algorithm. So this is an interesting part. Polyjuice uses an evolutionary optimization algorithm to train its policy model. These types of algorithms try to mimic natural evolutionary processes to arrive at the more optimal solution. For example, the systems may start with an initial population of policies, then it needs to evaluate how these initial population performs, pick the two best policies, and somehow combine them. This combined policy (i.e., the offspring of the two best ones) can then replace the weakest performing policy in the population. The process can repeat.

The paper actually does not use a crossover approach to produce offspring between the best policies. Polyjuice creates “children” by mutating the parameters of the good parents to produce the next population generation. It then evaluates this next generation, prunes the weak policies, and repeats the process.

The paper claims this works well, but we were wondering about the convergence of this to an optimal solution. Can the evolutionary algorithm get stuck in some local maximum and not find the best policy? Another concern is the time to converge, and the training impact on the transactional performance. To find the best policies, Polyjuice needs to evaluate the entire population. While this is happening, the performance of the system may stutter and jump back and forth as it tries different policies, which is not ideal in production workloads. At the same time, we must use production workloads to train the policy. And of course, the process requires multiple iterations.

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

# Reading Group. Meerkat: Multicore-Scalable Replicated Transactions Following the Zero-Coordination Principle

Our 72nd paper was on avoiding coordination as much as possible. We looked at the “Meerkat: Multicore-Scalable Replicated Transactions Following the Zero-Coordination Principle” EuroSys’20 paper by Adriana Szekeres, Michael Whittaker, Jialin Li, Naveen Kr. Sharma, Arvind Krishnamurthy, Dan R. K. Ports, Irene Zhang. As the name suggests, this paper discusses coordination-free distributed transaction execution. In short, the idea is simple — if two transactions do not conflict, then we need to execute them without any kind of coordination. And the authors really mean it when they say “any kind.”

In distributed transaction processing, we often think about avoiding excessive contention/coordination between distributed components. If two transactions are independent, we want to run them concurrently without any locks. Some systems, such as Calvin, require coordination between all transactions by relying on an ordering service to avoid expensive locks. Meerkat’s approach is way more optimistic and tries to avoid any coordination, including ordering coordination. Meerkat is based on Optimistic Concurrency Control (OCC) with timestamp ordering to enable independent transactions to run without locks and ordering services. Another innovation for avoiding coordination is the replication of transactions from clients straight to servers. Instead of relying on some centralized replication scheme, like Multi-Paxos or Raft, Meerkat lets clients directly write transactions to replicas, avoiding replication coordination and leader bottlenecks. The authors explored this “unordered replication” idea in their previous paper. The clients also act as the transaction coordinators in the common case.

Meerkat does not stop its coordination avoidance efforts here at the cross-replica coordination. In fact, this is where the most interesting magic starts to happen — the authors designed a good chunk of a system specifically to avoid the cross-core coordination within each server to take advantage of modern multi-core CPUs. The authors call such avoidance of cross-core and cross-replica coordination a Zero Coordination Principle or ZCP for short.

As a motivating example for ZCP and the need for cross-core coordination avoidance, the paper illustrates the contention created by a simple counter shared between threads on one machine. It appears that with the help of modern technologies to alleviate networking bottlenecks (kernel bypass), such a shared counter becomes an issue at just 8 threads. In the example, a simple datastore with a shared counter could not scale past 16 threads, while a similar store without a shared resource had no such problems.

Let’s talk about the protocol now to see how all the coordination-avoidance efforts actually work. The system tolerates $$f$$ failures in the cluster of $$2f+1$$ machines. Each transaction can read and write some set of objects supported by the underlying key-value store. These objects represent the transaction’s read- and write-sets. The replicas maintain two data structures to support transaction processing: a trecord and a vstore.

The trecord is a table containing all transaction information partitioned by the CPU core ID to make each transaction “sticky” to a single core. It manages the transaction state, such as the read- and write-sets, transaction version timestamp, and commit status. The vstore stores versioned key-value pairs. Unlike the trecord, the vstore is shared among all cores at the server. The transaction protocol runs in 3 distinct phases: Execute, Verify, Write. I’m not sure these are the most intuitive names for the phases, but that will do.

In phase-1 (execute), the transaction coordinator (i.e., a client) contacts any replica and reads the keys in its read-set. The replica return versioned values for each key. The coordinator then buffers any pending writes.

The phase-2 (validation) combines the transaction commit protocol with replication of transaction outcome. The coordinator starts phase-2 by first selecting a sticky CPU core that will process the transaction. The sticky core ID ties each transaction to a particular CPU core to reduce inter-node coordination. The coordinator then creates a unique transaction id and a unique timestamp version to use for OCC checks. Finally, the coordinator sends all this transaction information to every replica in a validate message.

Upon receiving the message, each replica creates an entry in its trecord. The entry maintains the transaction’s state and makes this transaction “stick” to the core associated with the core-partitioned trecord. At this point, the replica can validate the transaction using OCC. I will leave the details to the paper, but this is a somewhat standard OCC check. It ensures that both the data read in phase-1 is still current and the data in the write-set has not been replaced yet by a newer transaction. At the end of the check, the replica replies to the coordinator with its local state (OK or ABORT).

The coordinator waits for a supermajority ($$f+\lceil\frac{f}{2}\rceil+1$$) fast-quorum of replies. If it receives enough matching replies, then the transaction can finish right away in the fast path. If the supermajority was in the “OK” state, the transaction commits, and otherwise, it aborts.

Sometimes a supermajority fast-quorum does not exist or does not have matching states, forcing the coordinator into a slow path. In a slow path, the coordinator only needs a majority of replicas to actually reply. If the majority has replied with an “OK” state, the coordinator can prescribe the replicas to accept the transaction, and otherwise, it prescribes the abort action. Once the replicas receive the prescribed transaction state, they mark the transaction accordingly in their trecord and reply to the coordinator. Here, the coordinator again waits for a quorum before finalizing the transaction to commit or abort.

Finally, in phase-3 (write), replicas mark the transaction as committed or aborted. If the transaction is committed, then each replica can apply the writes against the versioned datastore.

Phew, there is a lot to unpack here before diving deeper into the corner cases and things like replica and coordinator failures and recovery. The important parts relate to how the coordination is handled/avoided. To start, the coordinator uses a timestamp for the version, circumventing the need for a counter or centralized sequencer. The transaction replicates with its timestamp directly by the coordinator (who happens to be the client in the normal case) to the replicas, avoiding the need for a centralized replication leader or primary. At the server level, each transaction never changes its execution core even as it goes through different phases. All messages get routed to the core assigned to the transaction, and that core has unique access to the transaction’s trecord partition. This “core stickiness” avoids coordination between the cores of a server(!) for the same transaction. I speculate here a bit, but this may also be good for cache use, especially if designing individual transaction records to fit in a cache line. As a result, the only place the coordination happens between transactions is the OCC validation. During the validation, we must fetch current versions of objects from the core-shared vstore, creating the possibility for cross-core contention between transactions accessing the same data.

I do not want to go too deep into the failure recovery; however, there are a few important points to mention. The replica recovery process assumes replicas rejoin with no prior state, so they are in-memory replicas. The recovery process is leader-full, so we are coordinating a lot here. And finally, the recovery leader halts transaction processing in the cluster. As a result, the recovery of one replica blocks the entire system as it needs to reconcile one global state of the trecord that can be pushed to all replicas in a new epoch. I will leave the details of the recovery procedure to the paper. However, intuitively, this over-coordination in recovery is needed because the normal operation avoided the coordination in the first place. For example, state machine replication protocols “pre-coordinate” the order of all operations. When a replica needs to recover, it can learn the current term to avoid double voting before simply grabbing all committed items from the log available at other nodes and replaying them. It can replay the recovered log while also receiving new updates in the proper order. In Meerkat, we have no single history or log to recover, so a pause to reconcile a consistent state may be needed.

The coordinator recovery is handled by keeping backup coordinators and using Paxos-like consensus protocol to ensure only one coordinator is active at the same time and that the active coordinator is in a proper state.

Now we can talk a bit about the performance. Meerkat significantly outperforms TAPIR, which is the previous system from the same group. The performance gap between the systems is huge. This begs a natural question about the performance. Just how much of Meerkat’s gain is due to a super-optimized implementation and utilization of techniques like kernel-bypass? Meerkat-PB in the figure can shed some light on this, as it represents a version of Meerkat with a dedicated primary for clients to submit transactions. Having a primary adds cross-replica coordination, and despite that, it still significantly outperforms the older systems.

As always, we had a presentation in the group, and it is available in our YouTube channel. This time, Akash Mishra did the presentation:

## Discussion

Quite frankly, I have incorporated quite a bit of discussion into the summary already.

1) Performance. One of the bigger questions was about performance. How much of the “raw” speed is due to the ZCP, and how much of it is due to the enormous implementation expertise and use of kernel bypass and fancy NIC to deliver messages to proper cores/threads. We speculate that a lot of the overall performance is due to these other improvements and not ZCP. That being said, once you have an implementation this efficient/fast, even tiny bits of coordination start to hurt substantially, as evidenced by the motivation examples and even the primary-backup version of Meerkat.

2) Replica Recovery. The blocking nature of recovery may present a real problem in production systems, especially if the recovery time is substantial. It would have been nice to see the recovery evaluations.

3) Performance States. To continue with performance/recovery topics, it appears as the system can operate in multiple very distinct performance states. In the fast-quorum operation, commits come quickly. In the slow path, a whole new round-trip exchange is added (probably after the timeout). This creates distinct latency profiles for fast and slow paths. This can also create distinct throughput profiles, as a slow path sends and receives more messages, potentially creating more load in the system.

4) Sharded systems? Many systems use sharding to isolate coordination into smaller buckets of nodes or replica-sets. For example, Spanner and Cockroach DB do that. Such sharding allows independent transactions to run in separate “coordination pods” without interfering with each other. To scale these systems just need to create more such coordination pods. Of course, in systems like Spanner, transactions that span multiple replica-sets add another level of coordination on top, but the chance that any two shards need to coordinate is kept low by making lots of tiny shards. I wonder about the differences between two philosophies — avoiding coordination vs. embracing it, but in small groups. Are there benefits to “coordination pods”? Should we embrace ZCP? Can ZCP survive the scale of these larger sharded systems?

5) The need for supermajority quorum? Supermajority fast quorums often raise many questions. Just exactly why do we need them? The short answer here is fault-tolerance, or more specifically the ability to recover operations after failures. See, in majority quorum protocols that have a leader, we have at most one operation that can be attempted in the given epoch and log position. This means that if some replicas fail, we can recover the operation if we can guarantee to see the value in at least one replica. Any two majority quorums intersect in at least one replica, making the single operation recoverable as long as it has made it to the majority. Unfortunately, this does not work with leaderless solutions as illustrated by Fast Paxos, as many different values can be attempted in the same epoch and slot position. However, we still need to survive the failures and recover.

Let’s look at the example to illustrate this. Assume we have 5 nodes, 3 of which have accepted value “A” and 2 have value “B.” Let’s assume that we commit “A” at this time since we clearly have a majority agreeing on “A.” If 2 “A” nodes crash, we will have 3 live nodes remaining: “A, B, B.” By looking at these nodes we do not know the fact that value “A” might have been committed by the coordinator. We need a supermajority quorum to survive the failures and recover. If we commit with a supermajority of size $$f+\lceil\frac{f}{2}\rceil+1$$ (4 out of 5 nodes) “A, A, A, A, B”, then failing 2 “A” nodes leaves us with 3 nodes: “A, A, B.” We see more “A” operations here and can recover “A.” In fact, if some value is in a supermajority, then any majority quorum will have the majority of its nodes (i.e. $$\lceil\frac{f}{2}\rceil+1$$ nodes) having that value (but not the vice-versa — the majority value of a majority quorum does not mean the value was in the supermajority).

Now, how does this relate to Meerkat? Each Meerkat transaction has a unique id, so one may think we never have the possibility of committing two or more different transactions for the same id. However, we have to be careful about what Meerkat replicas need to agree upon. It is not the transaction itself, but a transaction status — OK or ABORT. So, we do have two possible values that can exist at different replicas for the same transaction. As a result, Meerkat needs a fast path supermajority quorum to make the transaction status decision recoverable in the replica recovery protocol.

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

# Reading Group. FoundationDB: A Distributed Unbundled Transactional Key Value Store

Last week we discussed the “FoundationDB: A Distributed Unbundled Transactional Key Value Store” SIGMOD’21 paper. We had a rather detailed presentation by Moustafa Maher.

FoundationDB is a transactional distributed key-value store meant to serve as the “foundation” or lower layer for more comprehensive solutions. FoundationDB supports point and ranged access to keys. This is a common and decently flexible API to allow building more sophisticated data interfaces on top of it.

FoundationDB is distributed and sharded, so the bigger part of the system is transaction management. The system has a clear separation between a Paxos-based control plane and the data plane. The control plane is essentially a configuration box to manage the data plane. On the data plane, we have a transaction system, log system, and storage system. The storage system is the simplest component, representing sharded storage. Each node is backed by a persistent storage engine and an in-memory buffer to keep 5 seconds of past data for MVCC purposes. The storage layer is supported by sharded log servers that maintain the sequence of updates storage servers must apply.

The interesting part is the transaction system (TS) and how clients interact with all the components on the data plane. The client may run transactions that read and/or update the state of the system. It does so with some help from the transaction system, which also orchestrates the transaction commit. When a client reads some data in a transaction, it will go to the transaction system and request a read timestamp or version. On the TS side, one of the proxies will pick up the client’s request, contact the sequencer to obtain the version and return it to the client. This version timestamp is the latest committed version known to the sequencer to guarantee recency. Thanks to MVCC, the client can then reach out directly to the storage servers and retrieve the data at the corresponding version. Of course, the client may need to consult the system to learn which nodes are responsible for storing particular keys/shards, but the sharding info does not change often and can be cached.

Writes/updates and transaction commit procedure are driven by the TS. The client submits the write operations and the read-set to the proxy, and the proxy will attempt to commit and either return an ack or an abort message. To commit, the proxy again uses the sequencer to obtain a commit version higher than any of the previous read and commit versions. The proxy will then send the read and write set along with the versions to the conflict resolver component. The resolver detects the conflicts; if no conflict is detected, the transaction can proceed, otherwise aborted. Successful transactions proceed to persist to the log servers and will commit once all responsible log servers commit. At this point, the sequencer is updated with the latest committed version so it can continue issuing correct timestamps. Each transaction must complete well within the 5 seconds of the MVCC in-memory window. Needless to say, read-only transactions do no go through the write portion of the transaction path since they do not update any data, making reads low-weight.

The failure handling and recovery is an important point in any distributed system. FoundationDB takes a fail-fast approach that may at times sound a bit drastic. The main premise of failure handling on the transaction system is to rebuild the entire transaction system quickly instead of trying to mask failure or recover individual components. The committed but not executed transactions can be recovered from the log servers and persisted to storage, in-progress transactions that have not made it to the log servers are effectively timed out and aborted. Transactions that partly made it to the log servers are also aborted, and new log servers are built from a safe point to not include the partial transactions. Here I just scratched the surface on the recovery, and the paper (and our group’s presentation) is way more accurate and detailed.

Another important point in the paper is the testing and development of FoundationDB. The paper talks about simulator testing. In a sense, the simulator is an isolated environment for development and testing the full stack on just one machine. It comes with a handful of mock components, such as networking and a clock. All sources of non-determinism must be mocked and made deterministic for reproducibility. The paper claims that the simulator is very useful for catching all kinds of bugs with a few exceptions, such as performance bugs.

## Discussion.

1) Flexibility of FoundationDB. Our previous paper was on RocksDB, a key-value single server store. It is meant as the building block for more complex systems and applications. This is very similar in spirit to FoundationDB that is meant as a “foundation” for many more complex systems. However, FoundationDB is way more complex, as it implements the data distribution/replication and transactions. This can potentially limit the use cases for FoundationDB, but obviously, this is done by design. With replication and transactions are taken care of, it may be easier to build higher-up levels of the software stack.

2) Use cases. So what are the use cases of FoundationDB then? It is used extensively at Apple. Snowflake drives its metadata management through FoundationDB. In general, it seems like use cases are shaped by the design and limitations. For example, a 5-seconds MVCC buffer precludes very long-running transactions. The limit on key and value size constrains the system from storing large blobs of data. Arguably, these are rather rare use cases for a database. One limitation is of particular interest for me, and this is geo-replication.

3) Geo-replication. The paper touches on geo-replication a bit, but it seems like FoundationDB uses geo-replication mainly for disaster tolerance. The culprit here is the sequencer. It is a single machine and this means that geo-transactions have to cross the WAN boundary at least a few times to get the timestamps for transactions. This increases the latency. In addition to simply slower transactions, numerous WAN RTT to sequencers can push the transaction time closer to the 5-second limit. So it is reasonable to assume that the system is not designed for planetary-scale deployment.

4) Simulator. We discussed the simulator quite extensively since it is a cool tool to have. One point raised was how a simulator is different from just setting up some testing/development local environment. The big plus for a simulator is its ability to control determinism and control fault injections in various components. There are systems like Jepsen to do fault injection and test certain aspects of operation, but these tend to have more specific use cases. Another simulator question was regarding the development of the simulator. It appears that the simulator was developed first, and the database was essentially build using the simulator environment.

We were also curious about the possibility of a simulator to capture error traces or do checking similar to systems like Stateright. It appears, however, that this is outside of the simulator capabilities, and it cannot capture specific execution traces or replay them. It is capable of controlling non-deterministic choices done in mock components, making a failure easier to reproduce. One somewhat related point mentioned was eidetic systems that remember all non-deterministic choices made in the OS along with all inputs to be able to replay past execution, but this seems like an overkill to try to build into a simulator.

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

# Reading Group. Unifying Timestamp with Transaction Ordering for MVCC with Decentralized Scalar Timestamp

Unlike many of my recent summarier, I will mskr this one short, I promise. “Unifying Timestamp with Transaction Ordering for MVCC with Decentralized Scalar Timestamp” NSDI’21 paper proposes a mechanism to order transactions in multi-version distributed data-stores. One of the problems with distributed transactions is the ordering required to achieve consistency. In particular, we often want to have some serial order of transactions to have an illusion that they could have executed one by one. This is hard to do in a scalable manner. One approach is to try to rely on the physical clocks of the machines, but this is unreliable due to clock skew and clock synchronization issues. Clock skew can introduce causality violations. For example, if transaction TXa happened-before TXb, but due to clock skew, transaction TXa got a larger timestamp than TXb, then we have a causality problem — the cause and effect are reversed if we follow the timestamp order. One way to avoid such causality issues is to rely on a centralized oracle to prescribe the transaction order. Quite a few systems do that, but for obvious reasons, a centralized approach may create scalability and reliability problems. There are a few other ordering mechanisms, such as vector clocks/version vectors and hybrid time.

The authors of the paper take the hybrid time approach that they call Decentralized Scalar Timestamp (DST). DST is a single number that represents the progression of history. It is decentralized, thus avoiding the problems with a single timestamp oracle, and it is smart enough to avoid causality problems. The authors marry the timestamp generation/progression with the concurrency control (CC) mechanism, such as 2PL, allowing CC to adjust the timestamp to match the execution order. Consider two write-conflicting transactions TXa and TXb. Both transactions have some initial timestamps ta and tb respectively. These initial timestamps are based on the timestamps of the last transaction known to the coordianator or client (and ultiamtely based on the lossely sycnhrnonized physical time). And then the authors propose to use CC to bump up the timestamps as needed to ensure that the timestamps follow the execution order managed by the concurrency control mechanism. So for example, if initially ta < tb, but TXb happened-before TXa, then we bump ta := tb + 1. The actual “bumping-up” is a bit more complicated since the time is stored as two components – physical time and logical one, but the result is that the new timestamps are ordered the same way as the transaction execution.

Since the system operates against a multi-version store, the versions from both transactions are preserved. This is important for performing reads that are based on consistent snapshots, so the latest state may advance forward, and a multi-version store ensures that the snapshot remains available for reads. The read-only transactions (ROTs) bypass some concurrency control and try to avoid locking in the paper. Read-only transactions, however, still need to ensure isolation, as it would be unacceptable to see a result of a partial write. To that order, ROTs have to actively write their read version on all objects and wait for locked objects to get unlocked before reading. This enables the consistent cut read and allows new writes to proceed without blocking but with a higher version than the read.

As far as evaluation, the paper presents multiple different environments and a few different benchmarks along with a pretty good breakdown of comparisons between different approaches.

We have had our own presentation by David Correa, the recording is available on the reading group’s YouTube:

## Discussion.

1) HLC. The biggest discussion topic for this paper was a relation to the Hybrid Logical Clock (HLC). See, HLC is a well-known approach combining physical time and logical time. It keeps the affinity to physical time and maintains the causality just as Lamport’s logical clocks. The authors discuss that their DST approach is a combination of physical time and a logical one, just like HLC. However, the HLC work is never cited. It seems like the internal operation of the timestamp/clock is very similar. Moreover, there are well-described transaction protocols relying on HLC and MVCC described in the literature in great detail. It would be very interesting to see more about the DST clock operation and see its comparison with HLC. Similarly, it would be interesting to compare with transactions scheme by CockroachDB or YugabyteDB.

One bigger difference from typical HLC implementation is that DST increments/ticks only at significant events, such as transaction execution. HLC often tick at message passing in addition to significant events. This is a more general way to ensure clocks causally update on each communication, however, this is not a requirement for protocols with simple communication exchange patterns. In fact, Mongo updates HLC only at significant events.

2) Performance of Read-only Transactions. ROTs require a write of a timestamp on each object touched by the transaction. This is important for safety to make sure all new writes are ordered after the read-only transaction, so this version update of an object must be durable. We think that this may have a negative impact on performance, as each read includes a disk write in its path.

3) Evaluation. The group had some questions about the evaluation. In particular, the TPC-C benchmark is scaled with respect to districts. typical TPC-S benchmark tries a different number of warehouses, not districts. That being said, districts and warehouses are linked: “Each warehouse in the TPC- C model must supply ten sales districts.” This however raises additional questions, as 20 districts translate to just 2 warehouses, whereas many transactional papers go into 10s or even hundreds of warehouses.

Our reading groups takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!

# Reading Group. Exploiting Symbolic Execution to Accelerate Deterministic Databases

We have covered 60 papers in our reading group so far! The 60th paper we explored was “Exploiting Symbolic Execution to Accelerate Deterministic Databases” from ICDCS’20. I enjoyed the paper quite a lot, even though there are some claims I do not necessarily agree with.

The paper solves the problem of executing transactions in deterministic databases. We can image a replicated state machine, backed by Paxos or Raft. Trivially, in such a machine, each replica node needs to execute the transactions following the exact order prescribed by the leader to guarantee that all replicas progress through the same states of the machine. The good thing here is that nodes run each transaction independently of each other after the execution order has been established by the replication leader. The bad thing is that this naive approach is sequential, so each node cannot take advantage of multiple processing cores it may have.

Naturally, we want to parallelize the transaction execution. This, however, is easier said than done. To allow for more parallelism, we want to identify the situations when it is ok to run some transactions concurrently without impacting the final state of the state machine. For example, if we know what objects or keys the transaction reads and writes (i.e. the transaction’s read-and-write set), we can group independent transactions that operate on disjoint read-write sets together for parallel execution. For instance, a transaction accessing keys “x, y, and z” is independent of a transaction accessing keys “a, b, and c,” and the two can execute at the same time without impacting each other.

Of course, this requires us to know what objects/keys each transaction needs before running them, and this is a bit of a problem. In some situations, it may be easy to figure out the read and write sets of a transaction, but this is not always the case. Many systems, like Calvin, do a “pretend run” of a transaction (this is sometimes referred to as a reconnaissance transaction) to figure out the read-and-write set if the set isn’t obvious or annotated in the transaction. This has a few obvious downsides. Obviously, the pretend/reconnaissance phase uses the system’s resources. The reconnaissance run also increases the transaction’s latency. And finally, the reconnaissance is not perfect, and by the time of the “real” run, the read and write sets may have changed due to other transactions impacting the state of the system.

So, the above description is somewhat generic behavior for many systems out there. And this is where Prognosticator, a system discussed in the paper, comes in. Prognosticator uses Symbolic Execution (SE) to profile transactions and help predict each transaction’s read-and-write set. The system does not need experts to annotate transactions read-and-write sets, but it can still avoid the reconnaissance runs in many situations. Sometimes, however, the reconnaissance must still happen, but Prognosticator uses a few tricks to reduce the possibility of the reconnaissance becoming stale.

Let’s look at the issues with figuring out the read-write set of a transaction. Many transactions are not primitive read and write commands, and involve quite a bit of logic with loops and conditional statements. This means that a state of a client/application may impact both the values written and the keys accessed. For example, consider a transaction that takes some input i:

input i;
if i > 10 then write x:=i;
else write y:=i;

The read-and-write set of above example depends on the input value already known to the client. The paper calls this transaction an Independent Transaction (IT), as it does not have an internal dependence on the read values.

Some transactions can be a bit more complicated and have the read-write set depending on the value read by the transaction:

read a;
if a > 10 then write x:=a;
else write y:=a;

Here the transaction does not know its write set (i.e. writing x or y) until it acquires the value of a. Prognosticator paper refers to these transactions as Dependent Transactions (DT), as the write set has an internal dependence on the read values.

Obviously, for both types of transactions, we can do the reconnaissance phase to figure out all the logic and branching to learn all the required keys. But we do not really need full reconnaissance for the ITs, as their read-and-write sets only depend on some client input and not the transaction itself. In fact, we can just play out the transaction’s code to figure out the read-write set without actually retrieving any data from the store (i.e. using some dummy values). However, we still somehow need to know whether a transaction is IT, as using dummy values in DT will clearly not work. Moreover, such a “dry run” with dummy values for every IT we encounter is still wasteful, as we do it every time.

This is where Prognosticator’s Symbolic Execution (SE) approach shines. With SE, Prognosticator “unwraps” each transaction for all the possible execution branches, leading to a symbolic transaction solution for all possible code paths. If all code paths access the same keys then we have a static read-and-write set. If certain execution branches access different keys, but branching conditions involve only transaction input, then we are dealing with an Independent Transaction (IT). We can easily compute IT’s read-and-write set from the symbolic solution once the input is known. Finally, if SE yields some branches with different access keys, and these branches are conditioned on a transaction’s reads, then we have a Dependent Transaction (DT) and will require a reconnaissance read.

The Prognosticator “unwraps” each new transaction type only once at the client-side to create such a symbolic execution profile with all possible code branches. There are quite a few optimizations mentioned in the paper. The important gist of these optimizations is the fact that we do not care so much about the actual symbolic solutions, and care only about what keys show up in the read set and write sets. So if two execution branches produce different symbolic solutions, but access the same keys, these branches can be “merged” for the purposes of predicting the read-and-write set.

The rest of the Prognosticator’s magic depends on a batching technique that allows for a careful deterministic reordering of transactions. With the help of SE, the system identifies all read-only transactions (ROTs) and executes them concurrently at the beginning of the batch. This leaves us with a batch containing only ITs and DTs. The system then reorders DTs to the beginning of the batch. This allows it to run reconnaissance reads on all DTs while also working on ROTs. Since all DTs are now at the beginning of the batch, the reconnaissance reads cannot become stale due to any IT. Reconnaissance may still become stale due to the dependencies between DTs, and in this case, a DT is aborted during the execution phase and is placed in the abort batch to run after the main batch completes and before the next batch.

To execute the ITs and DTs in parallel, Prognosticator uses a lock table. The lock table is a collection of per-key queues, such that for every key in the batch there is an entry in the table with a queue of transactions. A transaction can be executed when it is at the head of the queues for all its keys. Obviously, executing a transaction removes it from all these queues.

The whole transaction execution process runs independently on each node. It is safe because we actually do not need to stick to the leader-prescribed ordered in the batch, as long as we keep the correct order of batches, and deterministically reorder the transactions in the same way on all replica nodes. With the batch execution, we have all the clients waiting for their transactions to finish, and this deterministic reordering does not cause any problems as all waiting transactions are concurrent. This is a common trick used in many systems.

The whole package with SE and batching provides a significant boost to the throughput compared to Calvin. It is not entirely clear though how much of the boost was enabled by the SE, but I will come back to this point in the discussion summary.

The paper goes into more detail on many aspects of the paper, including symbolic execution, more efficient implementation of the lock table, resource usage, etc. It was definitely a good read for me. As always, we had a presentation in our reading group, and I had to cover for a missing presenter:

## Discussion

1) Performance gains due to SE vs Batching. The paper claims a great speedup compared to Calvin, however, one question we had is just how much improvement is due to symbolic execution and how much of it is because of clever batching techniques. Let me elaborate. Some benefit comes from the careful reordering of operations. Running ROTs first helps a lot. Running reconnaissance reads at the node (compared to running a reconnaissance transaction at the client) is a lot faster too, and it reduces the possibility of reconnaissance becoming stale. Some of these techniques may be applicable in simpler systems without SE. For example, if we have transactions with annotated read-write sets, these reorderings within the batch become possible. Of course, SE definitely helps find ROTs and separate ITs from DTs to improve/enable the reordering within the batch without the need for annotated transactions.

2) Slow ROTs. Deterministic transactions are susceptible to clogged pipelines when some big long-running transaction gets in the way and delays consecutive transactions. Parallel/concurrent execution helps here by essentially having multiple processing pipelines. However, Prognosticator has one issue with the read-only transactions (ROTs), making them more susceptible to the clogging problem. The paper mentions that ROTs get executed at the beginning of the batch from a stable snapshot. All workers must complete their ROTs before the system moves to DTs and ITs to make sure that no DT or IT changes that stable snapshot while ROTs are still running. This means that there is a barrier at the end of the ROT phase, allowing a single long-running ROT to screw up the performance by delaying all DTs and ITs in the batch. However, this may be just an artifact of an academic prototype — taking a separate snapshot and running all ROTs from that snapshots should allow ROTS to execute concurrently with writes.

3) Cost of SE. The paper mentions the cost of doing symbolic execution. There are a few problems here. The first is processing time – more complicated transactions need a lot of time to pretty much exhaustively explore all branching. This also requires putting a limit on the number of allowed loop iterations. The limit can create a situation where a transaction is not fully profiled if it actually goes above the limit, requiring a reconnaissance. The limit, being a configurable parameter, may also require some expert knowledge of the workload to properly configure, and this is something the paper strives to avoid. Another big cost is the memory footprint of transaction profiles. The authors mention that the TPC-C benchmark required 960 MB for transaction profiles, which is not a small cost for a simple benchmark with relatively few transaction types. In the real world, the memory cost of having transaction profiles may be much higher.

4) Extension to sharded systems? Prognosticator works in the replicated system with all nodes storing identical data. It does not work in a sharded environment, yet most large-scale databases are sharded. It may be non-trivial to apply the same approach to sharded systems. After all, running a distributed transaction is harder, and may require some coordination between the shards. At the same time, it may still be possible to separate transactions according to their types and conflict domains with the help of SE to increase parallelism and make transaction execution more independent. Again, real systems based on Calvin’s ideas have cross-shard transactions. A big problem with sharded setup in Prognosticator involves DTs — the system expects to perform the reconnaissance read locally, which means that all data for the transaction must be available at each node. This is not possible in the sharded environment. And making reads non-local will make the system much closer to Calvin with a longer distributed reconnaissance phase and negative performance impact. So, the non-sharded nature of Prognosticator is a huge performance benefit when comparing with more general Calvin.