Tag Archives: consistency

Reading Group. UniStore: A fault-tolerant marriage of causal and strong consistency

For the 80th paper in the reading group, we picked “UniStore: A fault-tolerant marriage of causal and strong consistency” by Manuel Bravo, Alexey Gotsman, Borja de Régil, and Hengfeng Wei. This ATC’21 paper adapts the Partial Order-Restrictions consistency (PoR) into a transactional model. UniStore uses PoR to reduce coordination efforts and execute as many transactions as possible under the causal consistency model while resorting to strong consistency in cases that require ordering concurrent conflicting transactions. The PoR consistency itself is an extension of RedBlue consistency that allows mixing eventually consistent and strongly consistent operations. 

UniStore operates in a geo-replicated model, where each region/data center (I use region and data center interchangeably in this post) stores the entirety of the database. The regions are complete replicas of each other. Naturally, requiring strong consistency is expensive due to cross-region synchronization. Instead, UniStore lets developers choose whether to run each transaction as causally or strongly consistent. Causal consistency preserves the cause-and-effect relation between events — if some event e1 has resulted in event e2, then an observer seeing e2 must see e1 as well. Naturally, if two events are concurrent, they are not causally dependent. This independence gives us the freedom to apply these events (i.e., execute them against the store) in any order, which provides good performance, for instance, when concurrent events touch disjoint data. However, if two concurrent events operate on the same data, then the events either need to be commutative or need to be at least partly ordered. UniStore does both — it implements CRDTs to ensure commutativity, and it also lets developers declare a transaction as strongly consistent for cases that require ordering. As such, strong consistency comes in handy when commutativity alone is not sufficient for the safety of the application. For example, when a transaction does a compare-and-set operation, the system must ensure that all replicas execute these compare-and-set operations in the same order.

So, in short, causal consistency is great when an application can execute complex logic in causal steps — event e1 completes, then observing the results of e1 can cause some other change e2, and so on. Strong consistency comes in handy when step-by-step non-atomic logic is not an option, and there is a need to ensure the execution order of conflicting concurrent operations. In both cases, the system needs to keep track of dependencies. For causal transactions, the dependencies are other transactions that have already finished and were made visible. For strongly consistent transactions, the dependencies also include other concurrent, conflicting, strongly consistent transactions.

So, how does UniStore work? I actually do not want to get too deep into the details. It is a complicated paper (maybe unnecessarily complicated!), and I am not smart enough to understand all of it. But I will try to convey the gist of it.

The system more-or-less runs a two-phase commit protocol with optimistic concurrency control. Causal transactions commit in the local data center before returning to the client. However, these causal transactions are not visible to other transactions (and hence to other clients/users) just yet. Remember, such a causal transaction has not been replicated to other data centers yet, and a single region failure can cause some problems. In fact, if a strongly consistent transaction somehow takes such a not-fully-replicated causal transaction as a dependency, then the whole system can get stuck if that causal dependency gets lost due to a minority of regions failing.

UniStore avoids these issues by making sure the causal transactions are replicated to enough regions before these causal transactions are made visible. This replication happens asynchronously in the background, sparing the cost of synchronization for non-strongly consistent transactions (that is, of course, if clients/users are ok with a remote possibility of losing transactions they thought were committed).

Strongly consistent transactions are a different beast. They still optimistically run in their local data centers, but to ensure ordering with respect to other strongly consistent transactions, they no longer commit in just one region. UniStore uses a two-phase commit here as well, but this time the commit protocol runs across all healthy regions. First, the coordinator waits for enough data centers to be sufficiently up-to-date. This waiting is crucial for liveness; it ensures that no dependent transaction can get forgotten in the case of data center outages. After the waiting, the actual two-phase commit begins, with all (alive?) regions certifying the transaction.

To implement this waiting and only expose the durable and geo-replicated state to transactions, UniStore has a complicated system of version tracking using a bunch of vector clocks and version vectors. Each of these vectors has a time component for each data center and an additional “strong” counter for keeping track of strongly consistent transactions. Each transaction has a couple of important version vectors. 

The snapshot vector snapVec describes the consistent snapshot against which the transaction runs. The commit vector commitVec tells the commit version of a transaction used for ordering. 

Each replica keeps two different version vectors representing the version of the most recent transaction known to itself and its data center. Since the system relies on FIFO order of communication and message handling, knowing the version of the most recent transaction implies the knowledge of all lower-versioned transactions as well. This information is then exchanged between data centers to compute yet another version vector to represent the latest transaction replicated to at least some majority of data centers. This, again, implies that all lower-versioned transactions have been replicated as well. This last version vector allows strongly consistent transactions to wait for their dependencies to become globally durable to ensure liveness. 
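To make the vector bookkeeping slightly more concrete, here is a minimal sketch of how a region might compute such a "majority-replicated frontier" from the progress vectors it received from other data centers. This is my own illustration, not UniStore's actual algorithm; the names (known_progress, majority_frontier) and the single-vector-per-data-center simplification are assumptions.

```python
# Hypothetical sketch: computing the frontier of transactions replicated
# to a majority of data centers from exchanged progress vectors.
# known_progress[dc][origin] = highest transaction number originating at
# region `origin` that data center `dc` has received. FIFO delivery means
# having transaction t implies having all t' <= t from the same origin.

def majority_frontier(known_progress: dict[str, dict[str, int]]) -> dict[str, int]:
    dcs = list(known_progress)
    majority = len(dcs) // 2 + 1
    origins = sorted({o for vec in known_progress.values() for o in vec})
    frontier = {}
    for origin in origins:
        # Progress for this origin as seen by every data center.
        values = sorted((known_progress[dc].get(origin, 0) for dc in dcs),
                        reverse=True)
        # The majority-th largest value is known to at least a majority of
        # data centers, so everything up to it is safe to consider replicated.
        frontier[origin] = values[majority - 1]
    return frontier

progress = {
    "us":   {"us": 7, "eu": 3, "asia": 5},
    "eu":   {"us": 6, "eu": 4, "asia": 5},
    "asia": {"us": 5, "eu": 3, "asia": 6},
}
print(majority_frontier(progress))  # {'asia': 5, 'eu': 3, 'us': 6}
```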

So here is where I start to lose my understanding of the paper, so read on with a pinch of salt, as my skepticism may be completely unwarranted. It makes sense to me to use version vectors to keep track of progress and order causal transactions. Each region computes the region's known progress, exchanges it with other regions, and calculates the global "transaction frontier" — all transactions that have been replicated to a sufficient number of data centers. This exchange of known progress between regions happens asynchronously. I am not entirely sure how these progress vectors help with ordering the conflicting transactions. Somehow the "strong" counter should help, but this counter seems to be based on the regions' knowledge of progress and not the global one. I suspect that these vectors help identify the concurrent conflicting transactions. The progress known in the data center ends up in a snapVec and represents the snapshot on which the transaction operates. The strongly consistent transactions use a certification procedure (i.e., a two-phase commit) to decide whether to abort or commit. The paper mentions that the certification process assigns the commitVec, which actually prescribes the order. At this point, I hope that conflicting transactions are caught in this Paxos-based transaction certification procedure and ordered there, or at least aborted as needed. It is also worth mentioning that the extended technical report may have more details that I was too lazy to follow up on.

Now a few words about the evaluation. The authors focus on comparing UniStore against both causal and strongly consistent data stores. Naturally, it sits somewhere in the middle between these two extremes. My bigger concern with the implementation is how well it scales with the number of partitions and the number of data centers. The paper provides both of these evaluations, but not nearly at a convincing scale. They go up to 5 data centers and up to 64 partitions. See, with all the vectors and tables of vectors whose size depends on the number of regions, UniStore may have some issues growing to "cloud scale," so it would be nice to see how it does at 10 data centers or even 20. Cloud vendors have many regions and even more data centers; Azure, for example, has 60+ regions with multiple availability zones.

Our group's presentation by Rohan Puri is available on YouTube:

Discussion

1) Novelty. The paper works with a rather interesting consistency model that combines weaker consistency with strong consistency on a declarative, per-operation basis. This model, of course, is not new, and the paper describes and even compares against some predecessors. So we had a question about the novelty of UniStore, since it is not the first to do this kind of consistency mix-and-match. It appears that the transactional nature of UniStore is what separates it from other such solutions. In fact, the bulk of the paper talks about the transactions and ensuring liveness in the face of data center outages, so this is nice. Many real-world databases are transactional, and having a system like this is a step closer to a practical solution.

2) Liveness in the presence of data center failures. Quite a lot of the paper's motivation revolves around the inability to simply run OCC + 2PC in the PoR consistency model and maintain liveness. One problem occurs when a causal transaction takes a dependency on another transaction that did not make it to a majority of regions. If such a transaction is lost, it may stall the system. Of course, any region that takes a dependency on some transaction must see it first. Anyway, it is hard to see the novelty in "transaction forwarding" when pretty much any system recovers partly replicated data by "forwarding" it from the nodes that have it.

However, the bigger motivational issue is with strongly consistent transactions. See, the authors say that the system may lose liveness when a strongly consistent transaction commits with a dependency on a causal one that has not been sufficiently replicated, and that causal transaction gets lost in a void due to a region failure. However, to me, this seems paradoxical — how can all (healthy) regions accept a transaction without having all the dependencies first? It seems like a proper implementation of the commit protocol will abort when some parties cannot process the transaction due to the lack of dependencies. Anyway, this whole liveness thing is not real and appears to be just a way to make the problem look more serious. 

That said, I do think there is a major problem to be solved here. A more proper commit protocol may hurt performance by either having a higher abort rate or by replicating dependencies along with the strongly consistent transaction. We would like to see how much better UniStore is compared to a simpler 2PC-based solution that actually aborts transactions when it cannot run them.

3) Evaluation. The evaluation largely compares UniStore against itself, but in different modes — causal and strong. This is ok, but how about some other competition? Why not take a handful of other transactional approaches and see what happens? The current evaluation tells us that the PoR model provides better performance than strong consistency and worse than causal consistency. But this was known before. Now we also know that this same behavior translates to the transactional world. But we do not know how the cost of the protocol fares against other transactional systems that do not have PoR and are not based on UniStore with features disabled.

It would also be interesting to see how expressive the transactional PoR consistency model is. For example, let's take MongoDB. It can be strongly consistent within a partition and causal across partitions (and within a partition, users can manipulate the read and write consistency on a per-operation basis). What kind of applications can we build with Mongo's simpler model? And what kind of apps need UniStore's model with on-demand strong consistency across partitions?

4) Complicated solution??? I have already mentioned this in the summary, but UniStore is complicated and relies on many moving parts. The paper completely omits the within-datacenter replication for "simplicity," but this does not really make the paper simple. We have vectors that track progress, order, and snapshots, and then we have tables of vectors, and all these multiple kinds of vectors are exchanged back-and-forth to compute more vectors, only to find out when it is ok to make some transactions visible or to unblock some strongly consistent transactions. How come other systems (MongoDB again?) implement causal consistency with just one number for versioning (an HLC) and still allow specifying stronger guarantees when needed? Yes, they may not implement the PoR consistency model, but UniStore just seems too complicated. As a side question… what happens in Mongo when we start changing consistency between causal and strong on a per-request basis?

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions, and most importantly manage Zoom invites to paper discussions. Please join the slack group to get involved!

Reading Group. Viewstamped Replication Revisited

Our 74th paper was a foundational one — we looked at the Viewstamped Replication protocol through the lens of the "Viewstamped Replication Revisited" paper. Joran Dirk Greef presented the protocol along with bits of his engineering experience using the protocol in practice.

Viewstamped Replication (VR) solves the problem of state machine replication in a crash fault-tolerant setting with up to f nodes failing in a cluster of 2f+1 machines. In fact, the protocol (between its two versions, the original and the revisited one) is super similar to Raft and Multi-Paxos. Of course, the original VR paper was published before both of these other solutions. An important aspect of the narrative of the VR paper is that the basic protocol makes no assumptions about persisting any data to disk.

Replication

In the common case when replicating data, VR has a primary node that prescribes the operations and their order to the followers. The primary needs a majority quorum of followers to ack each operation to consider it committed. Once it reaches the quorum, the primary can tell the followers to commit as well. In VR, followers do not accept out-of-order commands and will not ack an operation without receiving its predecessor. With a few differences, this is how typical consensus-based replication works in both Raft and Multi-Paxos. For example, Multi-Paxos complicates things a bit by allowing out-of-order commits (but not out-of-order execution against the state machine).
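To illustrate the in-order requirement, here is a tiny sketch of a follower's handling of a Prepare message (my simplification, not the paper's pseudocode; the class and method names are made up): the follower acks only the next expected operation number.

```python
# Hypothetical follower-side check in VR normal operation: accept only the
# next operation in sequence; anything else is not acked until the gap is
# filled (e.g., via state transfer from the primary).

class Follower:
    def __init__(self):
        self.view = 0
        self.log = []           # prepared operations, in order
        self.op_number = 0      # number of the last operation in the log

    def on_prepare(self, view: int, op_number: int, op) -> bool:
        """Returns True if the follower acks (PrepareOk) this operation."""
        if view != self.view:
            return False        # stale or future view; handled elsewhere
        if op_number != self.op_number + 1:
            return False        # out of order: do not ack, wait for the gap
        self.log.append(op)
        self.op_number = op_number
        return True
```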

View Change and Replicas Recovery

As with everything in distributed systems, the interesting parts begin when things start to crumble and fail. Like other protocols, VR can mask follower crashes, so the real challenge is handling primary failures. The VR protocol orchestrates the primary failover with a view-change procedure. Here I will focus on the “revisited” version of the view-change. 

One interesting piece of the revisited view-change is its deterministic nature, as each view is assigned/computed to belong to a particular node. When the current primary for view v fails, the node for view v+1 will try to become the new primary. This promotion to primary can be started by any node suspecting the current primary's demise. We do not need the node for view v+1 to directly observe the crash of the primary for v, which allows for unreliable failure detectors.

So, any replica can start a view change if it detects a faulty primary. The replica will advance to the next view v+1 and start the view change by sending a StartViewChange message with the new view number to all the nodes. Each replica that receives a StartViewChange will compare the view to its own, and if its view is less than the one in the received StartViewChange message, it will also start the view change and send its own StartViewChange message to every replica. Also note that two or more replicas may independently start sending the initial StartViewChange messages for the new view v+1 if they independently detect the primary failure. The bottom line of this StartViewChange communication pattern is that any replica that has received at least f StartViewChange messages (which, counting itself, makes a majority) knows that the majority of the cluster is on board to advance with the view change.

So, after receiving f StartViewChange messages for the new view v+1 from other nodes, each replica sends a DoViewChange message to the primary for that new view v+1. The DoViewChange message contains the replica's last known view v' along with its log. The new primary must receive a majority (i.e., f+1) of DoViewChange messages. The new primary then selects the most up-to-date log (i.e., the one with the highest v', or the longest log if multiple DoViewChange messages have the same highest v'). This up-to-date log ensures the new primary can recover any operations that might have been majority-accepted or committed in previous views. This "learn from the followers and recover" part of the view-change procedure is somewhat similar to Multi-Paxos, as the new leader needs to recover any missing commands before proceeding with new ones. VR leader recovery only cares about the view and the length of the log because it does not allow out-of-order prepares/commits. In Multi-Paxos, with its out-of-order commitment approach, we kind of need to recover individual log entries instead of the log as a whole.
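The log-selection rule itself is easy to express in code. Below is a minimal sketch of just that step, assuming each DoViewChange message carries the sender's last normal view v' and its log; the function name and message representation are mine, not the paper's.

```python
# Hypothetical sketch of how the new primary could pick the most
# up-to-date log among f+1 DoViewChange messages: highest last-normal
# view v' wins; ties are broken by the longest log.

def select_log(do_view_change_msgs):
    # each message is a tuple (last_normal_view, log)
    best_view, best_log = do_view_change_msgs[0]
    for v_prime, log in do_view_change_msgs[1:]:
        if (v_prime, len(log)) > (best_view, len(best_log)):
            best_view, best_log = v_prime, log
    return best_log

msgs = [(3, ["a", "b"]), (4, ["a", "b", "c"]), (4, ["a", "b", "c", "d"])]
assert select_log(msgs) == ["a", "b", "c", "d"]
```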

Anyway, after receiving enough (f+1) DoViewChange messages and learning the most up-to-date log, the new primary can start the new view by sending the StartView message containing the new view v+1 and the recovered log. The followers will treat any uncommitted entries in the recovered log as "prepares" and ack the primary. After all entries have been recovered, the protocol can transition into the regular mode and start replicating new operations.

Schematic depiction of a view-change procedure initiated by one node observing a primary failure. Other machines send StartViewChange in response. Eventually, a majority of nodes receive f StartViewChange messages and proceed to DoViewChange, at which point the new primary recovers the log.

There is a bit more stuff going on in the view change procedure. For example, VR ensures the idempotence of client requests. If some client request times out, the client can resend the operation to the cluster, and the new leader will either perform the operation if it has not been done before or return the outcome if the operation was successful at the old primary. 

Another important part of the paper's recovery and view change discussion involves practical optimizations. For example, recovering a crashed in-memory replica requires learning the entire log, which can be slow. The proposed solution involves asynchronously persisting the log and checkpoints to disk to speed up the recovery process. Similarly, a view change requires a log exchange, which is not practical for long logs. An easy solution would involve sending only a small log suffix instead, since the new primary is likely to be mostly up-to-date.

Discussion

1) Comparison With Other Protocols. I think the comparison topic is where we spent most of the discussion time. It is easy to draw parallels between Multi-Paxos and VR. Similarly, Raft and VR share many things in common, including the similarity of the original VR’s leader election to that of Raft. 

One aspect of the discussion is whether Multi-Paxos, Raft, and VR are really the same algorithm with slightly different "default" implementations. We can make an argument that all these protocols are the same thing. They operate in two phases (leader election and replication), require quorum intersection between the phases, and have the leader commit an operation first before the followers. The differences lie in how they elect a leader to ensure that committed or majority-accepted operations from the prior leader do not get lost. Multi-Paxos and VR revisited recover these operations at the new leader, while Raft ensures the new leader is already up to date to eliminate the need for recovery.

2) Optimizations. Academia and industry have churned out quite a substantial number of optimizations for Multi-Paxos and Raft. There has even been a study on whether the optimizations are interchangeable across protocols (the short answer is yes). But that study did not include VR. Interestingly, Joran mentioned that they applied Protocol-Aware Recovery to VR, and (if I am not mistaken) they also used flexible quorums. Of course, I am not surprised here, given how similar these solutions are.

3) In-memory Protocol. Joran stressed the importance of the in-memory protocol description to engineers. In their systems, in-memory consensus allows reducing the reliance on the fault tolerance/reliability of the storage subsystem. It appears that VR is the only major SMR protocol specified without the need for a durable, infallible disk. And while people in academia run in-memory consensus a lot (including Multi-Paxos and Raft), the aspects of what is needed to make these systems work without the disk are not clearly described.

The VR paper briefly describes the problem that the lack of a durable log introduces. The issue is that a node can forget its entire state. It can accept some value, help form a majority, and then reboot. Upon rebooting, if the node is allowed to participate right away, we no longer have the forgotten value in a majority quorum. Similarly, a node can vote on something in one view, reboot and forget, and then potentially re-vote in a lower view proposed by some slow old primary.

To avoid these problems, the node needs to recover before it can participate. At least a partial recovery is needed to avoid the double voting problem — the node needs to learn the current view before accepting new operations. At the same time, such a partially recovered node must still appear “crashed” for any older operations until it has fully recovered. 


Reading Group. Strong and Efficient Consistency with Consistency-Aware Durability

In the 62nd reading group session, we covered the “Strong and Efficient Consistency with Consistency-Aware Durability” paper from FAST’20. Jesse did an excellent presentation for the group that explains the core of the paper rather well:

This paper describes a problem with many leader-based replication protocols. It specifically focuses on ZooKeeper and Zab, but similar issues arise in Multi-Paxos and Raft systems as well. Linearizable reads are expensive, as they need to go through a lease-protected leader (or carry out a full consensus round in consensus-based replication if leases are to be avoided). This puts a single leader node in the middle of everything, which is bad for throughput.

But what if we allow reading from followers? Systems like ZooKeeper allow such reads, but these have a few problems. For one, followers may be stale, so such reads do not have the same recency guarantees as the linearizable solutions. Secondly, reading from followers violates monotonic reads if a client jumps between the followers it reads from without a special session/recency token. The third problem, the one the paper specifically solves, is that there is no monotonicity between clients — when one client reads a value of version n, nothing prevents another client at a later time from reading version n-1 from a different node.

So, the goal of the paper is to allow reading from followers while sacrificing as little consistency as possible. The system introduced in the paper, called ORCA, solves two consistency-related problems. First, it ensures that local followers return to clients only durable data that cannot be lost in the face of failures. Second, ORCA enforces some recency guarantees in the form of cross-client monotonic reads.

To achieve both goals, the system introduces a globally replicated durable-index. The purpose of this index is to let the nodes know up to which point in the log the data has been durably (here, durability means persistence to some non-volatile storage) replicated to some quorum of nodes (traditionally, this would be a majority quorum). This durability marker allows the followers to avoid returning a value that can be lost due to a crash. For example, consider some object k with the initial value k1 and a corresponding durable-index=1. Another value k2 is being replicated in log position 2. If a local node returns k2 before it is durable, there is a chance that k2 has not been replicated and persisted at a majority of nodes yet, and a minority partition or failure can erase k2. To prevent this, the leader maintains the durable-index, which gets updated once a quorum of replicas has acknowledged the persistence of the data to storage. In the example, once k2 is durable and the followers have acked, the leader updates its durable-index:=2 and sends it to the followers. A follower can only return the value k2 if it has received a durable-index>=2 (i.e., an index that has moved to or past the value k2). Naturally, there are plenty of situations when a follower may be aware of k2 but has not yet received the durable-index=2 update. In this case, the follower cannot return the data. It cannot return k2, since it is not durable yet, but it also cannot return k1, as some other follower may have already returned k2 to a client. One solution would be to wait and see whether the durable-index eventually catches up. A more proactive approach would forward the read to the leader and let the leader synchronously replicate k2 (and tell the followers to flush it to disk) before returning the value to the client. The paper favors the proactive approach, as it acts somewhat as a "read-repair" applicable to a variety of leader-based replication schemes.
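To summarize the read path, here is a rough sketch of the follower-side decision as I understand it (not ORCA's actual code; the class and method names, including read_and_make_durable, are made up): return the local value only if its log position is covered by the durable-index, otherwise forward the read to the leader.

```python
# Hypothetical follower read logic with a durable-index: a value is safe
# to return locally only if its log position is <= the durable-index the
# follower has learned from the leader; otherwise the read is forwarded
# so the leader can make the value durable first ("read-repair").

class OrcaLikeFollower:
    def __init__(self, leader):
        self.leader = leader
        self.durable_index = 0
        self.store = {}   # key -> (value, log position of the latest write)

    def read(self, key):
        value, pos = self.store[key]
        if pos <= self.durable_index:
            return value                      # durable: safe to return locally
        # The follower knows of a newer, possibly non-durable write.
        # Returning the older durable value could break cross-client
        # monotonic reads, so forward to the leader instead.
        return self.leader.read_and_make_durable(key)
```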

The paper presents this durable-index as a big contribution, although I am a bit skeptical about this. See, existing systems have been doing the whole durable-index business for a very long time. I will focus on just one example, however — Raft. In Raft, the leader must communicate its commitIndex to the followers after a quorum has been reached so that all nodes can mark all log items or updates up to the commitIndex as committed. If we assume that Raft persists operations to disk at each node upon receiving them, then the leader's commitIndex in the "commit" message (i.e., the next AppendEntries RPC) is the same as the durable-index from this paper. A similar global commit progress marker can be implemented in Paxos-style systems despite a few challenges with out-of-order commits in Multi-Paxos.

In the paper's defense, it makes the durable-index a bit more general than Raft's approach. For example, the paper allows for more relaxed primary-backup implementations, asynchronous replication, and implementations with delayed disk persistence. Consider adding a durable-index to an asynchronous leader-driven system to allow durable reads. The system can check whether the value to be returned is durable, allowing it to do the synchronous "read-repair" only when needed and preserving the benefits of async replication as often as possible.

Ok, so we have a durable-index that marks the global progress of the system, and the leader distributes the durable-index to all nodes. This is enough to ensure the data read once won't ever get lost due to a failure, but it is still not enough to ensure monotonicity. If one server in a cluster of 3 is slow, it may not have received some updates and the new durable-index, while the 2 other, faster nodes have successfully applied more operations. This can lead to the slow node returning older data than the other two nodes may have already returned. Something similar can happen when one replica is network-partitioned. For a single client, this monotonicity problem can be solved with some sort of session token that tracks the recency of the data observed by the client, but there is no elegant solution for the cross-client case.

The naive solution to the monotonicity problem, which Jesse describes in great detail in the presentation, expands the durable-index requirement to all nodes. In this case, a follower can only return a value when its state matches the durable state agreed upon by all nodes, so there can be no slow or partitioned replicas. For instance, we may have 3 nodes, all having a persisted history of k1, k2, k3. Since all nodes have the value k3, it is clearly durable. A leader node can then send durable-index=3 to signal the rest of the cluster that the 3rd item (i.e., k3) is durable. Let's say one follower f1 has received this durable-index. We can allow this replica to return k3 locally. At the same time, another follower f2 may not have received the durable-index update just yet, so it is aware of k3 but does not consider it durable. The node f2 won't return k2 or k3 and instead will forward the read to the leader, which will ensure that k3 is durable and no other operation is in progress before returning k3.

Requiring durability/replication on all nodes has been explored before. If I continue to draw parallels with Raft and take into account the flexible quorums result, then configuring Raft with a replication phase requiring a quorum of all nodes will achieve this "durable-index of all nodes" approach. Needless to say, Raft will provide linearizability and not just monotonic reads (as long as followers block on a dirty read until they receive an updated commitIndex from the leader). There is an obvious problem with a quorum of all nodes crippling availability when even one node goes down; however, there are also examples of production systems that make this work to some degree.

Anyway, ORCA does not require durability on all nodes due to these availability issues. Instead, it introduces the "active-set," a set of nodes that must achieve durability for the durable-index to progress. Reads are possible against the replicas in the active-set, while reads against nodes outside of the active-set are prohibited (forwarded to the leader?). The active-set must include at least a majority of nodes for fault tolerance reasons. A lease protects the active-set, allowing failed or partitioned nodes to fall out of the set safely. This idea is largely described in the Paxos Quorum Leases paper, which, again, provides linearizability.

Discussion

1) Linearizable Alternatives

Flexible Quorums in a Write-All-Read-One configuration. I already touched on this point in the summary, but I also raised it in our group's discussion. One thing that is different between ORCA and just running Raft with a write quorum of all nodes is that ORCA can do "fast writes," since it is not aiming for linearizability. These fast writes are simply uncommitted writes: the client receives a write acknowledgment before the operation has been quorum-committed. Some systems, like MongoDB, also have them, and they seem to cause more confusion and problems than benefits. So we do not think this is really a good feature to have in a system with stronger consistency guarantees. In fact, this consistency relaxation seems to be the only big difference between the Flexible-Paxos (Raft) approach and the naive ZooKeeper-based ORCA.

Paxos Quorum Leases. The Paxos Quorum Leases (PQL) paper was brought up during the discussion too. At a high level, PQL is similar to the lease-protected active-sets of ORCA. However, we were surprised to see a complete lack of comparison between ORCA's active-set approach and PQL.

Paxos Quorum Read. The problem of scaling read throughput in strongly consistent systems is something I have explored before. The Paxos Quorum Read (PQR) approach allows client-driven reads from a quorum of followers. This is not as scalable as reading from just one node, but PQR avoids any leases and sticks to linearizability.

2) Safety of durable-index

Lease safety. As with all systems that rely on leases, proper timeouts are essential for safety, and lots of care must be put into establishing them. For example, it is essential for partitioned nodes in the active-set to time out and "kill themselves" before the leader has a chance to time out and pick a new active-set. If the leader creates a new active-set before the bad node in the old one dies, we may have a situation where two nodes return different data. Timeouts also need properly working clocks. If one clock ticks too quickly or too slowly, this may also be a problem.

Replication safety. An interesting point was brought up regarding the safety of the durable-index. Since a durable-index update may get delivered faster to some nodes and slower (or be dropped altogether) to others, this creates a situation where a client can read from a more up-to-date node and receive some newer data, let's say value k3. The client may then contact a different node that has not received the new durable-index and still thinks k2 is durable. This node cannot return k2, as that would be a violation of monotonic reads! The node must wait for k3 to become durable, which in ORCA means that the read gets forwarded to the leader. In other words, if a node knows of some non-durable update to an object, it is pretty much blocked and needs to forward reads to the leader to avoid safety problems. This leads to the next discussion point.

3) Performance cost. The safety of the protocol relies on fixing up durability while doing a read operation. So every time a read is done against a node that knows of some newer update that is not yet durable, the protocol forwards the read to the leader and potentially re-replicates. Even the forwarding part alone can be costly. It costs not only extra latency but also the resources needed to support these multi-hop operations.

ORCA is banking on the fact that we have a system with multiple keys/objects and that the access patterns are such that it is unlikely to have both an outstanding non-durable update and a read operation for the same key at the same time. This is generally an ok assumption, and many systems rely on it. The problem is that the opposite case is a complete disaster for the system. Having just a handful of "hot keys" will cause frequent forwards to the leader, with all the extra work and extra latency that entails. At best this creates bad latency, but because it also creates a lot more work for the system, it actually lowers the maximum achievable throughput. Can this translate into a metastable failure?

4) Practicality of cross-client monotonic reads. The final discussion topic I want to mention is the practicality of such cross-client monotonic read consistency. The paper goes into a few examples. However, monotonic reads are a far cry from more stringent consistency models. ORCA does not explicitly enforce the read-your-write property, which, intuitively, is desirable in a system claiming strong consistency. For instance, Facebook goes out of its way to provide read-your-write consistency across its stack of weakly consistent systems to make development easier and prevent user confusion.

As a last remark, I want to add that the consistency properties of the paper's approach will depend on the underlying leader-based protocol and implementation. As I mentioned a few times, Raft already has a durable-index baked in, and solutions like PQL can provide something similar to the active-set described here for a completely linearizable system. However, the durable-index and active-set approach can be applied to weaker-consistency leader-based replication, and in such cases, it can improve consistency, hopefully at a low-enough cost.


Reading Group. Conflict-free Replicated Data Types

We kicked off a new set of papers in the reading group with some fundamental reading – “Conflict-free Replicated Data Types.” Although not very old (and not the first one to suggest something similar to CRDTs), the paper we discussed presents a proper definition of Conflict-free Replicated Data Types (CRDTs) and the consistency framework around them. Needless to say, lots of research followed after this paper in the area of CRDTs. 

It is impossible to discuss Conflict-free Replicated Data Types without mentioning consistency in distributed replicated systems a bit. In super high-level terms, consistency describes how well a replicated system mimics the illusion of a single copy of the data. On one side of the consistency spectrum, we have strong consistency (i.e., linearizability), which appears to an outside observer as if there is exactly one copy of the data. On the other side of the spectrum, we have eventual consistency, which allows many kinds of data artifacts, such as reading uncommitted data, accessing stale data, and more.

Strong consistency, however, comes at a significant performance cost that eventual systems do not have. For strong consistency, the order of operations is crucial, as clients must observe a single history of state changes in the system. Most often, this means that all replicas must apply the same sequence of commands in the same order to progress through the same states of their state machines. This requires a sequencer node, which often becomes a bottleneck. There are a few exceptions to this; for example, protocols like ABD do not need a single sequencer, and there may even be gaps/variations in history on individual nodes, but these protocols have other severe limitations. In addition to following the same history of operations, linearizability also imposes strict recency requirements — clients must observe the most recent state of the system. This prescribes synchronous replication to make sure enough nodes progress in lock-step for fault tolerance reasons. These challenges limit scalability — despite having multiple replicas, a replicated linearizable system will be slower than the single server it tries to mimic.

Eventually-consistent systems do not have such strong performance constraints, because there is no need to order operations, enforce recency, or even keep a single history of updates. This gives a lot of freedom to explore parallelism and push the boundaries of performance. Unfortunately, these systems are hard to program against, since an application built on top of an eventually consistent store needs to account for and anticipate all kinds of data artifacts and deal with them.

All these differences between strong and eventual consistency also mean that they land on different sides (vertices?) of the CAP triangle. With the recency requirements and lock-step execution, linearizable systems are CP, meaning that they sacrifice Availability in the face of network Partitions and remain Consistent. Eventual systems… well, they do not promise Consistency at all, so they remain Available.

Anyway, the drastic differences between the two extremes of the consistency spectrum coupled with the scary CAP theorem have sparked a lot of research in consistency models that lie between strong and eventual. These intermediate models were supposed to provide a compromise between the safety of strongly consistent systems and the performance/availability of eventual ones. This is where CRDTs come to play, as they often drive the Strong Eventual Consistency (SEC) model. The paper presents SEC as the “solution to CAP”, and this makes me cringe a bit. First of all, Strong Eventual Consistency is a strongly confusing name. Secondly, having a solution to CAP sounds super definitive, whereas SEC is merely one of many compromises developed over the years. 

Now we are getting to the meat of the paper that excites me. See, aside from a cringy name and a claim to solve CAP problems, SEC is pretty clever. A big problem with eventual consistency is that it does not define any convergence rules. Without such rules, the system may converge to an arbitrary state. Moreover, the convergence itself becomes unpredictable and impossible to reason about. SEC addresses the convergence problem by imposing some rules to the eventual consistency model. This enables engineers to reason about both the intermediate and final states of the system.

More specifically, SEC requires that any two identical nodes applying the same set of operations arrive at identical states. Recall that this sounds similar to how strongly consistent systems apply operations at nodes. The difference is that in strongly consistent systems we reason about sequences that have some order to them. In SEC, we work with sets of commands, which are orderless. I think this is a pretty cool thing: throw away the order, yet still ensure that convergence is predictable and depends only on the operations we have.

Completely throwing away the operation ordering and working with operation sets instead of sequences is tricky though. Consider some variable x, initially at x:=2. If we have two operations: (1) x:=x+2 and (2) x:=x*2, we can clearly see the difference if these operations are applied in a different order — by doing the operation (2) first, we will get a final state of 6 instead of 8. This presents a convergence problem and a violation of SEC if different nodes apply these operations in a different order. In a sense, these two operations, if issued concurrently, conflict with each other and require ordering. So clearly we need to be smart to avoid such conflicts and make SEC work. 

There is no generic solution to avoid such conflicts, but we can design specific data structures, known as Conflict-free Replicated Data Types, or CRDTs, that solve this ordering problem for some use cases. As the name suggests, CRDTs are built to avoid conflicts between different updates or different versions of the same data object. In a sense, a CRDT provides a data structure for a specific use case with some defined and restricted set of operations. For instance, we can have a CRDT that implements a distributed counter that can only increment the counter's value, or a CRDT for an add-only set. The paper presents two broad types of CRDTs — state-based and operation-based CRDTs. Both types are meant for replicated systems and differ in terms of how they communicate updates between nodes and reconstruct the final state.

State-based CRDTs transfer the entire state of the object between nodes, so they can be a bit heavier on bandwidth usage. The actual state of the data structure is not directly visible/accessible to the user, as this state may differ from the logical meaning of the data structure. For example, going back to the counter CRDT, logically we have a single counter, but we may need to represent its value as consisting of multiple components in order to ensure conflict-free operation. Assume we have n nodes; to design a state-based counter CRDT, we break the counter value down into registers <c1, c2, c3, …, cn>, each representing the increments recorded at a particular node. The logical state of the CRDT counter is the sum of all registers \(c=\sum_{i=1}^n c_i\), which must be exposed through a query function. In addition to the query function, there must be an update function to properly change the underlying state. For the counter, the update function will increment the register corresponding to its node id.

The most important part, however, is still missing. If some node receives concurrent increments, how can it reconcile them? Let’s say we have 3 nodes {n1, n2, n3}, each starting in some initial counter state <4,5,2>. These nodes receive some updates and increment their respective registers locally: n1:<6,5,2>, n2:<4,6,2>, n3:<4,5,4>. The nodes then send out their now divergent copies of the counter CRDT to each other. Let’s say node n2 received an update from n3, and now it needs to merge two versions of CRDT together. It does so with the help of the merge function, which merges the two copies and essentially enforces the convergence rules of SEC. There are some specific requirements regarding the merge function, but they essentially boil down to making sure that the order in which any two CRDTs merge does not matter at all. In the case of our counter, the merge function can be as simple as a pairwise comparison of registers between two versions of CRDT and picking the maximum value for each register. So for merge(n2:<4,6,2>, n3:<4,5,4>), we will see the updated value of <4,6,4> on node n2. If at this point n2 sends its update to n1, then n1 will have to do merge(n1:<6,5,2>,n2:<4,6,4>), and get the final version of <6,6,4>. Note that if n1 now also receives n3‘s update, it will not change the state of n1, since that update was already learned indirectly. This scheme works pretty neatly. It tolerates duplicate messages and receiving stale updates. However, we can also see some problems — we carry a lot more state than just a simple integer to represent the counter, and our counter’s merge function has restricted the counter to only allow increments. If we try to decrement a value at some node, it will be ignored, since the merge function selects the max value of a register. The latter problem can be fixed, but this will require essentially doubling the number of registers we keep for each node, exacerbating the state-size problem. 

Example of state-based counter with some sample message exchange.
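In code, a minimal state-based (grow-only) counter along these lines might look like the sketch below. This is just an illustration of the query/update/merge structure, not the paper's formal specification, and the class and method names are my own.

```python
# Minimal sketch of a state-based grow-only counter CRDT.
# Each replica only increments its own register; merge takes the
# pairwise maximum, so merges commute, are associative, and are idempotent.

class GCounter:
    def __init__(self, node_ids, my_id):
        self.my_id = my_id
        self.registers = {n: 0 for n in node_ids}

    def increment(self, amount: int = 1):
        self.registers[self.my_id] += amount       # update function

    def value(self) -> int:
        return sum(self.registers.values())        # query function

    def merge(self, other: "GCounter"):
        for node, count in other.registers.items():
            self.registers[node] = max(self.registers[node], count)

nodes = ["n1", "n2", "n3"]
n1, n2, n3 = (GCounter(nodes, n) for n in nodes)
n1.increment(2); n2.increment(1); n3.increment(2)
n2.merge(n3)            # n2 now reflects n3's increments
n1.merge(n2)            # n1 reflects everything
n1.merge(n3)            # duplicate/stale merge changes nothing
assert n1.value() == 5
```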

Operation-based CRDTs somewhat solve the above problems. Instead of transferring the full state of the object, op-based CRDTs move around the operations required to transform one state into the next. This can be very economical, as operations may use significantly less bandwidth or space than the full CRDT state. For instance, in our counter CRDT example, the operation may be the addition of a number to the counter. Of course, as operations may propagate at different speeds and potentially get reordered, op-based CRDTs require that all concurrent operations are commutative. In other words, we again set the rules to ensure that the order of updates (i.e., operations) does not matter. In the counter use case, we know that all additions commute, making it easy to implement an op-based counter. Unlike the state-based version, we do not even need to have multiple registers and sum them up to get the actual value of the counter. However, there are some important caveats with op-based CRDTs. They are susceptible to problems when a message or operation gets duplicated or resent multiple times. This creates a significant challenge, as either the operations themselves must be designed to be idempotent, or the operation delivery layer (the communication component of the application) must be able to detect duplicates and remove them, essentially ensuring idempotence as well.
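For contrast, an op-based counter can be as small as the following sketch; the explicit operation-id set is one (naive) way to get the idempotence that the delivery layer would otherwise have to provide. Again, this is an illustrative sketch of mine, not code from the paper.

```python
# Sketch of an op-based counter CRDT. Additions commute, so delivery
# order does not matter, but duplicates do: here each operation carries
# a unique id so that redelivered operations can be ignored.
import uuid

class OpCounter:
    def __init__(self):
        self.value = 0
        self.applied = set()    # ids of operations already applied

    def local_add(self, amount: int):
        op = (str(uuid.uuid4()), amount)
        self.apply(op)
        return op               # to be broadcast to the other replicas

    def apply(self, op):
        op_id, amount = op
        if op_id in self.applied:
            return              # duplicate delivery: ignore
        self.applied.add(op_id)
        self.value += amount

a, b = OpCounter(), OpCounter()
op = a.local_add(3)
b.apply(op)
b.apply(op)                     # a resent message has no effect
assert a.value == b.value == 3
```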

The paper goes into more details and more examples of each type of CRDT, as well as explaining how the two types are roughly equivalent in terms of their expressivity. Intuitively, one can think of the merge function as calculating a diff between two CRDT versions and applying it to one of the versions. Operations are like these diffs to start with, so it makes sense how the two types can be brought together. We have our presentation of the paper available here:

Discussion

1) Challenges designing CRDTs. As mentioned in the summary, CRDTs are special-purpose data structures, so designing them to fit a use case takes some time. I spent some time a few years ago working on CRDTs at Cosmos DB, and it was a very fun thing to do, but also a bit challenging. A good example of the problem is a set CRDT. It is easy to make an add-only set, where items can be added but not removed. All set additions commute, so the problem is trivial. But to make sets more practical, we want to remove items too. A simple solution is to internally keep a removed set, so the CRDT tracks all items added and removed separately, as in the sketch below. This way, we can hardcode the precedence of adds and removes and say that removes always come after adds for an item. But this works only as long as we never need to re-add items back into the set…
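Here is what that two-set approach (often called a two-phase set) might look like; the last assert demonstrates exactly the re-add limitation mentioned above. This is a simplified sketch of mine, not production code.

```python
# Sketch of a two-phase set: adds and removes are tracked in separate
# grow-only sets, and a remove permanently wins over a later re-add.

class TwoPhaseSet:
    def __init__(self):
        self.added = set()
        self.removed = set()    # "tombstones"

    def add(self, item):
        self.added.add(item)

    def remove(self, item):
        self.removed.add(item)

    def contains(self, item) -> bool:
        return item in self.added and item not in self.removed

    def merge(self, other: "TwoPhaseSet"):
        self.added |= other.added
        self.removed |= other.removed

s = TwoPhaseSet()
s.add("x"); s.remove("x")
s.add("x")                       # re-adding does not help
assert not s.contains("x")       # the tombstone wins forever
```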

2) Modeling. Due to their concurrent nature, it is a good idea to model and model-check CRDTs. I used TLA+ for this purpose. During the discussion, a question was raised about the best tools for CRDT model-checking, but unfortunately, nobody knew anything better than TLA+/TLC. I'd suspect that other tools used for verifying distributed systems, such as Alloy, could work as well.

3) Applications. Quite a bit of discussion was focused on applications that use CRDTs. We talked about near-real-time collaborative tools, such as collaborative document editing. I mentioned the Google Docs style of application quite a few times, but it was brought up that Google actually uses Operational Transformation (OT) instead of CRDTs. In particular, server-based OT, which requires a server to sync each client against. Regardless, collaborative tools seem to be the prime field for CRDTs. For instance, the Automerge library provides a good starting point for a JSON-like CRDT to serve as the basis for these types of applications.


Reading Group. New Directions in Cloud Programming

Recently we discussed a CIDR'21 paper: "New Directions in Cloud Programming." Murat Demirbas did the presentation:

Quite honestly, I don’t like to write summaries for this kind of paper. Here, the authors propose a vision for the future of cloud applications, and I feel that summarizing a vision often results in the misinterpretation of that vision. So I recommend reading the paper to draw your own unbiased conclusions. 

That being said, here is my extremely high-level take on the paper in a few points:

  • Application developers should focus on application logic, and not worry about implementation aspects of consistency, durability, availability, etc. 
  • This does not mean that developers do not care about consistency, availability, or redundancy at all. Instead, they should simply know what they need and let the cloud provide it. As such, developers should declare their consistency, availability, budgetary needs, etc., and have the cloud runtime enforce such declarations. This will free up the programmers and let them focus on the application logic instead, making this logic "unsoiled" by the other aspects of the distributed app.
  • To help developers focus on their applications/tasks, we need domain-specific languages (DSLs). DSLs can hide a lot of “mechanical” work from the programmers and delegate it to the cloud runtime. A good example of a popular DSL we have been using for a very long time is SQL. It is declarative — programmers retrieve and update the data without worrying about how it is done under the hood. 
  • Despite potentially having many DSLs, we still want one comprehensive framework to run them on, so the visionary system here can compile the DSLs to some common Intermediate Representation (IR). The authors want the IR to be human-readable and optimizable, although I feel like this requirement is part of the "evolutionary" theme in the paper, and eventually, the importance of human optimizations may diminish.
  • Achieving this highly declarative vision is hard, but the paper lists several developing and emerging techniques and research directions that may help evolve the cloud. 

Discussion. 

1) DSL. We have spent quite some time discussing DSLs and what it means to have many of them. For one, we already have a few successful ones, like SQL. Arguably, ML/AI and data processing systems also often have some form of DSL. TensorFlow was brought up as an example. One minor concern expressed in the group is that having many of these DSLs requires someone to make and maintain them. A more interesting DSL question is how specialized they should become. To bring up the SQL example again, while it is great for what it does, it is rarely used all by itself. So there will be a clear need to allow mixing and matching these highly specialized DSLs, potentially making the problem of translating them to the IR more difficult.

2) IR. A big part of the Hydro system vision is the IR language. The language itself may get rather complicated when it needs to support many DSLs. A bigger challenge may be having to translate DSL logic into human-readable IR code. The translations must make sense to engineers; the logic should be clear and not obscured, so that people can make sense of it. This pursuit of human readability may result in less performance-efficient IR. The readability may also depend on the individual DSLs.

Another point we discussed is whether programmers will just start writing code directly in IR if it is a good, readable, feature-rich language. Maybe this is exactly what the programmers need after all? A language made specifically for distributed applications.

3) How much of this is already in the cloud? DSLs exist, and the serverless cloud is developing too, providing more consistency and durability than before. For example, Azure Durable Functions save their intermediate state and can be resumed in the face of failures. And surprisingly, many of these cloud offerings, like serverless functions, durable functions, and serverless storage, are easy to use. Last semester I gave a project in my Cloud Computing Systems course that used blob storage, serverless functions, and durable functions. To my surprise, the students loved the project and were able to figure out all of this tech (which they had to do on their own, since the tech aspect was not really part of the problem they were solving) in just a few days. So as it stands right now, the cloud is evolving quickly, with serverless computing and storage becoming more and more capable. It is not a coherent single cloud runtime just yet, and probably won't be there any time soon, but some aspects of the vision are there. Users can scale serverless compute, not worry about its availability, opt into more durable options when needed, use cloud-native storage with configured/declared consistency, and take advantage of DSLs for many tasks in the cloud, like data management, ML/AI systems, etc…

4) Drivers of innovation? An interesting discussion happened at the end of our meeting. Some expressed the opinion that cloud vendors should know best in what direction to develop the cloud, since they are in constant interaction with their clients and must adjust to what clients are asking for. I personally disagreed with this opinion — cloud clients are not thinking about long-term visions like the one this paper describes. Instead, they may have more immediate concerns that must be dealt with given all the technology they already use in the cloud. An example I used is the invention of the GUI at Xerox PARC. The vision was out there, but nobody was really asking for it back then; even Xerox did not really know what to do with it and willingly let others copy the ideas. Yet, this innovation made modern consumer electronics/computing what it is today. I suspect that if Xerox had been asking clients what to improve, they might have worked on something as boring as a console with 120-character lines instead of 80-character ones to make existing systems more "user friendly."


Reading Group. FlightTracker: Consistency across Read-Optimized Online Stores at Facebook

In the last DistSys reading group, we discussed "FlightTracker: Consistency across Read-Optimized Online Stores at Facebook." This paper is about consistency in Facebook's TAO caching stack. TAO is a large social graph storage system composed of many caches, indexes, and persistent storage backends. The sheer size of Facebook and TAO makes it difficult to enforce meaningful consistency guarantees, and TAO essentially operates as an eventual system, despite having some stronger-consistency components in it. However, a purely eventual system is very unpredictable and hard to program for, so TAO initially settled on providing a read-your-write (RYW) consistency property. The current way of enforcing RYW is FlightTracker. FlightTracker is a recency token (Facebook calls these tokens Tickets) system running in every Facebook datacenter. Tickets keep track of recent writes performed by a datacenter-sticky user session. In a sense, a ticket is a set of tuples <Key, Key-Progress>, where the Key-Progress is some value designating the recency of the write on the Key, like a version, timestamp, or partition sequence number. Reads then include the ticket and propagate it across the stack to the nodes that serve the requests. With the recency information in the ticket, a server can make a local decision on whether it is sufficiently up-to-date. If the node is stale, it can forward the request to a higher-level cache or the durable store to retrieve the data.

Many other systems use recency tokens, but they usually do not explicitly specify all writes done by the user. For example, a token may be a single number representing the last transaction id seen by the client. This is good for keeping the recency tokens small, but it has a coarser resolution — such a token enforces per-partition recency instead of per-key recency, and causes too many cache misses when per-key RYW guarantees are needed.

However, keeping and transferring an explicit set of all of a client's writes is expensive, so FlightTracker uses a few compaction strategies. For one, it is sufficient to keep track of only the most recent write to each key. Secondly, in workloads with a larger number of writes, FlightTracker may reduce the resolution and stop tracking individual writes, switching, for example, to partition-level tracking by transaction id or sequence number. Finally, the TAO stack enforces bounded staleness of about 60 seconds, so writes older than 60 seconds can be purged from the ticket.
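
A rough sketch of the per-key and time-based compaction ideas (hypothetical names again; the partition-level downgrade, where many per-key entries collapse into a single <partition, sequence number> entry, is omitted):

```python
import time

STALENESS_BOUND = 60  # seconds; the stack's bounded staleness window

def compact(entries, now=None):
    """entries: list of (key, progress, write_time) tuples recorded for a session.
    Keeps only the latest write per key and drops writes older than the
    staleness bound, since the stack converges within that window anyway."""
    now = now if now is not None else time.time()
    compacted = {}
    for key, progress, write_time in entries:
        if now - write_time > STALENESS_BOUND:
            continue  # old enough to be covered by the bounded-staleness guarantee
        if progress > compacted.get(key, (-1, None))[0]:
            compacted[key] = (progress, write_time)
    return compacted
```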

FlightTracker stores the tickets in a simple replicated system based on distributed hashing. Upon a read, the ticket is first fetched from FlightTracker and then included with all the read operations. Since one request typically makes many reads, the cost of fetching the ticket is amortized over many read operations. FlightTracker is also fast at serving tickets — fetching one takes just ~0.3 ms. Whenever writes happen, the ticket for a particular user session is updated to include the new writes and exclude the compacted ones.

The paper has many details that I have left out of this summary and the presentation.

Discussion

1) What can go wrong if RYW is broken? The paper discusses the RYW topic quite substantially. One important point here is that RYW enforcement is "relatively" cheap — it provides a useful consistency guarantee without making cache misses due to consistency (i.e., consistency misses) too frequent. So it appears to be a balance between the usefulness and the cost of consistency properties at Facebook's scale. However, the paper does not talk much about what can go wrong in Facebook (or, more specifically, in applications that rely on the social graph) if RYW does not hold. The paper mentions that it is a reasonable default consistency for developers and users. In our discussion, we thought it is more useful for end-users. If a person posted something on the site, then refreshed the page and the post did not appear because of an RYW violation, the user may get confused about whether the site is broken or whether they pressed the right button. We do not think that in most cases there will be serious consequences, but since Facebook is a user-centric application, providing intuitive behavior is very important. Actually, the paper suggests that RYW violations may still happen, by saying that the vast majority of servers (>99.99%) are within the 60-second staleness window. This means that in some rare cases it is possible to have a "clean" ticket after compaction, hit one of the <0.01% of stale servers, and get stale data.

2) Architectural style. So… this is a Facebook paper, and you can feel it. Just like many other Facebook papers, it describes a system that appears very ad-hoc with respect to the rest of the stack. The original TAO is also a combination of ad-hoc components bolted together (does it still use MySQL in 2021?). FlightTracker was added to TAO later as an afterthought and improvement. Not a bad improvement by any means. And having all the components appear separate and independent serves its purpose – Facebook can build a very modular software stack. So anyway, this appears to be a very "engineering" solution, bolted onto another set of bolt-on components. And it serves the purpose. Adding 0.3 ms of latency to retrieve a ticket and provide something useful to developers and users is not bad at all.

Another interesting point from this discussion is that Facebook is actually very conservative in its systems. Still using PHP/Hack? MySQL? They create systems to last and then bolt on more and more components to improve them. I guess at some critical mass all the bolted-on parts may start to fall off, but that only means it is time to rethink the system with something groundbreaking and new. So what about "Move fast and break things?" Does it contradict some of that conservatism? Or does it augment it? I think the latter — if something needs to change/improve, then just add another part somewhere to make it happen.

3) Stronger consistency than RYW? The paper says that FlightTracker can be used to improve consistency beyond RYW. The authors provide a few examples of systems manipulating the tokens to get more benefits — indexes and pub-sub systems. With indexes, they actually bolt on yet another component to make them work.

4) Cost of tickets. Since each ticket represents a set of recent writes, its cost is not static and depends on the number of writes done by the user session in the past 60 seconds. The main reason the cost of storing and transferring tickets does not explode is the 60-second global compaction, which keeps the average ticket size at about 250 bytes. The median ticket size is 0 bytes, meaning that a lot of requests happen more than 60 seconds after the user's last write. However, we do not think that a system like this will scale to a more write-heavy workload. TAO's workload (at least in 2013 when the original paper came out) is 99.8% reads, so writes are rare. With more writes, a constant-size ticket may start to make more sense, and we have a feeling that the cross-scope compaction, where individual writes on a ticket are replaced with a more comprehensive/encompassing progress marker, would kick in much more often.

5) Cross-datacenter issues. One of the reasons for implementing FlightTracker was fault tolerance, as the prior RYW approach that relied on write-through caches could not handle some failures that require changing a write's route through the caches to the storage. With FlightTracker, any TAO datacenter/cluster can serve reads while enforcing RYW. This even enables, in some rare cases, reads from another datacenter if the local cluster is not available. However, it appears that users are still sticky to a datacenter, as the FlightTracker service lives independently in each datacenter. This means that a user's request must first come to its datacenter to retrieve the ticket, and only then can it cross datacenter boundaries if there is a big outage locally. If the outage is so severe that the user cannot even reach their datacenter, then their request won't get the ticket and may actually experience an RYW violation. Another nice Facebook paper talks in more detail about what happens to user requests before they reach datacenters.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions, and most importantly manage Zoom invites to paper discussions. Please join the slack group to get involved!

Reading Group. Pegasus: Tolerating Skewed Workloads in Distributed Storage with In-Network Coherence Directories

Hard to imagine, but the reading group just completed its 45th session. We discussed "Pegasus: Tolerating Skewed Workloads in Distributed Storage with In-Network Coherence Directories," again from OSDI'20. Pegasus is one of those systems that seem very obvious in hindsight. However, this "obviousness" is deceptive — Dan Ports, one of the authors behind the paper who joined the discussion, mentioned that the project started in 2017, so quite a bit of time passed from start to publication, with a lot of things considered and tried before zeroing in on what is in the paper.

Pegasus is a replication system for load balancing and throughput scalability in heavily skewed workloads. Consider a workload with a handful of "hot" objects. These hot objects may get so much usage that they overwhelm their respective servers. Naturally, this limits the overall throughput because the system is now capped by servers running at their maximum capacity/utilization. The solution is to replicate these hot objects to many servers and allow clients to access them from multiple replicas. However, as soon as we have a replicated scenario, we start running into consistency issues. Strongly consistent systems often degrade in performance with the addition of more replicas due to the synchronization overheads. This is what makes Pegasus rather unique — it scales for load balancing through replication while remaining strongly consistent. The key enabler is a smart Top of Rack (ToR) switch that handles all the traffic in the server rack. This switch acts as the "source of synchrony" in the rack, and it does so at line speed.

In Pegasus, the data is assigned to servers in the rack using a consistent hashing mechanism, allowing clients to send requests directly to the servers that own the data. However, all these requests go through the ToR switch, which can inspect the packets and make additional routing decisions for "hot" objects. Let's consider a write request for such a high-demand object. The ToR switch inspects the packet, and if it is for a "hot" key, it sends the write message to some larger and potentially different set of servers in the rack, essentially increasing the replication factor and rotating the responsible servers. Once the servers ack the write completion, the ToR switch sees the acks and records these servers in its coherency directory as the servers with the latest copy of the data. Read requests have a similar rerouting fate — if a read is for a hot object, instead of going to the default server, the ToR switch sends it to one of the replicas from its coherency directory. The paper has more details on implementing this coherency directory and keeping track of the recent progress using a simple versioning mechanism.
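
For intuition, here is a toy model in Python of the switch-side logic: hot-key writes fan out to a rotating set of servers, the acks populate a coherency directory, and hot-key reads get steered to one of the directory entries. The real Pegasus logic runs in a programmable switch at line speed and also tracks versions, which this sketch leaves out; all names are mine.

```python
import itertools
import random

class ToRSwitch:
    """Toy model of the in-switch routing for hot keys (not the real data plane)."""

    def __init__(self, servers, hot_keys):
        self.servers = servers
        self.hot_keys = set(hot_keys)
        self.directory = {}          # hot key -> servers holding the latest copy
        self.rotation = itertools.cycle(range(len(servers)))

    def home_server(self, key):
        # Stand-in for the consistent-hashing placement.
        return self.servers[hash(key) % len(self.servers)]

    def route_write(self, key):
        if key not in self.hot_keys:
            return [self.home_server(key)]
        # Spread hot-key writes over a rotating subset of servers.
        start = next(self.rotation)
        return [self.servers[(start + i) % len(self.servers)] for i in range(2)]

    def on_write_acked(self, key, servers):
        if key in self.hot_keys:
            self.directory[key] = list(servers)  # these now hold the latest copy

    def route_read(self, key):
        if key in self.hot_keys and key in self.directory:
            return random.choice(self.directory[key])
        return self.home_server(key)
```

In this model, a client's write to a hot key would be fanned out to `route_write(...)`, and once the acks flow back through the switch, `on_write_acked(...)` updates the directory so subsequent reads hit an up-to-date replica.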

The end result is awesome! Just by replicating a handful of objects in skewed workloads (~16 objects out of a million in the paper), Pegasus achieves load balancing and high throughput, beating in-network caching in almost all scenarios. There are a few other advantages of Pegasus that are missing in other SOTA solutions: the ability to store larger objects (not evaluated) and tolerance of workloads with different read-write ratios (evaluated extensively).

Finally, I have not touched on a few other important pieces of the system: figuring out which keys are hot, and fault tolerance. For measuring key temperature, the Pegasus statistics engine samples some packets and determines the frequency of keys in the samples to gauge how hot each key is. For fault tolerance, the system uses chain replication across racks for durability.
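
The statistics side can be approximated by something as simple as counting keys in a sampled fraction of packets (the real engine is more careful about windows and thresholds; this is just a sketch, and the function name is mine):

```python
from collections import Counter
import random

def estimate_hot_keys(packets, sample_rate=0.01, top_k=16):
    """packets: iterable of dicts with a 'key' field.
    Sample a fraction of packets and return the most frequent keys."""
    sampled = [p for p in packets if random.random() < sample_rate]
    counts = Counter(p["key"] for p in sampled)
    return [key for key, _ in counts.most_common(top_k)]
```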

As always, we have our presentation of the paper by A. Jesse Jiryu Davis.

Discussion

This time around we had Dan Ports join us to answer the questions about the paper, so this turned out to be a nice discussion despite a slightly lower than expected attendance. 

1) Simple API. Currently, Pegasus supports a simple API with reads and simple destructive writes (i.e., each write is an unconditional overwrite of the previous copy of the data). The main reason for this is how the writes are structured in Pegasus. The system is very nimble and quickly adjustable; it picks write servers on the fly as the write request "goes through" the switch. This means that a write should be able to just complete on the new server. However, if the write operation is, for example, a conditional write or an update, then such an update also needs the previous version of the object, which may be missing on the new server. We spent some time discussing workarounds for this, and they surely seem possible. But the solutions also introduce additional communication, which adds both more load to the servers and more latency to operations. And since we are dealing with a subset of objects that already generate the most load in the system, adding anything more to them must be avoided as much as possible. The cost of supporting this more complex API will also differ for various read-write ratios.

2) Comparison with caching. Another big discussion was around using caching to load balance the system. As Dan pointed out, caches are good when they are faster than the storage behind them, but for super-fast in-memory storage, it is hard to make a cache faster. NetCache (one of the systems used for comparison in the paper) does provide a faster cache by placing it in the network switch. It has several downsides: it handles only small objects, consumes significant switch resources, and does not work well for write workloads (it is a read-through cache, I think). Of course, it is possible to make it a write-through cache as well to optimize for write workloads, but that still does not solve the other deficiencies and adds even more complexity to the system. We also touched on the more complicated fault tolerance of cached systems. The disparity between the load the cache and the underlying system can take can create situations where the underlying system gets overrun upon cache failure or excessive cache misses.

3) Chain replication. Since Pegasus replicates for scalability, it needs a separate mechanism for fault tolerance. The paper suggests a chain replication approach, where racks are the chain nodes. Only the tail rack serves reads; the writes, however, must be applied in all racks as the write operation propagates through the chain. One question we had is why not use something like CRAQ to allow other racks to serve reads, but the reality is that this is simply not needed. The chances that an object becomes so skewed and hot that it needs more than a rack's worth of servers are very slim, so there is no need to complicate the protocol. Another question I have now but forgot to ask during the discussion is what happens to hot writes as they go through the chain. If non-tail racks only use the default server for writes on "hot" keys (instead of, say, a round-robin or random node), then this server may get overwhelmed. But I think it is trivial to pick a random server for the hot object on each write at the non-tail racks.

4) Zipfian distribution and workload skewness. Pegasus needs to load-balance fewer keys for a less skewed Zipfian distribution. At first glance this was an interesting and a bit counter-intuitive observation, since one can intuitively expect a more skewed distribution to require more load-balancing. However, a higher-alpha Zipf distribution has more heavily skewed objects, but not necessarily a larger number of skewed objects than a lower-alpha distribution. Fewer highly skewed objects mean less load-balancing work.
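
One way to poke at this intuition numerically is to count, for a given Zipf parameter, how many keys individually exceed an equal per-server share of the load. This is a toy calculation of mine, not something from the paper, and the numbers depend heavily on the chosen parameters:

```python
def keys_above_fair_share(alpha, num_keys=1_000_000, num_servers=16):
    """Count keys whose Zipf(alpha) popularity exceeds an equal 1/num_servers
    share of the total load -- a crude proxy for 'keys that need replication'."""
    weights = [1.0 / (k ** alpha) for k in range(1, num_keys + 1)]
    fair_share = sum(weights) / num_servers
    return sum(1 for w in weights if w > fair_share)

for alpha in (0.9, 1.0, 1.2):
    print(alpha, keys_above_fair_share(alpha))
```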

5) Virtualization of top-of-rack switches. One important question about the technology that enables Pegasus is the virtualization of these smart ToR switches. Currently, these switches are not virtualized, so one needs bare-metal access to a switch to deploy the code. Virtualization of such a switch may make the technology available to cloud users. I think this would be a huge boost to the overall state of distributed computing at the datacenter level. I would be speculating here, but I think a lot depends on the willingness of manufacturers to provide such support, especially given the relatively limited capabilities of the hardware right now. Of course, the virtualization should not add a significant latency penalty to the users, and most importantly should not add any penalty to non-users (applications/systems that reside in the same rack but do not use the extended capabilities of the switches). Couple all of this with the risks of running users' code on the hardware that handles all the traffic in the rack, and we also need to worry about user isolation/security more than ever. However, as wishful as it is, it is quite probable that these smart switches will not make their way to the public cloud any time soon. This gives large cloud vendors an edge, since they can benefit from the technology in their internal systems. Smaller service providers that rely on the cloud, however, will have to find a way to compete without access to this state-of-the-art technology. Aside from my extremely high-level speculations, some smart people actually go deeper into the topic.

6) There were a few other minor topics discussed, and jokes were thrown in here and there. For example, Dan explains Pegasus with cat pictures.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions, and most importantly manage Zoom invites to paper discussions. Please join the slack group to get involved!

Reading Group. Cobra: Making Transactional Key-Value Stores Verifiably Serializable.

This Wednesday, we were talking about serializability checking of production databases. In particular, we looked at the recent OSDI'20 paper: "Cobra: Making Transactional Key-Value Stores Verifiably Serializable." The paper explores the problem of verifying serializability of a black-box production system from the client's point of view. This makes sense, as serializability is an operational, client-observable property. The tool, called Cobra, collects the history via a middle layer sitting between the client and the database and then uses the history to construct a polygraph representing all possible execution orders. The problem then becomes finding whether a serial execution order exists in the polygraph, i.e., whether there is some compatible graph with no dependency/ordering cycles. For instance, this would be a cycle: A depends on B, B depends on C, and C depends on A. Quite a few tools take a similar approach for checking sequential equivalence properties. What makes Cobra different is that it aims to check serializability at scale, but the problem is NP-complete, so adding more events to the graph makes checking exponentially slower. To mitigate the issue, Cobra uses a few tricks. It uses domain-specific heuristics to reduce the size of the polygraph. It also takes advantage of parallel hardware (GPUs) to speed up (by an order of magnitude!) some highly parallel polygraph-pruning tasks. Finally, Cobra uses an SMT solver to perform the final satisfiability search on the pruned polygraph. I will leave the details of all these methods to the paper.
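
For intuition, here is a much-simplified Python version of the core idea: build a dependency graph over transactions and check it for cycles. Cobra's actual polygraph additionally encodes unknown orderings as constraints and hands the pruned problem to an SMT solver, which this sketch does not attempt.

```python
def has_cycle(graph):
    """graph: dict mapping a transaction id to the set of transactions it depends on.
    Returns True if there is a dependency cycle, i.e., no serial order exists."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {node: WHITE for node in graph}

    def visit(node):
        color[node] = GRAY
        for dep in graph.get(node, ()):
            if color.get(dep, WHITE) == GRAY:
                return True                  # back edge: cycle found
            if color.get(dep, WHITE) == WHITE and visit(dep):
                return True
        color[node] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in graph)

# Example: T1 -> T2 -> T3 -> T1 cannot be serialized.
print(has_cycle({"T1": {"T2"}, "T2": {"T3"}, "T3": {"T1"}}))  # True
```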

Our video presentation by Akash Mishra is, as always, on YouTube.

Discussion

This is a very nice and interesting paper that sparked some lively discussion. Here I list a few of the key discussion points.

Cobra Performance

1) Performance improvement. Cobra is significantly faster than the baselines in the paper. One concern during the discussion was about the optimization of the baselines. For example, one baseline is Cobra's approach minus all the optimizations. This is great for showing how much improvement the core contribution of the paper brings, but not so great for comparing against other state-of-the-art solutions. In the paper's defense, they do provide other baselines as well, all of which perform worse than the "Cobra minus the optimizations" baseline. So maybe there are no other domain-specific solutions like Cobra to compare against just yet.

2) Performance improvement, part 2. Another performance discussion was around the use of GPUs. While the authors mention an order-of-magnitude improvement when using a GPU for graph pruning, it is not entirely clear how much of the overall improvement is due to the GPUs. Interestingly enough, the overall gains compared to the baseline are ten-fold. The paper provides a figure that breaks down the time spent in each phase/optimization of Cobra. In read-heavy workloads, polygraph pruning, which is GPU-optimized, dominates the entire computation, suggesting that a lot of the gains may come from the use of specialized hardware.

3) Better parallel hardware? Are GPUs the best hardware to accelerate pruning? Maybe some better alternative exists? FPGAs?

4) Is it fast enough? While Cobra is significantly faster than other approaches, it may still not be fast enough for some production workloads. While it can check 10k transactions in ~15 seconds, real production workloads can produce more transactions than that in under one second. The paper claims that Cobra can sustain an average load of 2k requests per second, which is enough for many large services, but 2k transactions per second is the scale of systems from about 35 years ago.

5) Do we need this in production? We spent quite a bit of time discussing this. Aside from the concerns in point (4) above, there may be less utility in checking a production system. Achieving serializability in the happy case is not that difficult, and there are plenty of databases out there that do just that. Testing a system in production is like testing a happy-case execution most of the time, so there may be little incentive to do so, especially given the cost of running Cobra.

Keeping the same guarantees under failures is more difficult; this is why tools like Jepsen stress-test systems by introducing faults. Our thinking was that Cobra, when combined with fault injection, can be a powerful stress-testing tool. And with the capacity to check larger histories, it may be useful for checking more involved scenarios and doing routine fuzz testing (maybe even check-in testing!) to try to prevent engineers from introducing bugs.

As far as continuous production use goes, we did not have many scenarios in mind, aside from a service provider checking its own compliance with some consistency SLAs, or a user trying to catch a service provider violating them to get a discount (but at what cost?).

6) Limitations. Cobra works only on key-value stores and only with a subset of common operations. For instance, it does not support range operations. Checking the serializability of range commands requires knowing not only what keys exist but also what keys do not exist in the system. This is hard in a black-box production system. However, our thought was that if this is not a production system, and you start with a blank state for testing purposes, the knowledge of what keys do not exist is there — initially, none of the keys exist. Maybe in such a scenario Cobra could add support for range operations, especially since we think it will be more useful as a very powerful testing tool (point 5 above) rather than a production monitoring tool. There may be other reasons for the lack of range operation support that we do not know or understand.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions, and most importantly manage Zoom invites to paper discussions. Please join the slack group to get involved!

One Page Summary. Gryff: Unifying Consensus and Shared Registers

This paper by Matthew Burke, Audrey Cheng, and Wyatt Lloyd appeared in NSDI 2020 and explores an interesting idea of a hybrid replication protocol. The premise is very simple – we can take one protocol that solves a part of the problem well, and marry it with another protocol that excels at the second half of the problem. This paper tackles replication in geo-distributed strongly consistent storage systems. The authors argue that consensus, when used in storage systems with predominantly read and write operations, is inefficient and causes high tail latency.

A system presented in the paper, called Gryff, takes advantage of predominantly read/write workloads in storage systems and exposes the read and write APIs via a multi-writer atomic storage protocol, ABD. ABD operates in two phases for both reads and writes. On a write, ABD's coordinator retrieves the latest version of the register from a quorum of nodes and writes the new value back with a version higher than any it has seen. On a read, ABD's coordinator again retrieves the register from a quorum, writes the highest version back to the cluster to ensure future reads do not observe any previous versions, and only then returns to the client. The write-back stage, however, can be skipped if a quorum of nodes agrees on the same version/value of the register, allowing for single-RTT reads in the happy case.
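
Here is a bare-bones, single-threaded sketch of the ABD logic in Python. A real deployment sends these messages over the network and waits only for a quorum of replies, and Gryff additionally tags values with carstamps; none of that is modeled here, and all names are mine.

```python
class ABDNode:
    """A single replica storing one register; the version is a (timestamp, writer_id) pair."""
    def __init__(self):
        self.version, self.value = (0, 0), None

    def get(self):
        return self.version, self.value

    def put(self, version, value):
        # Keep only the newest version seen so far.
        if version > self.version:
            self.version, self.value = version, value


class ABDCoordinator:
    def __init__(self, nodes, my_id):
        self.nodes, self.my_id = nodes, my_id
        self.quorum = len(nodes) // 2 + 1

    def write(self, value):
        # Phase 1: learn the highest version currently stored.
        high_ts = max(n.get()[0] for n in self.nodes)
        # Phase 2: write back with a strictly higher version.
        new_version = (high_ts[0] + 1, self.my_id)
        for n in self.nodes:
            n.put(new_version, value)

    def read(self):
        replies = [n.get() for n in self.nodes]
        version, value = max(replies, key=lambda r: r[0])
        # Skip the write-back if a quorum already agrees on this version.
        if sum(1 for v, _ in replies if v == version) < self.quorum:
            for n in self.nodes:
                n.put(version, value)
        return value


nodes = [ABDNode() for _ in range(3)]
ABDCoordinator(nodes, my_id=1).write("hello")
print(ABDCoordinator(nodes, my_id=2).read())  # "hello"
```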

Unfortunately, ABD, while providing linearizability, is not capable of supporting more sophisticated APIs. Read-modify-write (RMW) is a common pattern in many storage systems for implementing transaction-like conditional updates of data. To support RMW, Gryff resorts back to consensus, and in particular to the Egalitarian Paxos (EPaxos) protocol. The choice of EPaxos allows any node in the cluster to act as the coordinator, so it does not restrict writes to a single node like many other protocols do. The problem with this hybrid approach is then the ordering of operations completed with the ABD protocol and RMW operations running under EPaxos. Since the EPaxos side of Gryff works only with RMWs, it can only order these operations with respect to other RMW operations, but what we need is a linearizable ordering of RMWs with normal writes and/or reads. To keep the ordering, Gryff uses tuples of ABD's logical timestamp, the process ID, and an RMW logical counter, called carstamps. Carstamps connect the ABD part of the system with EPaxos – only ABD can update ABD's logical clock, and only EPaxos updates the RMW counter.

When we consider the interleaving of writes and RMWs, a write with a higher ABD logical time supersedes any other write or RMW. This means that we actually do not need to order all RMWs with respect to each other, but only the RMWs that share the same base, or ABD logical time. EPaxos was modified to allow such partial ordering of commands belonging to different bases, essentially making the protocol maintain separate dependency graphs for RMWs applied to different ABD states. Another change to EPaxos is the cluster-execute requirement: a quorum of nodes needs to apply the change before it is returned to the client, to make the change visible to subsequent ABD read operations.
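
Since a carstamp is just a tuple, the ordering is easy to picture. The comparison order below is my reading of the description (the ABD timestamp dominates, the RMW counter orders RMWs on the same base, and the process ID breaks remaining ties); the paper's exact definition may differ.

```python
from collections import namedtuple

# A carstamp as described: ABD logical timestamp, the RMW counter, and the process id.
Carstamp = namedtuple("Carstamp", ["abd_time", "rmw_counter", "process_id"])

def newer(a, b):
    # Lexicographic comparison: higher ABD time wins outright; RMWs on the same
    # ABD base are ordered by the RMW counter; the process id breaks ties.
    return (a.abd_time, a.rmw_counter, a.process_id) > (b.abd_time, b.rmw_counter, b.process_id)

write_cs = Carstamp(abd_time=5, rmw_counter=0, process_id=1)  # a plain ABD write
rmw_cs = Carstamp(abd_time=4, rmw_counter=3, process_id=2)    # an RMW on an older base
print(newer(write_cs, rmw_cs))  # True: the higher ABD time supersedes the RMW
```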

So, how does Gryff do with regard to performance? Based on the authors' evaluation, it is doing very well at reducing the (tail) latency of reads. However, I have to point out that the comparison with Multi-Paxos is flawed, at least to some extent. The authors always consider running a full Paxos phase for reads and do not consider the possibility of reading from a lease-protected leader, which eliminates 1 RTT from the Paxos read. This would make Paxos's minimum latency smaller than Gryff's, while also dramatically reducing the tail latency. Gryff also struggles with write performance, because writes always take 2 RTTs in the ABD algorithm. As far as scalability goes, the authors admit that Gryff cannot push as many requests per second as EPaxos, even in its most favorable configuration with just 3 nodes.

Can Paxos do better? We believe that our PQR optimization, when applied in a WAN, will cut most reads down to 1 quorum RTT, similar to Gryff. PQR may still occasionally retry reads if the keyspace is small, but this problem also applies to Gryff when the cluster is larger than 3 nodes.

What about Cassandra? Cassandra uses a protocol similar to ABD for its replication, and it also incorporates Paxos to perform compare-and-set transactions, which are one case of an RMW operation. So, in a sense, Gryff appears to be very similar to what Cassandra has been doing for years.

 

Paper Summary: Bolt-On Global Consistency for the Cloud

Figure 1. Latency improvement from going multi-cloud. Minimum read latency SLO feasible when using a single provider for data storage versus when using multiple providers.

This paper appeared in SOCC 2018, but caught my Paxos attention only recently. The premise of the paper is to provide strong consistency in a heterogeneous storage system spanning multiple cloud providers and storage platforms. Going across cloud providers is challenging, since storage services at different clouds cannot directly talk to each other and replicate the data with strong consistency. The benefits of spanning multiple clouds, however, may be worth the hassle, since a heterogeneous system will be both better protected from cloud provider outages and able to provide better performance by placing the data closer to the users. The latter aspect is emphasized in the paper, and as seen in Figure 1, going multi-cloud can reduce latency by up to ~25%.

Figure 2. Comparison of (a) classic Paxos and (b) CPaxos.

To solve the issue of consistent cross-cloud replication, the authors propose Cloud Paxos (CPaxos), a Paxos variant designed to work with followers supporting only a very minimal and common set of operations: get and conditional put. In CPaxos, clients act as proposers, and the storage systems serve the role of followers. The followers are not really "smart" in this protocol, and most of the Paxos logic shifts to the client-proposers (Figure 2).

The prepare phase in CPaxos simply gathers the state from the followers, letting the proposer decide for itself whether the followers would have accepted it with the current ballot. If the proposer thinks it would have been accepted, it tries updating the followers' state. Doing so, however, requires some precautions, since the followers' state may have changed after the proposer made the decision to proceed. For that matter, CPaxos uses a conditional put (or compare-and-set) operation, making a follower update its state only if it has not changed since it was read by the proposer. This ensures that at most one proposer can succeed in changing the state of a majority of followers.

Figure 3. Object log. Each slot corresponds to some version, and can be committed at some ballot #.

I visualize this as a log representing the changes in some object's state. A new version of an object corresponds to a new slot in the log, while each slot can be tried with different ballots by different proposers. The put operation succeeds at a follower only if the value at that slot and ballot has not already been written by some other proposer. In case a proposer does not get a majority of successful updates, it needs to start from the beginning: increase its ballot, perform a read, and decide whether to proceed with the state update. Upon reaching a majority of acks on the state update, the proposer sends a message to flip the commit bit to make sure each follower knows the global state of the operation.
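
Putting the pieces together, here is a rough Python sketch of one proposer round against followers that expose only get and conditional put. All names and the exact state layout are mine, and the sketch keeps only the latest slot of the per-object log rather than the full log.

```python
class Follower:
    """A dumb storage service exposing only get and conditional put."""
    def __init__(self):
        self.state = {"version": 0, "ballot": 0, "value": None, "committed": True}

    def get(self):
        return dict(self.state)

    def cond_put(self, expected, new):
        # Compare-and-set: apply only if the state is unchanged since the read.
        if self.state == expected:
            self.state = dict(new)
            return True
        return False


def propose(followers, value, ballot):
    majority = len(followers) // 2 + 1
    # Prepare: read the followers' state and decide locally whether this
    # ballot would have been accepted.
    states = [f.get() for f in followers]
    base = max(states, key=lambda s: (s["version"], s["ballot"]))
    if ballot < base["ballot"]:
        return False  # someone is ahead already; retry with a higher ballot
    new = {"version": base["version"] + 1, "ballot": ballot,
           "value": value, "committed": False}
    # Accept: the conditional put succeeds only where the state is unchanged.
    acked = [f for f, s in zip(followers, states) if f.cond_put(s, new)]
    if len(acked) < majority:
        return False  # lost the race; start over from the prepare phase
    # Commit: flip the commit bit so followers learn the global outcome.
    for f in acked:
        f.cond_put(new, dict(new, committed=True))
    return True


followers = [Follower() for _ in range(3)]
print(propose(followers, value="green", ballot=1))  # True on an uncontended run
```

Starting directly at the accept step with ballot #0 would correspond to the fast path discussed below.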

This basic protocol has quite a few performance problems. Latency is high: at least 2 round-trips are required to reach consensus, since every proposer needs to run 2 phases (plus send a commit message). Additionally, increasing the number of proposers acting on the same objects leads to a growth in conflicts, requiring repeated restarts and further increasing latency. CPaxos mitigates these problems to a degree. For example, it tries to commit values on the fast path by avoiding the prepare phase entirely and starting an accept phase on what it believes will be the next version of an object with ballot #0. If the proposer's knowledge of the object's state (version, ballot) is outdated, the conditional put will fail, and the proposer will try again, this time with the full two phases to learn the correct state first. However, if the proposer is lucky, an update can complete in just one round-trip. This optimization, of course, works only when an object is rarely updated concurrently by multiple proposers; otherwise dueling leaders become a problem not only for progress but for safety as well, since two proposers may write different values for the same version using the same ballot. This creates a bit of a conundrum about when a value becomes safely anchored and won't ever get lost.

Figure 4. Dueling leaders both start opportunistic accept with ballot #0, each writing their own values (green and blue)

Figure 5. Which value to recover in case of the failure? Both green and blue are on the same ballot and have the same number of live nodes

Consider an example in which two proposers write different values, green and blue, to the same version using ballot #0 (Figure 4). One of the proposers is able to write to a majority before it becomes unresponsive. At the same time, one green follower crashes as well, leading to a situation with two followers having the green value and two having the blue value (Figure 5). The remaining proposer has no knowledge of whether the green or the blue value needs to be recovered (remember, they are both on the same ballot in the same slot/version!). To avoid this situation, CPaxos expands the fast-path commit quorum from a majority to a supermajority, namely \(\left \lceil{\frac{3f}{2}}\right \rceil +1\) followers, where \(2f+1\) is the total number of followers and \(f\) is the tolerated number of follower failures, so that an anchored/committed value is present in a majority of any majority of followers. Having this creates an interesting imbalance in fault tolerance: while CPaxos still tolerates \(f\) node failures and can make progress by degrading to the full 2 phases of the protocol, it can lose an uncommitted value, even one accepted by a majority, when up to \(f\) followers fail.
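
As a quick sanity check with small numbers: for \(f = 2\) the system has \(2f+1 = 5\) followers, and the fast-path quorum is \(\left \lceil{\frac{3 \cdot 2}{2}}\right \rceil + 1 = 4\). Any majority of the 5 followers has at least 3 nodes, and at least \(4 + 3 - 5 = 2\) of those hold the committed value, which is a majority of that 3-node majority. This is exactly the property that lets a recovering proposer distinguish an anchored value from one that merely reached a plain majority.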

Figure 6. (a) messages are sent at the same time; delivery time is different increasing the duration of inconsistency/conflicting state. (b) some messages are sent with delay to provide similar arrival time and reduce the duration of inconsistent state

Proposer conflicts are a big problem for CPaxos, so naturally the protocol tries to mitigate them. The approach taken here reduces the window in which conflicts may occur. As CPaxos is deployed over many datacenters, the latencies between datacenters are unlikely to be uniform. This means that a prepare or accept message from some proposer reaches different datacenters at different times, creating an inconsistent state. When two proposers operate concurrently, they are more likely to observe this inconsistency: as both proposers quickly update their neighboring datacenters, they run the risk of not reaching the required supermajority due to the conflicting state (Figure 6(a)) created by messages that are slower to reach the remaining datacenters. To avoid rejecting both proposers, CPaxos schedules message sending in a way that delivers the messages to all datacenters at roughly the same time. This reduces the duration of the inconsistent state, allowing some concurrent operations to be ordered (Figure 6(b)).
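
The scheduling itself boils down to simple arithmetic: hold each message back by the gap between the slowest link and its own link so that everything lands at roughly the same time. A sketch with made-up latencies and names:

```python
def send_delays(latencies_ms):
    """latencies_ms: estimated one-way latency to each datacenter.
    Returns per-datacenter send delays so all messages arrive at about the same time."""
    slowest = max(latencies_ms.values())
    return {dc: slowest - lat for dc, lat in latencies_ms.items()}

print(send_delays({"us-east": 20, "eu-west": 80, "ap-south": 140}))
# {'us-east': 120, 'eu-west': 60, 'ap-south': 0}
```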

Figure 7. Degradation in write throughput under different conflict rates.

Despite the above mitigation strategy, conflicts still affect CPaxos greatly. The authors are rather open about this and show their system, CRIC with CPaxos, degrading more quickly than Paxos and Fast Paxos as the conflict rate increases. However, in the low-conflict scenario, which the authors argue is more likely in real-world applications, CRIC and CPaxos improve on performance compared to Paxos/Fast Paxos, especially for reading the data. This is because reads in CPaxos are carried out in one round-trip time (RTT) by the client-proposer contacting all followers and waiting for at least a majority of them to reply. If the client sees the latest version with the commit flag set in a majority, it can return the data. Otherwise, it will wait to hear from more followers and use their logs to determine the safe value to return. In some rare cases, when the proposer cannot determine the latest safe value, it will perform recovery by running the write path of CPaxos with the value to recover (the highest-ballot value, or the highest-frequency value if more than one value shares the ballot).

Figure 8. Read and write latency.

Some Thoughts

  • The motivation of the paper was to make a strongly consistent system spanning multiple cloud providers and storage systems for the benefit of improved latency by leveraging the locations of these different providers' datacenters. However, the CRIC and CPaxos protocols require a lot of communication, even on the read path. During reads, a client-proposer contacts all CPaxos nodes, located in all datacenters, and in the best case still needs a majority of replies. As such, the latency benefit here comes from trying to get not just one node closer to the client, but a majority of nodes. This may be difficult to achieve in large systems spanning many datacenters. I think sharding the system and placing shards on a subset of nodes based on access locality can help greatly here. For instance, Facebook's Akkio paper claims a significant reduction in traffic and storage by having fewer replicas and making data follow access patterns. In our recent paper, we have also illustrated a few very simple data migration policies and the possible latency improvements from implementing them.
  • One-RTT reads in the "happy path" can be implemented on top of regular MultiPaxos without contacting all nodes in the system. Reading from a majority of followers is good enough most of the time, while in rare circumstances the reader may need to retry the read from any one node. More on this will be in our upcoming HotStorage '19 paper.
  • The optimization to delay message sending in order to deliver messages at roughly the same time to all nodes can help with conflict reduction in other protocols that suffer from this problem. EPaxos comes to mind right away, as it is affected by the "dueling leaders" problem as well. Actually, CPaxos and EPaxos are rather similar. Both assume a low conflict rate to get single-round-trip "happy path" writes and reads. When the assumption breaks and there is a conflict, both switch to two phases. EPaxos is better here in the sense that the first opportunistic phase is not totally wasted and can be used as phase 1 of the two-phase mode, whereas CPaxos has to start all the way from the beginning due to the API limitations on the follower side.