All posts by alekseyc

Reading Group. Prescient Data Partitioning and Migration for Deterministic Database Systems

In the 75th reading group session, we discussed the transaction locality and dynamic data partitioning through the eyes of a recent OSDI’21 paper – “Don’t Look Back, Look into the Future: Prescient Data Partitioning and Migration for Deterministic Database Systems.” 

This interesting paper solves the transaction locality problem in distributed, sharded deterministic databases. Deterministic databases pre-order transactions so that they can execute with less coordination after the pre-ordering step. This does not mean, however, that every transaction can run locally. If a transaction needs to touch data across multiple partitions or shards, then it must reach out across partitions, which introduces delays. A natural way to solve the problem is to migrate the data based on its access patterns to minimize the number of such cross-partition transactions.

Such dynamic data migration is pretty much the solution in Hermes, the system presented in the paper. The thing is that the “group the data together” approach is not new. The authors cite several papers, such as Clay, that do very similar things. The biggest difference between Hermes and other approaches is the decision-making process involved in figuring out how and where the data needs to go. Traditionally, dynamic re-partitioning solutions rely on historical data from the workload. These approaches work great when the workload is reasonably stable. However, workloads that abruptly change their access patterns present a problem — a system based on historical observation of access locality is reactive and needs time to adjust to the new workload. So, naturally, to tolerate rapidly changing workload locality characteristics, we need a proactive system that can predict these locality changes and make the necessary changes ahead of time. It would be nice to have an oracle that can see into the future. Well, Hermes kind of does this. See, the system requires a batched database, and before executing each batch, it can look at the access patterns within the batch and adjust accordingly.

Hermes looks at the patterns of transactions to be executed shortly, figures out how many transactions will require cross-partition coordination, and then makes data movements to minimize that number. For example, if some transaction type accesses two objects frequently together, Hermes will move one of these objects, incurring one cross-partition data transfer. If this happens, the transaction itself becomes partition-local, incurring no cross-partition data transfers.
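To make the idea of batch-driven migration a bit more concrete, here is a minimal Python sketch. It is my own toy illustration, not the actual Hermes algorithm (which also balances load and weighs migration costs): it inspects the read/write sets of the upcoming batch and greedily co-locates the keys of each multi-partition transaction.

```python
from collections import Counter

def cross_partition_count(batch, placement):
    """Number of transactions in the batch that touch more than one partition."""
    return sum(len({placement[k] for k in txn}) > 1 for txn in batch)

def greedy_colocate(batch, placement):
    """Toy heuristic: for every multi-partition transaction in the upcoming batch,
    move all of its keys to the partition that already holds most of them.
    Returns the list of (key, src, dst) migrations performed."""
    moves = []
    for txn in batch:
        parts = Counter(placement[k] for k in txn)
        if len(parts) <= 1:
            continue                      # already partition-local
        dst, _ = parts.most_common(1)[0]  # partition holding most of this txn's keys
        for k in txn:
            if placement[k] != dst:
                moves.append((k, placement[k], dst))
                placement[k] = dst
    return moves

batch = [{"A", "B"}, {"B", "C"}, {"D"}]           # read/write sets of the next batch
placement = {"A": 0, "B": 1, "C": 2, "D": 0}      # current object-to-partition map
print(cross_partition_count(batch, placement))    # 2 cross-partition transactions
moves = greedy_colocate(batch, placement)
print(cross_partition_count(batch, placement))    # 0 after the migrations
```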

In addition to moving and repartitioning the data around, Hermes also moves the transactions. The movement of compute tasks, again, is not new and makes a ton of sense. If a transaction originating in partition P1 needs some objects A & B located in some partition P2, then it makes a lot of sense to move a relatively small transaction to where the data is.

Unfortunately, the combination of dynamic re-partitioning and transaction movements leads to some unintended consequences over the long run. Consider a batch where objects A & B were used frequently together. The system moves them to one partition for speed. In the next batch, we may have objects B & C used together, so C moves to the same partition as B, again for speed. Now we have colocated objects A, B, and C. If this continues for very long, the system will consolidate more and more data in one place. This consolidation is not ideal for load balancing, so Hermes has to account for this and prevent data from gradually drifting closer together. It does so by “de-optimizing” for locality and allowing more distributed transactions in exchange for a more even load in each batch.

As far as performance goes, Hermes delivers when it comes to workloads with frequent changes in locality:

As always, the presentation video from the reading group:

Discussion

1) Wide Area Networks. Hermes takes advantage of locality in a data center setting. It groups objects used together to allow local transactions as opposed to distributed ones. However, it may not work in a geo-distributed environment. The problem is that Hermes only solves one type of locality puzzle. It accounts for the grouping of objects used together in hopes that these objects will be used together again (spatial locality) and used soon enough (temporal locality). In fact, Hermes optimizes based on spatial criteria. This notion of locality works well in a LAN, where a transaction can run in the partition with the data without a performance penalty. However, in a WAN, moving transactions between partitions is costly. If a transaction originated in region R1, but the data is in region R2, then moving the transaction from R1 to R2 may incur almost as much latency as moving the data from R2 to R1. That is a big difference between LAN and WAN — in the WAN setting, transactions incur a significant latency penalty when processed in another region. In a geo-distributed setting, locality means more than just grouping frequently used objects together to reap the benefit shortly after. In addition to grouping data based on spatial principles, we need to preserve geographical affinity and place data close to where it is accessed. In other words, in geo-replication, we do not only care about finding cliques of related data, but also about placing these cliques in the best possible geography for a given workload. And needless to say, grouping objects and finding the best geographical location for the data often conflict with each other, making the problem significantly more nuanced and complicated.

2) Workload. A significant motivation for the paper is the existence of workloads with significant and abrupt access pattern changes. The paper refers to Google workload traces as an example of such abrupt workloads. The authors also conduct a significant portion of the evaluation on a workload created from these traces. We are a bit skeptical, at least on the surface, about the validity of this motivation. One reason for skepticism is the traces themselves — they come from Google Borg, which is a cluster management system. While Borg is obviously supported by storage systems, the traces themselves are very far from describing an actual database/transactional workload. It would be nice to see a bit more detail on how the authors created the workload from the traces and whether there are other examples of workloads with abrupt access pattern changes.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Viewstamped Replication Revisited

Our 74th paper was a foundational one — we looked at the Viewstamped Replication protocol through the lens of the “Viewstamped Replication Revisited” paper. Joran Dirk Greef presented the protocol along with bits of his engineering experience using the protocol in practice.

Viewstamped Replication (VR) solves the problem of state machine replication in a crash fault-tolerant setting, with up to f nodes failing in a cluster of 2f+1 machines. In fact, the protocol (between its two versions, the original and revisited) is super similar to Raft and Multi-Paxos. Of course, the original VR paper was published before both of these other solutions. An important aspect of the narrative of the VR paper is that the basic protocol makes no assumptions about persisting any data to disk.

Replication

In the common case when replicating data, VR has a primary node that prescribes the operations and their order to the followers. The primary needs a majority quorum of followers to ack each operation to consider it committed. Once it reaches the quorum, the primary can tell the followers to commit as well. In VR, followers do not accept out-of-order commands and will not ack an operation without receiving its predecessor. With a few differences, this is how typical consensus-based replication works in both Raft and Multi-Paxos. For example, Multi-Paxos complicates things a bit by allowing out-of-order commits (but not out-of-order execution against the state machine).

View Change and Replicas Recovery

As with everything in distributed systems, the interesting parts begin when things start to crumble and fail. Like other protocols, VR can mask follower crashes, so the real challenge is handling primary failures. The VR protocol orchestrates the primary failover with a view-change procedure. Here I will focus on the “revisited” version of the view-change. 

One interesting piece of the revisited view-change is its deterministic nature, as each view is assigned/computed to belong to a particular node. When the current primary of view v fails, the node assigned to view v+1 will try to become the new primary. This promotion to primary can be started by any node suspecting the current primary’s demise. The node for view v+1 does not need to directly observe the crash of the primary for view v, allowing for unreliable failure detectors.

So, any replica can start a view change if it detects a faulty primary. The replica will advance to the next view v+1 and start the view change by sending the StartViewChange message with the new view number to all the nodes. Each replica that receives a StartViewChange will compare the view to its own, and if its view is less than the one in the received StartViewChange message, it will also start the view change and send its own StartViewChange message to every replica. Also note that two or more replicas may independently start sending the initial StartViewChange messages for the new view v+1 if they independently detect the primary’s failure. The bottom line of this StartViewChange communication pattern is that any replica that has received at least f StartViewChange messages knows that the majority of the cluster is on board to advance with the view change.
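A rough Python sketch of this message flow, as I understand it (my own simplification — the real protocol also tracks replica status and handles duplicate or stale messages more carefully):

```python
class VRReplica:
    """Minimal sketch of the StartViewChange logic described above.
    Message transport, timers, and log handling are stubbed out (assumed)."""

    def __init__(self, my_id, n):
        self.my_id, self.n = my_id, n
        self.f = (n - 1) // 2
        self.view = 0
        self.last_normal_view = 0
        self.log = []
        self.svc_from = set()   # peers we heard StartViewChange(view) from
        self.sent_dvc = False

    def primary_of(self, view):
        return view % self.n    # views are deterministically assigned to nodes

    def suspect_primary(self):
        self.start_view_change(self.view + 1)

    def start_view_change(self, new_view):
        self.view = new_view
        self.svc_from = set()
        self.sent_dvc = False
        self.broadcast(("StartViewChange", new_view, self.my_id))

    def on_start_view_change(self, msg_view, sender):
        if msg_view > self.view:                 # join the ongoing view change
            self.start_view_change(msg_view)
        if msg_view == self.view:
            self.svc_from.add(sender)
            # f messages from others, plus ourselves, make a majority
            if len(self.svc_from) >= self.f and not self.sent_dvc:
                self.sent_dvc = True
                self.send(self.primary_of(self.view),
                          ("DoViewChange", self.view,
                           self.last_normal_view, self.log))

    # transport stubs -- a real implementation would send over the network
    def broadcast(self, msg): pass
    def send(self, to, msg): pass
```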

So, after receiving f StartViewChange messages for the new view v+1 from other nodes, each replica sends a DoViewChange message to the primary for that new view v+1. The DoViewChange message contains the replica’s last known normal view v’ along with its log. The new primary must receive a majority (i.e., f+1) of DoViewChange messages. The new primary then selects the most up-to-date log (i.e., the one with the highest v’, or the longest log if multiple DoViewChange messages have the same highest v’). This up-to-date log ensures the new primary can recover any operations that might have been majority-accepted or committed in previous views. This “learn from the followers and recover” part of the view change procedure is somewhat similar to Multi-Paxos, as the new leader needs to recover any missing commands before proceeding with new ones. The VR leader recovery only cares about the view and the length of the log because VR does not allow out-of-order prepares/commits. In Multi-Paxos, with its out-of-order commitment approach, we kind of need to recover individual log entries instead of the log as a whole.
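The log selection rule itself is tiny. A sketch, assuming each DoViewChange carries the sender’s last normal view and its log:

```python
def select_log(do_view_change_msgs):
    """Pick the most up-to-date log from the f+1 DoViewChange messages:
    the highest last-normal view wins; a longer log breaks ties."""
    best = max(do_view_change_msgs,
               key=lambda m: (m["last_normal_view"], len(m["log"])))
    return best["log"]

msgs = [
    {"last_normal_view": 3, "log": ["op1", "op2"]},
    {"last_normal_view": 4, "log": ["op1", "op2", "op3"]},
    {"last_normal_view": 4, "log": ["op1", "op2", "op3", "op4"]},
]
print(select_log(msgs))   # ['op1', 'op2', 'op3', 'op4']
```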

Anyway, after receiving enough (f+1) DoViewChange messages and learning the most up-to-date log, the new primary can start the new view by sending the StartView message containing the new view v+1 and the recovered log. The followers will treat any uncommitted entries in the recovered log as “prepares” and ack the primary. After all entries have been recovered, the protocol can transition into the regular mode and start replicating new operations.

Schematic depiction of a view change procedure initiated by one node observing a primary failure. Other machines send StartViewChange as a response. Eventually, a majority of nodes receive f StartViewChange messages and proceed to DoViewChange, at which point the new primary recovers the log.

There is a bit more stuff going on in the view change procedure. For example, VR ensures the idempotence of client requests. If some client request times out, the client can resend the operation to the cluster, and the new leader will either perform the operation if it has not been done before or return the outcome if the operation was successful at the old primary. 

Another important part of the paper’s recovery and view change discussion involves practical optimizations. For example, recovering a crashed in-memory replica requires learning the entire log, which can be slow. The proposed solution involves asynchronously persisting the log and checkpoints to disk to speed up the recovery process. Similarly, the view change requires a log exchange, which is impractical for long logs. An easy solution would involve sending only a small log suffix instead, since the new primary is likely mostly up-to-date.

Discussion

1) Comparison With Other Protocols. I think the comparison topic is where we spent most of the discussion time. It is easy to draw parallels between Multi-Paxos and VR. Similarly, Raft and VR share many things in common, including the similarity of the original VR’s leader election to that of Raft. 

One aspect of the discussion is whether Multi-Paxos, Raft, and VR are really the same algorithm with slightly different “default” implementations. We can make an argument that all these protocols are the same thing. They operate in two phases (leader election and replication), require quorum intersection between the phases, and have the leader commit an operation before the followers. The differences are in how they elect a leader to ensure that committed or majority-accepted operations from the prior leader do not get lost. Multi-Paxos and VR revisited recover any missing state at the new leader, while Raft ensures the new leader is already up to date, eliminating the need for recovery.

2) Optimizations. Academia and industry have churned out quite a substantial number of optimizations for Multi-Paxos and Raft. There has even been a study on whether the optimizations are interchangeable across protocols (the short answer is yes). But that study did not include VR. Interestingly, Joran mentioned that they applied Protocol-Aware Recovery to VR, and (if I am not mistaken) they also used flexible quorums. Of course, I am not surprised here, given how similar these solutions are.

3) In-memory Protocol. Joran stressed the importance of an in-memory protocol description for engineers. In their systems, in-memory consensus reduces the reliance on the fault-tolerance/reliability of the storage subsystem. It appears that VR is the only major SMR protocol specified without the need for a durable, infallible disk. And while people in academia do a lot of work on in-memory consensus (including Multi-Paxos and Raft), what is needed to make these systems work without the disk is not clearly described.

The VR paper briefly describes the problems that the lack of a durable log introduces. The issue is that a node can forget its entire state. It can accept some value, help form a majority, and then reboot. Upon rebooting, if the node is allowed to participate right away, we no longer have the forgotten value in the majority quorum. Similarly, a node can vote on something in one view, reboot and forget, and then potentially vote again in a smaller view proposed by some slow old primary.

To avoid these problems, the node needs to recover before it can participate. At least a partial recovery is needed to avoid the double voting problem — the node needs to learn the current view before accepting new operations. At the same time, such a partially recovered node must still appear “crashed” for any older operations until it has fully recovered. 

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Polyjuice: High-Performance Transactions via Learned Concurrency Control

Our 73rd reading group meeting continued with discussions on transaction execution systems. This time we looked at the “Polyjuice: High-Performance Transactions via Learned Concurrency Control” OSDI’21 paper by Jiachen Wang, Ding Ding, Huan Wang, Conrad Christensen, Zhaoguo Wang, Haibo Chen, and Jinyang Li. 

This paper explores single-server transaction execution. In particular, it looks at concurrency control mechanisms and argues that the current approaches have significant limitations for different workloads. For instance, two-phase locking (2PL) may work better in workloads with high contention between transactions, while Optimistic Concurrency Control (OCC) works best in low-contention scenarios. A natural way to solve the problem is to create a hybrid solution that can switch between different concurrency control methods depending on the workload. The paper mentions a few such solutions but also states that they are too coarse-grained.

Polyjuice presents a different hybrid strategy that allows a more fine-grained, auto-tunable control over transaction concurrency control depending on the workload. The core idea is to extend the existing concurrency control mechanisms into a set of actions and train the system to take the best actions in response to each transaction type, transaction’s dependencies, and the current step within the transaction. 

Possible actions Polyjuice can take include different locking options, whether to allow dirty reads, and whether to expose dirty writes. 

For the sake of time, I will only talk about locking actions. Each transaction, upon accessing some data, must pick the concurrency control actions. For example, if some transaction “B” has another transaction “A” in its dependency list for some key, it needs to decide whether or not to wait for the dependency “A.” In fact, “B” has a few options in Polyjuice: be entirely optimistic and not wait for “A,” or be more like 2PL and halt until the dependency “A” has finished. In addition to the two extremes, Polyjuice may tweak this lock to wait only for a partial execution of the dependency.

State & Action Space Policy Table. Example parameters for some transaction t1, step #2 are shown. Image from the authors’ presentation.

The Polyjuice system represents the possible concurrency control actions through the state and action space policy table. In the table, each row is a policy for a particular step of a particular transaction type. Each column is an action type. Wait actions are specific to a transaction’s dependencies, so if a transaction has a dependency, we pick the wait action corresponding to that dependency. The cells then hold the action parameters, such as the wait duration or whether to read dirty data.
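A hypothetical, simplified encoding of such a table in Python (the names, transaction types, and parameters below are made up for illustration and do not match the paper’s exact action encoding):

```python
# Keys are (transaction type, access step); values hold the CC action parameters.
policy = {
    ("new_order", 2): {
        "wait_for": {"payment": "until_commit", "new_order": "no_wait"},
        "read_dirty": True,       # may read uncommitted versions at this step
        "expose_write": False,    # do not make own dirty writes visible yet
    },
}

def actions_for(txn_type, step, dependency_type):
    row = policy[(txn_type, step)]
    wait = row["wait_for"].get(dependency_type, "no_wait")
    return wait, row["read_dirty"], row["expose_write"]

print(actions_for("new_order", 2, "payment"))   # ('until_commit', True, False)
```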

So, to operate the concurrency control actions, we need to maintain the dependency lists for transactions and tune the policy table for optimal parameters. The tuning is done with a reinforcement-learning-like approach, where we aim to optimize the policy (our table that maps state to actions) for maximizing the reward (i.e., the throughput) in a given environment (i.e., the workload). The actual optimization is done with an evolutionary algorithm, so it is a bit of a random process of trying different parameters and sticking with the ones that seem to improve the performance. 

It is important to note that the Polyjuice concurrency control policy is not ensuring the safety of transactions. It merely tries to tweak the waits and data visibility for best performance. So when each transaction finishes with all its accesses/steps, Polyjuice runs a final validation before committing. If the validation passes, then everything is ok, and if not, the transaction is aborted. This makes the entire concurrency control process optimistic, despite adding some waiting/locks for dependencies. In my mind, Polyjuice is a “loaded” optimistic concurrency control that tries to improve its chances of passing the validation and committing as quickly as possible. Kind of like a loaded die with a higher chance of getting the value you are betting on. 

Our presentation video is available below. Peter Travers volunteered for this paper about a day before the meeting, saving me from doing it. So big thanks to Peter, who did an awesome job presenting. 

Discussion

1) Single-server transactions. Polyjuice is a single-server system, which makes some of its transactional aspects a lot simpler. For example, keeping track of transaction dependencies in a distributed system would likely involve some additional coordination. As we have seen in Meerkat, additional coordination is not a good thing. At the same time, there may be some interesting directions for learned CC in the distributed space. For example, we can train a separate model or policy table for different coordinators in the system. This can be handy when deploying the system across a WAN with non-uniform distances between nodes or coordinators. I was originally excited about this possibility, and it feels like a natural direction for research.

2) Fixed transaction types. The system expects a fixed set of transaction types — transactions that have the same “code” and execute the same logic, but on different data. It is not entirely clear what will happen when a new transaction type arrives at the system before Polyjuice is retrained to include this new type. We would expect some sort of a default fall-back mechanism to be in place.

3) Evolutionary algorithm. So this is an interesting part. Polyjuice uses an evolutionary optimization algorithm to train its policy model. These types of algorithms try to mimic natural evolutionary processes to arrive at a more optimal solution. For example, the system may start with an initial population of policies, evaluate how this initial population performs, pick the two best policies, and somehow combine them. This combined policy (i.e., the offspring of the two best ones) can then replace the weakest-performing policy in the population. The process can repeat.

The paper actually does not use a crossover approach to produce offspring between the best policies. Polyjuice creates “children” by mutating the parameters of the good parents to produce the next population generation. It then evaluates this next generation, prunes the weak policies, and repeats the process. 
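A bare-bones sketch of such a mutation-only loop (my own illustration; Polyjuice’s actual training evaluates whole policy tables on the real workload and deals with a much larger parameter space):

```python
import random

def mutate(policy, rate=0.1):
    """Flip some boolean action parameters of a parent policy (toy mutation)."""
    child = dict(policy)
    for key, value in child.items():
        if isinstance(value, bool) and random.random() < rate:
            child[key] = not value
    return child

def evolve(population, evaluate, generations=20, survivors=4):
    """Mutation-only evolutionary search in the spirit described above.
    `evaluate` is assumed to measure a policy's throughput on the live workload."""
    for _ in range(generations):
        ranked = sorted(population, key=evaluate, reverse=True)
        parents = ranked[:survivors]               # keep the best policies
        children = [mutate(p) for p in parents]    # spawn mutated offspring
        population = parents + children            # the weakest fall out
    return max(population, key=evaluate)

# toy usage: policies are dicts of boolean knobs, "throughput" counts True knobs
population = [{"read_dirty": False, "expose_write": False} for _ in range(8)]
best = evolve(population, evaluate=lambda p: sum(p.values()))
print(best)   # tends to converge toward all-True knobs under this toy reward
```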

The paper claims this works well, but we were wondering about the convergence of this to an optimal solution. Can the evolutionary algorithm get stuck in some local maximum and not find the best policy? Another concern is the time to converge, and the training impact on the transactional performance. To find the best policies, Polyjuice needs to evaluate the entire population. While this is happening, the performance of the system may stutter and jump back and forth as it tries different policies, which is not ideal in production workloads. At the same time, we must use production workloads to train the policy. And of course, the process requires multiple iterations. 

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

Reading Group. Meerkat: Multicore-Scalable Replicated Transactions Following the Zero-Coordination Principle

Our 72nd paper was on avoiding coordination as much as possible. We looked at the “Meerkat: Multicore-Scalable Replicated Transactions Following the Zero-Coordination Principle” EuroSys’20 paper by Adriana Szekeres, Michael Whittaker, Jialin Li, Naveen Kr. Sharma, Arvind Krishnamurthy, Dan R. K. Ports, Irene Zhang. As the name suggests, this paper discusses coordination-free distributed transaction execution. In short, the idea is simple — if two transactions do not conflict, then we need to execute them without any kind of coordination. And the authors really mean it when they say “any kind.”  

In distributed transaction processing, we often think about avoiding excessive contention/coordination between distributed components. If two transactions are independent, we want to run them concurrently without any locks. Some systems, such as Calvin, require coordination between all transactions by relying on an ordering service to avoid expensive locks. Meerkat’s approach is way more optimistic and tries to avoid any coordination, including ordering coordination. Meerkat is based on Optimistic Concurrency Control (OCC) with timestamp ordering to enable independent transactions to run without locks and ordering services. Another innovation for avoiding coordination is the replication of transactions from clients straight to servers. Instead of relying on some centralized replication scheme, like Multi-Paxos or Raft, Meerkat lets clients directly write transactions to replicas, avoiding replication coordination and leader bottlenecks. The authors explored this “unordered replication” idea in their previous paper. The clients also act as the transaction coordinators in the common case. 

Meerkat does not stop its coordination avoidance efforts at cross-replica coordination. In fact, this is where the most interesting magic starts to happen — the authors designed a good chunk of the system specifically to avoid cross-core coordination within each server to take advantage of modern multi-core CPUs. The authors call such avoidance of cross-core and cross-replica coordination the Zero Coordination Principle, or ZCP for short.

As a motivating example for ZCP and the need for cross-core coordination avoidance, the paper illustrates the contention created by a simple counter shared between threads on one machine. It appears that with the help of modern technologies to alleviate networking bottlenecks (kernel bypass), such a shared counter becomes an issue at just 8 threads. In the example, a simple datastore with a shared counter could not scale past 16 threads, while a similar store without a shared resource had no such problems. 

Let’s talk about the protocol now to see how all the coordination-avoidance efforts actually work. The system tolerates \(f\) failures in the cluster of \(2f+1\) machines. Each transaction can read and write some set of objects supported by the underlying key-value store. These objects represent the transaction’s read- and write-sets. The replicas maintain two data structures to support transaction processing: a trecord and a vstore. 

The trecord is a table containing all transaction information partitioned by the CPU core ID to make each transaction “sticky” to a single core. It manages the transaction state, such as the read- and write-sets, transaction version timestamp, and commit status. The vstore stores versioned key-value pairs. Unlike the trecord, the vstore is shared among all cores at the server. The transaction protocol runs in 3 distinct phases: Execute, Verify, Write. I’m not sure these are the most intuitive names for the phases, but that will do.
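A rough picture of these two structures, in Python (my own sketch of the layout, not Meerkat’s actual implementation, which is carefully engineered native code):

```python
import os

NUM_CORES = os.cpu_count() or 8

# trecord: transaction state partitioned by core id; only the transaction's
# sticky core ever touches its partition, so no cross-core locking is needed.
trecord = [dict() for _ in range(NUM_CORES)]   # core_id -> {txn_id: state}

# vstore: the versioned key-value store shared by all cores on the server.
vstore = {}                                    # key -> {"value": ..., "version": ts}

def register_txn(core_id, txn_id, read_set, write_set, ts):
    """Create the trecord entry on the transaction's sticky core (sketch)."""
    trecord[core_id][txn_id] = {
        "read_set": read_set,     # {key: version observed in phase-1}
        "write_set": write_set,   # {key: new value}
        "ts": ts,                 # OCC timestamp version
        "status": "PENDING",
    }
```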

In phase-1 (execute), the transaction coordinator (i.e., a client) contacts any replica and reads the keys in its read-set. The replica returns versioned values for each key. The coordinator then buffers any pending writes.

Phase-2 (verify) combines the transaction commit protocol with the replication of the transaction outcome. The coordinator starts phase-2 by first selecting a sticky CPU core that will process the transaction. The sticky core ID ties each transaction to a particular CPU core to reduce cross-core coordination within each server. The coordinator then creates a unique transaction id and a unique timestamp version to use for OCC checks. Finally, the coordinator sends all this transaction information to every replica in a validate message.

Upon receiving the message, each replica creates an entry in its trecord. The entry maintains the transaction’s state and makes this transaction “stick” to the core associated with the core-partitioned trecord. At this point, the replica can validate the transaction using OCC. I will leave the details to the paper, but this is a somewhat standard OCC check. It ensures that both the data read in phase-1 is still current and the data in the write-set has not been replaced yet by a newer transaction. At the end of the check, the replica replies to the coordinator with its local state (OK or ABORT). 
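Roughly, the check can be sketched like this (a simplification; the actual Meerkat validation also handles concurrently validating transactions and other subtleties I leave to the paper). It reuses the trecord entry layout from the sketch above:

```python
def occ_validate(vstore, trecord_entry):
    """Simplified version of the OCC check described above."""
    # reads from phase-1 must still be current
    for key, seen_version in trecord_entry["read_set"].items():
        if vstore[key]["version"] != seen_version:
            return "ABORT"
    # nothing in the write-set may already carry a newer version
    for key in trecord_entry["write_set"]:
        if key in vstore and vstore[key]["version"] > trecord_entry["ts"]:
            return "ABORT"
    return "OK"
```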

The coordinator waits for a supermajority (\(f+\lceil\frac{f}{2}\rceil+1\)) fast-quorum of replies. If it receives enough matching replies, then the transaction can finish right away in the fast path. If the supermajority was in the “OK” state, the transaction commits, and otherwise, it aborts.

Sometimes a supermajority fast-quorum does not exist or does not have matching states, forcing the coordinator into a slow path. In a slow path, the coordinator only needs a majority of replicas to actually reply. If the majority has replied with an “OK” state, the coordinator can prescribe the replicas to accept the transaction, and otherwise, it prescribes the abort action. Once the replicas receive the prescribed transaction state, they mark the transaction accordingly in their trecord and reply to the coordinator. Here, the coordinator again waits for a quorum before finalizing the transaction to commit or abort. 
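For concreteness, the quorum sizes as a function of f, following the formulas above:

```python
import math

def quorums(f):
    n = 2 * f + 1
    fast = f + math.ceil(f / 2) + 1   # supermajority for the fast path
    slow = f + 1                      # plain majority for the slow path
    return n, fast, slow

for f in (1, 2, 3):
    print(f, quorums(f))   # 1 (3, 3, 2) / 2 (5, 4, 3) / 3 (7, 6, 4)
```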

Finally, in phase-3 (write), replicas mark the transaction as committed or aborted. If the transaction is committed, then each replica can apply the writes against the versioned datastore. 

Phew, there is a lot to unpack here before diving deeper into the corner cases and things like replica and coordinator failures and recovery. The important parts relate to how the coordination is handled/avoided. To start, the coordinator uses a timestamp for the version, circumventing the need for a counter or centralized sequencer. The transaction replicates with its timestamp directly by the coordinator (who happens to be the client in the normal case) to the replicas, avoiding the need for a centralized replication leader or primary. At the server level, each transaction never changes its execution core even as it goes through different phases. All messages get routed to the core assigned to the transaction, and that core has unique access to the transaction’s trecord partition. This “core stickiness” avoids coordination between the cores of a server(!) for the same transaction. I speculate here a bit, but this may also be good for cache use, especially if designing individual transaction records to fit in a cache line. As a result, the only place the coordination happens between transactions is the OCC validation. During the validation, we must fetch current versions of objects from the core-shared vstore, creating the possibility for cross-core contention between transactions accessing the same data.

I do not want to go too deep into the failure recovery; however, there are a few important points to mention. The replica recovery process assumes replicas rejoin with no prior state, so they are in-memory replicas. The recovery process is leader-full, so we are coordinating a lot here. And finally, the recovery leader halts transaction processing in the cluster. As a result, the recovery of one replica blocks the entire system as it needs to reconcile one global state of the trecord that can be pushed to all replicas in a new epoch. I will leave the details of the recovery procedure to the paper. However, intuitively, this over-coordination in recovery is needed because the normal operation avoided the coordination in the first place. For example, state machine replication protocols “pre-coordinate” the order of all operations. When a replica needs to recover, it can learn the current term to avoid double voting before simply grabbing all committed items from the log available at other nodes and replaying them. It can replay the recovered log while also receiving new updates in the proper order. In Meerkat, we have no single history or log to recover, so a pause to reconcile a consistent state may be needed.  

The coordinator recovery is handled by keeping backup coordinators and using a Paxos-like consensus protocol to ensure that only one coordinator is active at a time and that the active coordinator is in a proper state.

Now we can talk a bit about the performance. Meerkat significantly outperforms TAPIR, the previous system from the same group. The performance gap between the systems is huge, which raises a natural question: just how much of Meerkat’s gain is due to a super-optimized implementation and the use of techniques like kernel bypass? Meerkat-PB in the figure can shed some light on this, as it represents a version of Meerkat with a dedicated primary through which clients submit transactions. Having a primary adds cross-replica coordination, and despite that, it still significantly outperforms the older systems.


As always, we had a presentation in the group, and it is available on our YouTube channel. This time, Akash Mishra did the presentation:

Discussion

Quite frankly, I have incorporated quite a bit of discussion into the summary already. 

1) Performance. One of the bigger questions was about performance. How much of the “raw” speed is due to ZCP, and how much of it is due to the enormous implementation expertise and the use of kernel bypass and fancy NICs to deliver messages to the proper cores/threads? We speculate that a lot of the overall performance is due to these other improvements and not ZCP. That being said, once you have an implementation this efficient/fast, even tiny bits of coordination start to hurt substantially, as evidenced by the motivating examples and even the primary-backup version of Meerkat.

2) Replica Recovery. The blocking nature of recovery may present a real problem in production systems, especially if the recovery time is substantial. It would have been nice to see the recovery evaluations.

3) Performance States. To continue with the performance/recovery topics, it appears that the system can operate in multiple very distinct performance states. In the fast-quorum operation, commits come quickly. In the slow path, a whole new round-trip exchange is added (probably after a timeout). This creates distinct latency profiles for the fast and slow paths. It can also create distinct throughput profiles, as the slow path sends and receives more messages, potentially creating more load in the system.

4) Sharded systems? Many systems use sharding to isolate coordination into smaller buckets of nodes or replica-sets. For example, Spanner and CockroachDB do that. Such sharding allows independent transactions to run in separate “coordination pods” without interfering with each other. To scale, these systems just need to create more such coordination pods. Of course, in systems like Spanner, transactions that span multiple replica-sets add another level of coordination on top, but the chance that any two shards need to coordinate is kept low by making lots of tiny shards. I wonder about the differences between the two philosophies — avoiding coordination vs. embracing it, but in small groups. Are there benefits to “coordination pods”? Should we embrace ZCP? Can ZCP survive the scale of these larger sharded systems?

5) The need for supermajority quorum? Supermajority fast quorums often raise many questions. Just exactly why do we need them? The short answer here is fault-tolerance, or more specifically the ability to recover operations after failures. See, in majority quorum protocols that have a leader, we have at most one operation that can be attempted in the given epoch and log position. This means that if some replicas fail, we can recover the operation if we can guarantee to see the value in at least one replica. Any two majority quorums intersect in at least one replica, making the single operation recoverable as long as it has made it to the majority. Unfortunately, this does not work with leaderless solutions as illustrated by Fast Paxos, as many different values can be attempted in the same epoch and slot position. However, we still need to survive the failures and recover. 

Let’s look at an example to illustrate this. Assume we have 5 nodes, 3 of which have accepted value “A” and 2 of which have value “B.” Let’s assume that we commit “A” at this time since we clearly have a majority agreeing on “A.” If 2 “A” nodes crash, we will have 3 live nodes remaining: “A, B, B.” By looking at these nodes, we cannot tell that value “A” might have been committed by the coordinator. We need a supermajority quorum to survive the failures and recover. If we commit with a supermajority of size \(f+\lceil\frac{f}{2}\rceil+1\) (4 out of 5 nodes) “A, A, A, A, B”, then failing 2 “A” nodes leaves us with 3 nodes: “A, A, B.” We see more “A” responses here and can recover “A.” In fact, if some value is in a supermajority, then any majority quorum will have a majority of its nodes (i.e., \(\lceil\frac{f}{2}\rceil+1\) nodes) carrying that value (but not vice versa — the majority value of a majority quorum does not mean that value was in a supermajority).

Now, how does this relate to Meerkat? Each Meerkat transaction has a unique id, so one may think we never have the possibility of committing two or more different transactions for the same id. However, we have to be careful about what Meerkat replicas need to agree upon. It is not the transaction itself, but a transaction status — OK or ABORT. So, we do have two possible values that can exist at different replicas for the same transaction. As a result, Meerkat needs a fast path supermajority quorum to make the transaction status decision recoverable in the replica recovery protocol.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

How to Read Computer Science (Systems) Papers using Shampoo Algorithm

I think most academics have had to answer the question of how to approach papers. It is the beginning of the semester and a new academic year, and I have heard this question quite a lot in the past two weeks. Interestingly enough, I believe that almost every academic active on the Internet has written about paper reading. So one may think there should be plenty of guidance out there, all within Google’s reach. And yet I see students reading these tips and suggestions and still coming back for more advice.

One problem with many paper-reading tips is that they are too algorithmic and do not help where people really struggle. “Read the title, then the abstract, then go to the conclusion, look at the figures…” is a common advice pattern. Since when do these steps actually help understand the paper? Who can look at protocol drawings or results figures after reading an abstract and get it? No doubt, this approach works for some, but it seems to require decent paper-reading skills and a huge baggage of consumed literature in related topics. Other suggestions I see online may be a lot more helpful, but they too come with some bias toward more experienced readers.

My approach to reading papers and advice I give my students is based on the infamous shampoo algorithm — lather, rinse and repeat. Scientific (computer science) papers are not like most other types of reading we do. Unlike most fiction, for example, scientific papers are non-linear in their comprehension. Let me explain what I mean here. When we read fictional stories, we follow a linear path through the plot. On the first pages of a detective story, we will never see who was the murderer and how the victim was killed, only to see the motive in the next chapter and a detailed description of a weapon somewhere in the middle of the book. Yet this is exactly how computer science papers unfold their story.

Rings of Comprehension

This non-linearity means that we cannot apply linear reading strategies to papers. We cannot just go with the text to build a complete understanding of the story. Most academic papers have circular “plot” structures with multiple rings of comprehension built around each other. Each of these “rings” acts as a foundation for what is to come next. We can think of them as the same concepts explained in increasing levels of difficulty. This rings abstraction is where the shampoo algorithm comes in — we must “rinse and repeat” until we reach a good understanding of something at one level of difficulty before moving to the next one.

Naturally, we can start with a title and an abstract. These build our initial comprehension ring of the paper, as we establish the broad topic and very high-level information about the solution and the outcome. Hopefully, things are clear at this point in the reading process, but if they are not, it may be a good idea to pick up a textbook and resolve the confusion. 

The introduction usually brings us to the next comprehension ring. We learn the problem and its importance in greater detail and get motivated. The intro will likely provide some insight into the solution as well. At this time, we may also look at the conclusion to see if anything interesting is there. Some conclusions can be very good at providing a high-level overview; some, not so much. We are at a crucial point in understanding the paper now, and things tend to get way harder very quickly after this. If there is some fundamental gap in comprehension by this time, it would be a good idea to go back and reread. Maybe follow the references on aspects that are still hard to grasp — they will only get more difficult later on.

I should take a pause here for a bit. See, it is ok to not know something at each comprehension ring before moving forward. It is ok to ask questions about how things are done and seek answers in the following outer rings. Moreover, you must ask how and why questions and seek answers later. But you probably should not have fundamental questions about what the authors are doing or questions about the background discussed. I think there is a rather fine line between the questions you need to be asking going forward and the questions that require the “rinse and repeat” treatment. Seeing this line may come with a bit of experience. The good thing about the shampoo algorithm is that when you are confused, you can always go back and repeat.

Now we are ready to move forward. If the paper has a background section and you had questions about background earlier, then read it. In my areas of expertise, I may often skim through the background instead of reading carefully.

We are now diving into the “meat of the paper” or “the solution” sections. This is the most challenging and time-consuming part that may actually have multiple comprehension rings packed together. It is important to never be afraid to practice the “shampoo algorithm” when things start to become gibberish. Go back even if this requires rewinding to the previous comprehension ring.

At first, we may approach the solution sections by reading the English description accompanied by any graphical explanations in the figures. At this stage, it may be a good idea to skip any algorithms and proofs. Instead, we must focus on absorbing the details of the solution and try to answer the how/why questions we had previously. The detailed English and graphical descriptions form an entire comprehension ring after the introduction/background one. Working on this third comprehension ring often requires going back and rereading paragraphs and sections many times over to build proper understanding. Now is also a good time to start looking at the evaluation and see if the results match any expectations we may have had based on our understanding so far.

Finally, we can move into the algorithms and proofs and try to work them out. Working through these requires a good understanding of what is going on in the solution. We have worked hard to build this understanding by reading and rereading, asking and answering questions. Building the required understanding/intuition of the problem and solution is the reason for approaching these so late in the process. However, going through algorithms on paper line by line is super useful and helps transform intuition into a more codified working model. This codified model forms yet another comprehension ring. An additional ring will form after going through the proofs.

Writing as a Reading Tool

While I talked a lot about rinse and repeat, not being afraid to go back into the paper, and gradually building the comprehension rings, I did not mention one super useful tool for reading papers. This tool is writing. See, our brains are kind of too fast when we read, and this speed is not a good one. The brain races between questions, answers, remembering past information, and acquiring new information. The “go back and reread” approach keeps the brain from getting carried away too far and too fast. However, the ultimate brain speed bump is writing — we type much slower and have to focus on what we write, forcing the brain to slow down even more. And this is where the magic happens. It turns out that when the brain is working slower, the thought process is also more structured. For this reason, pilots are trained to slow their brains down in critical conditions to think better. We are not in any life-or-death critical situation, so simple writing may be a good enough tool to slow your brain down, so you have time to think and explain what you have just learned.

I often use writing as my ultimate reading tool. For example, all of my reading group summaries are a result of the “ring-3” comprehension discovery process. I do not write for all papers I read, but when I do, it significantly improves my understanding of the paper.  

Reading Group. DistAI: Data-Driven Automated Invariant Learning for Distributed Protocols

In the 71st DistSys reading group meeting, we discussed the “DistAI: Data-Driven Automated Invariant Learning for Distributed Protocols” OSDI’21 paper. Despite the misleading title, this paper has nothing to do with AI or Machine Learning. Instead, it focuses on the automated search for invariants in distributed algorithms. I will be brief and a bit hand-wavy in this summary, mainly because I am too busy with other things I need to take care of (thanks, beginning of the semester!), and this paper requires rather careful attention and a deep read to fully grasp it.

Before going into the paper itself, I want to talk about invariants. We use invariants a lot when thinking about the safety of algorithms and protocols. An invariant is a property that must hold throughout the execution of an algorithm. It holds initially and after every discrete step of the protocol. For people like me, finding a good safety property is usually enough when checking some algorithm written in a spec language like TLA+. The TLC model checker runs an exhaustive search of the state space and checks the safety property after every action. If the invariant condition is violated, TLC will produce a counterexample. There is a bit of a problem with this approach — for big algorithms, checking the model can take hours, days, or even weeks. TLA+/TLC is a brute-force solution.

IVy, the system used as a component in today’s paper, takes a different approach. Instead of brute-forcing and checking all possible executions (and of course, the execution may be infinite too!), it tries to be clever and prove an algorithm correct by induction. The idea is simple on the surface — take an inductive invariant I prescribing the safety of an algorithm, and show that I holds initially and after all possible transitions starting from any state that satisfies I. In other words, we need to check that every possible transition from a state S satisfying the invariant results in a state S’ that still satisfies I. Unfortunately, in practice, this is where things get complicated. The problem is that our safety property I from the model-checking world may not be inductive. More specifically, if we are in some non-initial state S that satisfies I, there is still a possibility that after an action S -> S’, the state S’ will not satisfy I. This situation may arise not because the safety property is incorrect but because S is an impossible or unreachable state that still somehow satisfies I. The solution is then to strengthen the invariant to some new invariant I’ that rules out such unreachable starting states for our possible transitions. The IVy system provides tools to do this strengthening semi-automatically with some user help.
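A tiny, self-contained illustration of the difference (my own toy example, unrelated to IVy’s actual specification language): a counter starts at 0 and the only action adds 2. The safety property “x is never 5” holds in every reachable state but is not inductive, because the unreachable state x = 3 satisfies it and steps to x = 5. The strengthened invariant “x is even” is inductive and implies safety.

```python
def step(x):
    return x + 2                       # single action: add 2; the initial state is x = 0

safety    = lambda x: x != 5           # holds in every reachable state (they are all even)
inductive = lambda x: x % 2 == 0       # strengthened invariant: x is even (implies safety)

def is_inductive(inv, states):
    """inv is inductive if every state satisfying inv transitions to a state satisfying inv."""
    return all(inv(step(x)) for x in states if inv(x))

states = range(10)
print(is_inductive(safety, states))     # False: x=3 satisfies safety but steps to x=5
print(is_inductive(inductive, states))  # True: evenness is preserved by the action
```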

The DistAI paper builds on IVy and improves and optimizes the invariant search process to make it faster, more automatic, and more accurate. The process starts similarly to IVy’s, as the user provides the IVy spec with all possible actions, the safety properties that must hold, and the relations between variables. These relations between variables are important since they specify the properties of the algorithm. For example, a system may have a relation specifying whether a node holds a lock or whether two nodes have communicated with each other. The system will use these relations to find the inductive invariants. The inductive invariant is then used in conjunction with the safety property to finally check the protocol.

The rest of DistAI’s invariant search process, however, is largely automatic. The process is iterative, with each iteration producing invariant candidates that involve up to some number of variables and no more than some number of disjunctive terms. For instance, if all variable relations involve up to 2 variables, it may make sense to initially restrict the invariant search to just two variables.

Interestingly, at this step, DistAI brings back some of the model-checking magic into the process. It produces a few sample trace executions of the protocol and uses these traces to create a list of all observed variable relations after taking each action. In practice, this list of relations can be large if the simulation trace has many variables, so the system will randomly narrow down its focus to only look at a subsample of the execution trace with just a few variables at a time. For example, if all relations have no more than two variables, we may look at how some combination of two variables changed during the execution trace. This two-variable sample will follow a template with two quantified variables: \(\{\forall V_1, V_2\}\). The DistAI system produces a handful of these subsamples for each quantified template. 

After having these subsamples/sub-traces ready, DistAI enumerates the relations from the subsamples. It can easily see which relations held throughout the entire execution trace. These relations can be generalized from having specific variable values in the samples to the quantified variable placeholders from the sample’s template. For example, if a relation some_property holds in all subsamples, then we can generalize based on the template to \(\forall V_1, V_2: some\_property(V_1, V_2)\). More than one relation may hold true in the subsamples, and so DistAI will produce multiple such generalized relations. For example, some other relation \(some\_other\_property(V_1)\) may have been true throughout all the subsamples. We use all these relations to produce all the different disjunctive invariant candidates, such as
(1): \(\forall V_1, V_2: some\_property(V_1, V_2)\),
(2): \(\forall V_1, V_2: some\_property(V_1, V_2) \lor some\_other\_property(V_1)\)
and other permutations. Also, note that candidate (1) implies candidate (2), i.e., whenever (1) is true, (2) is also always true. In reality, the DistAI candidate enumeration process is a bit more complicated, as different templates may result in duplicate/overlapped invariant candidates.

At this point, DistAI uses IVy to check the candidate invariants. Since we have obtained the candidates from execution traces, we already know that the candidates hold in at least some executions, so we hope that most of the incorrect invariants have already been filtered out. DistAI starts the check with the shortest candidate first. In the hypothetical example, candidate (1) is checked first, since if it is inductive, then there is no need to check candidate (2), which is always true when (1) is satisfied. If the shorter invariant candidate fails the IVy check, then DistAI tries to refine it by moving on to a longer version or a version with more variables. In this case, candidate (2) is a refined candidate that has one extra disjunctive term. If DistAI cannot find a successful candidate in the list created during the enumeration process, then it can start from the beginning by running new sample traces with more variables and an increased maximum length of the disjunctive invariant formula.
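Here is a stripped-down sketch of this enumerate-then-refine loop (purely illustrative; DistAI’s real enumeration deduplicates candidates across templates and feeds actual IVy specs to the checker):

```python
from itertools import combinations

# Relations (shown as strings) that held in every subsample of the two-variable template.
held = ["some_property(V1, V2)", "some_other_property(V1)"]

def candidates(relations, max_terms):
    """Enumerate disjunctive invariant candidates, shortest first."""
    for k in range(1, max_terms + 1):
        for combo in combinations(relations, k):
            yield "forall V1, V2: " + " | ".join(combo)

def refine(relations, ivy_check, max_terms=2):
    """Try candidates shortest-first; `ivy_check` stands in for IVy's inductiveness check."""
    for cand in candidates(relations, max_terms):
        if ivy_check(cand):
            return cand
    return None   # give up: resample with more variables or longer formulas

print(list(candidates(held, 2)))
```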

Phew, this is all very complicated. In short, the idea is to get some invariant candidates from the simulation trace and then check these candidates in IVy, starting with the shortest/simplest candidate and gradually working the way up to more complex invariants. Each of the more complicated candidates you check adds a bit of disjunctive stuff to exclude the issues the previous less-refined candidate stumbled upon. 

As always, we had a video:

Discussion

Quite frankly, I no longer remember the full extent of the discussion we had. I remember it was quite a lot of talking. 

1) Inductive invariants. We spent a bit of time on the difference between inductive invariants and plain invariants. See, we have TLA+ people in the group, but not that many experts in formal methods. As I mentioned in the beginning, TLA+/TLC is happy with a short non-inductive safety invariant because the model checker is a brute-force tool that can only check finite models.

2) Candidate refinement. It seems like most of the refinement is based on going through the list of candidates from the enumeration step and trying them from simple to more complicated ones. Most of the refinement is done by adding more disjunctive predicates until the maximum number of disjunctive terms has been reached (at which point we can try with more terms?).

3) Limitations. The templates for sampling require universal quantifiers. The authors claim that they could not check Paxos protocol because it requires existential quantifiers. However, it was pointed out that another related work (SWISS) managed to find invariants for Paxos, so it seems their approach does not have this limitation. 

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the papers. Please join the Slack group to get involved!

One Page Summary. Photons: Lambdas on a diet

Recently, to prepare for a class I teach this semester, I went through the “Photons: Lambdas on a diet” SoCC’20 paper by Vojislav Dukic, Rodrigo Bruno, Ankit Singla, and Gustavo Alonso. This is a very well-written paper with a ton of educational value for people like me who are only vaguely familiar with the serverless space!

The authors try to solve some deficiencies in serverless runtimes. See, each function invocation is isolated from all other functions by running in a separate container. Such an isolated approach has lots of benefits, ranging from security to resource isolation. But there are downsides as well — each invocation of the same function needs its own container to run, so if there are no available idle containers from previous runs, a new one needs to be created, leading to cold starts. A “cold” function takes more time to run, as it needs to be deployed in a new container and compiled/loaded into the execution environment. In the case of many popular JIT-compiled languages (i.e., Java), this also means that it initially runs from byte-code in an interpreted mode without a bunch of clever compile-time optimizations. The paper states another problem with having every function invocation require its own separate container — resource wastage. In particular, if we run many invocations of the same function concurrently, we are likely to waste memory on loading up all the dependencies/libraries separately in each container. The authors also mention that some function invocations, for instance, functions for some ML or data processing workloads, may operate on the same initial dataset. Obviously, this dataset must be loaded into each function container separately.

The paper proposes a solution, called Photons, to address the cold-start issues and resource wastage in workloads with many concurrent invocations of the same functions. The concept of a Photon describes a function executed in a container shared with other function invocations of the same type. The premise here is to forgo some isolation and allow multiple invocations of the same function to share the container. As a result, the number of cold starts can be reduced, and resources, like RAM, can be better utilized by having libraries and common data loaded only once for many concurrent function calls.

The lack of isolation between the function invocations, however, creates a few problems. The paper solves some of them, but it also passes quite a few of these problems off to the function developers. One big problem passed to the developers is ensuring that different invocations of the same function consume a predictable amount of resources. This is needed for a couple of reasons. First, there are scheduling needs, as the system needs to predict machine and container resource usage to determine which containers can be expanded to take on more function calls or which machines can handle new containers. Second, the lack of container-level isolation means that a misbehaving function instance can grab too many resources and starve other concurrent functions in the same container.

Another big problem is memory isolation and memory sharing between concurrent function invocations. A significant part of the Photons platform deals with these problems. On memory isolation, things are easy when we only work with class variables. Each object created from the class will have a separate copy. However, some class variables may be static, and what is even worse, they can be mutable, allowing concurrent executions of the code to modify these static fields. Photons addresses this problem by transforming static variables into a map, where the key is a photonID (i.e., a unique invocation id of a function). Naturally, all reads and writes to a static variable change to map puts and gets. There are a few more nuances with statics, and the paper covers them in greater detail. For sharing state, the Photons runtime maintains a KV-store exposed to all Photons/function invocations in the container. One major downside of having a KV-store is managing concurrent access to it, and the system leaves it up to the developers to coordinate access to this shared store. Aside from the shared KV-store, functions also share the file system for temporary storage.
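
To make the static-variable rewrite a bit more concrete, here is a minimal hand-written sketch of the general idea; it is not the actual transformation Photons performs, and the PhotonContext helper is a hypothetical stand-in for however the runtime identifies the current invocation:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the runtime's notion of the "current invocation id".
class PhotonContext {
    static final ThreadLocal<Long> CURRENT_ID = new ThreadLocal<>();
    static long currentId() { return CURRENT_ID.get(); }
}

class RequestStats {
    // Original code would have a mutable static shared by every invocation in the container:
    // static int requestCount = 0;

    // After the rewrite, each photon (invocation) gets its own copy, keyed by photonID.
    static final Map<Long, Integer> requestCount = new ConcurrentHashMap<>();

    static void increment() {
        long id = PhotonContext.currentId();
        // A write to the static becomes a map put/merge scoped to this photon only.
        requestCount.merge(id, 1, Integer::sum);
    }

    static int get() {
        // A read of the static becomes a map get for this photon.
        return requestCount.getOrDefault(PhotonContext.currentId(), 0);
    }
}
```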

Reading Group. In Reference to RPC: It’s Time to Add Distributed Memory

Our 70th meeting covered the “In Reference to RPC: It’s Time to Add Distributed Memory” paper by Stephanie Wang, Benjamin Hindman, and Ion Stoica. This paper proposes some improvements to remote procedure call (RPC) frameworks. In current RPC implementations, the frameworks pass parameters to functions by value. The same happens to the function return values. Passing data by value through copying works great for small parameters and returns. Furthermore, since the caller and callee have independent copies of the data, there are no side effects when one copy is modified or destroyed. Things start to become less than ideal when the data is large and gets reused in multiple RPC calls. 

For example, if a return value of one function needs to propagate to a handful of consecutive calls, we can end up performing many wasteful data copies. The wastefulness occurs as data transfers back to the caller and later gets copied multiple times into new RPCs. The obvious solution is to avoid unnecessary copying and pass the data within the RPC framework by reference using shared memory. In fact, according to the paper, many applications resort to doing just this with distributed key-value stores. Before invoking a remote function, a caller can save the parameters to the shared KV-store and pass the set of keys as parameters to the RPC. Managing memory at the application level, however, has some problems. For example, the application must manage memory reclamation and clean up objects that are no longer needed. Another concern is breaking the semantics of RPCs, as shared mutable memory can have side effects between different components of the system. 
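
A rough sketch of that application-level workaround, with the KV store and RPC stub reduced to hypothetical placeholder interfaces (they do not correspond to any real API):

```java
import java.util.UUID;

// Hypothetical stand-ins, just enough to show the pattern.
interface KvStore {
    void put(String key, byte[] value);
    byte[] get(String key);
    void delete(String key);
}

interface Rpc {
    void call(String function, String... keyArgs);
}

class PassByReferenceByHand {
    static void run(KvStore kvStore, Rpc rpc, byte[] largeDataset) {
        // Stash the large value once in shared storage and pass only its key around.
        String key = "dataset-" + UUID.randomUUID();
        kvStore.put(key, largeDataset);

        // Each downstream call receives the key, not the bytes; the callee fetches the
        // data from the KV store itself, so no extra copies travel through the RPC layer.
        rpc.call("trainModel", key);
        rpc.call("evaluateModel", key);

        // The application must remember to reclaim the memory -- exactly the manual
        // bookkeeping the paper argues the RPC framework should take over.
        kvStore.delete(key);
    }
}
```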

Instead of an ad-hoc application-level solution, the paper proposes a managed approach to shared memory in RPCs. The improved framework preserves the original RPC semantics while enabling a shared memory space and facilitating automatic memory management and reclamation. The paper suggests having immutable shared memory where values remain read-only after an initial write. This shared memory system will have a centralized scheduler for RPCs and a memory manager to keep track of memory usage and active memory references. With a centralized memory management approach, the RPC framework can do a few cool things. For instance, the memory manager knows when a reference is passed between nodes and when it goes out of scope on all functions/clients. This global memory knowledge allows the memory manager to perform garbage collection on values no longer needed. The paper describes this in a bit more detail, including the APIs to facilitate global memory tracking. Another benefit of the RPC framework with a centralized scheduler and memory manager is locality awareness. With information about data location in the cluster, the system can co-locate function invocations on the machines that already have the values needed. 
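
To illustrate the kind of bookkeeping such a memory manager could do, here is a toy, single-node sketch that counts live references per immutable value and reclaims a value once the count drops to zero; this is my own illustration of the idea and not the API proposed in the paper (and it ignores concurrency corner cases):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Write-once values with naive reference counting: "garbage collection"
// happens when the last reference to a value goes out of scope.
class RefCountingStore {
    private final Map<String, byte[]> values = new ConcurrentHashMap<>();
    private final Map<String, Integer> refCounts = new ConcurrentHashMap<>();

    void put(String id, byte[] value) {            // immutable after the initial write
        values.putIfAbsent(id, value);
        refCounts.merge(id, 1, Integer::sum);      // the creator holds the first reference
    }

    byte[] get(String id) { return values.get(id); }

    void addRef(String id) {                       // a reference was passed to another function
        refCounts.merge(id, 1, Integer::sum);
    }

    void release(String id) {                      // a reference went out of scope
        Integer remaining = refCounts.computeIfPresent(id, (k, c) -> c - 1);
        if (remaining != null && remaining <= 0) {
            values.remove(id);                     // reclaim the value nobody references anymore
            refCounts.remove(id);
        }
    }
}
```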

The paper mentions that the proposed vision is already somewhat used in many specialized systems, such as Ray (Ray is from the same team as the paper) or Distributed TensorFlow. However, these systems are far from general and are often used as parts of larger software stacks. The challenge with this new improved RPC framework is to make it general enough to span across systems to allow better integration and more efficient resource use. I think of this as an RPC-as-a-service kind of vision that facilitates efficiency and interoperability and provides some blanket guarantees on communication reliability. 

As always, the paper has a lot more details than I can cover in the summary. We also have the presentation video from the reading group:

Discussion

1) Other Distributed Systems Frameworks. Frameworks for building distributed systems seem to gain more and more popularity these days. We were wondering how these frameworks address some of the same issues, such as copying data between components and managing locality. This is especially relevant for actor-style frameworks. We had some feedback on the Orleans framework. Since locality is important for performance, frameworks tend to move actors to the data. It often seems to be cheaper to move a small executable component than large chunks of data. More specialized data processing systems have been using these tricks for some time and can move both data and compute tasks around for optimal performance. 

2) Fault Tolerance. Fault tolerance is always a big question when it comes to backbone distributed systems like this one. With stateful components and shared memory subsystems, the improved RPC framework may be more susceptible to failures. While well-known and traditional techniques can be used to protect the state, such as replicating the shared memory, these will make the system more complicated and may have performance implications. Some components, like the scheduler and the memory manager, are on the critical path of all function invocations. Having strongly consistent replication there may not be the best choice for scalability and performance. The other option is to expose errors to the application and let it deal with the issues, as in more traditional RPC scenarios or lower-level communication approaches. However, due to the size and complexity of the improved system, it would have been nice to have some failure masking. We were lucky to ask Stephanie Wang, the author of the paper, about fault tolerance. She suggests that RPC clients handle errors by retries; however, she also mentioned that it is still an open question how much overall fault tolerance and fault masking is needed in a system like this one. 

3) Programming Model vs. Implementation. Another important discussion question was about the vision itself. The paper mentions a few specialized systems that already have many of the features described. This made us wonder: what does the paper propose then, and what are the challenges? It appeared to us that the paper’s vision is for a general and interoperable RPC system to act as a glue for larger software stacks. Stephanie provided some of her first-hand feedback here that I will quote directly:

“In my opinion, Ray already has (most of) the right features for the programming model but not the implementation. One of the main implementation areas that the paper discusses is distributed memory management. Actually, I don’t think that any of the systems mentioned, including Ray, do this super well today. For example, I would say that Distributed TensorFlow is quite sophisticated in optimizing memory usage, but it’s not as flexible as pure RPC because it requires a static graph. The second thing that I think is still missing is interoperability, as you said. I believe that this will require further innovations in both programming model and implementation.”

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the paper discussions. Please join the Slack group to get involved!

Reading Group. Fault-Tolerant Replication with Pull-Based Consensus in MongoDB

In the last reading group meeting, we discussed MongoDB‘s replication protocol, as described in the “Fault-Tolerant Replication with Pull-Based Consensus in MongoDB” NSDI’21 paper. Our reading group has a few regular members from MongoDB, and this time around, Siyuan Zhou, one of the paper authors, attended the discussion, so we had a perfect opportunity to ask questions and get answers from people directly involved.

Historically, MongoDB had a primary backup replication scheme, which worked fine most of the time but had a few unresolved corner cases, such as having more than one active primary at a time due to failures and primary churn. Having a split-brain in a system is never a good idea, so the engineers looked for a better solution. Luckily, there are many solutions for this problem — an entire family of consensus-based replication protocols, like Multi-Paxos and Raft. MongoDB adopted the latter to drive its update replication scheme. 

At this point, I might have finished the summary, congratulated MongoDB on using Raft, and moved on. After all, Raft is a very well-known protocol, and using it poses very few surprises. However, MongoDB had a couple of twists in their requirements that prevented the use of vanilla Raft. See, the original MongoDB primary backup scheme had a pull-based replication approach. Usually, the primary node (i.e., the leader) is active and directly prescribes operations/commands to the followers. This active-leader replication happens in traditional primary backup, in Multi-Paxos, in Raft, and in pretty much any other replication protocol aside from pub-sub systems that rely on subscribers pulling the data. In MongoDB, the primary is passive, and followers must request the data at some repeated intervals. In the figure below, I illustrate the difference between the push-based (active leader) and pull-based (active follower) approaches.  

In both cases, the followers can learn new data from the leader. The pull-based approach, however, requires a bit more work. The leader has to compute the data batch for each follower individually, because followers may all have slightly different progress due to differences in their polling schedules. Also, there is another problem with the pull solution: the primary has no way of knowing whether the followers have received the operations during the pull. And this is important, since the primary needs to count replicas accepting each operation to figure out the quorum and ultimately commit the operations. In traditional Raft, the result of the AppendEntries RPC carries an ack for a successful operation, allowing the primary to count successful nodes and figure out when the quorum is satisfied. With flipped, pull-style communication, there is no reply to the primary, so we need some additional mechanism to propagate this information, as shown in the figure here. Adding more communication is costly, but the extra message can be piggybacked onto the consecutive pull request at the expense of latency.

Pull-based Raft. Note that the leader must know when a quorum has accepted an operation.

Now let’s diverge a bit and discuss why MongoDB picked a pull-based approach despite the added complexity/cost. Traditional push-based solutions are centered around the primary or leader node, creating a star topology. In fact, unless you use something like the Pig primitive from our PigPaxos paper, star topology is pretty much all you can have. This is not very good in some scenarios where links may be congested or cost a lot of money. Consider a WAN scenario where two nodes may exist in another region for geo-redundancy to minimize data loss in the event of a regional failure. With star topology, the same payload will travel across the WAN twice, once to each of the two replicas, consequently costing twice as much to replicate. With a pull-based approach, it is easier to orchestrate other replication topologies. All we have to do is to instruct one node to pull data from its regional peer instead of the leader across the WAN. Naturally, this creates a longer replication chain but saves the WAN link bandwidth, cost, and potentially some resources at the primary.

MongoDB’s pull-based solution enables such chaining by allowing some non-primary nodes to act as sync sources. A sync source is a node that provides data to syncing servers in response to a pull request. Of course, the primary is always a sync source. The high-level overview of node interactions is as follows (a rough code sketch follows the list):

  • The syncing server asks for new log entries from its sync source.
  • The sync source replies with a sequence of log entries.
  • The syncing server appends the log entries if they follow the same history. 
  • If the source and server logs diverge, the syncing server may have uncommitted data and must clean up the divergent suffix. The detection of log divergence is based on matching the last log entry of the syncing server with the first entry of the log segment sent by the source. If they match, then the source has sent a continuation of the same history.
  • The syncing server notifies its sync source of how up-to-date it is, including the syncing server’s term.
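
To tie these steps together, here is a very rough, non-authoritative sketch of a syncing server's pull loop in the spirit of the description above; the types and method names are mine, not MongoDB's:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types, invented for illustration only.
record LogEntry(long term, long index) {}
record PullReply(List<LogEntry> entries) {}

interface SyncSource {
    PullReply pull(long lastTerm, long lastIndex);                  // steps 1-2: request/reply
    void reportProgress(long term, long index, long syncerTerm);    // step 5: progress + term
}

class SyncingServer {
    private final ArrayList<LogEntry> log = new ArrayList<>();
    private long currentTerm;

    void syncOnce(SyncSource source) {
        LogEntry last = log.isEmpty() ? null : log.get(log.size() - 1);
        long lastTerm = last == null ? 0 : last.term();
        long lastIndex = last == null ? 0 : last.index();

        List<LogEntry> batch = source.pull(lastTerm, lastIndex).entries();
        if (!batch.isEmpty()) {
            // Step 4: the first entry of the batch must match our last entry;
            // otherwise our (uncommitted) suffix diverged and must be cleaned up.
            LogEntry first = batch.get(0);
            if (last != null && (first.term() != lastTerm || first.index() != lastIndex)) {
                truncateDivergentSuffix();
                return; // retry on the next sync round after the cleanup
            }
            // Step 3: append the continuation of the same history (skipping the overlap).
            log.addAll(batch.subList(last == null ? 0 : 1, batch.size()));
        }

        // Step 5: report how up to date we are, along with our term. A higher term here
        // is what tells a stale leader not to count us toward its quorum.
        if (!log.isEmpty()) {
            LogEntry newLast = log.get(log.size() - 1);
            source.reportProgress(newLast.term(), newLast.index(), currentTerm);
        }
    }

    private void truncateDivergentSuffix() { /* omitted for brevity */ }
}
```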

Unlike vanilla Raft, the syncing server does not check the term of the source, so it may be possible for a server to accept log entries from a source in some previous term. Of course, this can only happen if the log entries follow the same history. I think MongoDB Raft does so to make sure that the syncing server learns about legit updates from the previous leader even if it has already participated in the new leader’s election round. What is important here is that the syncing server sends its higher term back to the source (which should propagate to the source’s source, etc., until it reaches the leader for the source’s term). These term messages act as a rejection for the old leader, so it won’t count the syncing server as accepting the message and being a part of the quorum. As a result, if the data from the old-termed sync source was committed, the syncing server has received it and will eventually receive the commit notification from the new leader. If that data is uncommitted by the old leader (i.e., a split-brain situation), then no harm is done, since the syncing server does not contribute to the quorum. The syncing server will eventually learn of the proper operations instead.

Now, speaking of performance, the paper does not provide any comparison between push- and pull-based solutions, so we are left wondering about the impact of such a drastic change. However, some experiments illustrate the comparison of star topology and chained replication in a two-region WAN setup. While chaining does not seem to increase the performance (unless the cross-region bandwidth is severely restricted), it predictably lowers the WAN bandwidth requirements. As for maximum performance, the paper mentions that the limiting factor is handling client requests and not replication, and this is why one fewer server pulling from the leader does not impact throughput. I am not sure I am very comfortable with this explanation, to be honest. 

The paper talks about a handful of other optimizations. I will mention just one that seemed the most interesting — speculative execution. With speculative execution, the nodes do not wait for a commit notification before applying an operation and speculatively apply it to the store right away. Since the underlying storage engine is multi-version, the system can still return strongly consistent reads by keeping track of the latest committed version. The multi-version store also allows rollbacks in case some operation fails to commit.
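
As a small illustration of why speculative application is safe to read from, here is a toy multi-version register that serves reads at the latest committed version even when newer speculative versions exist; this is my own sketch, not MongoDB or WiredTiger internals:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class MultiVersionRegister {
    private final NavigableMap<Long, String> versions = new TreeMap<>(); // opIndex -> value
    private long lastCommitted = 0;

    void applySpeculatively(long opIndex, String value) {
        versions.put(opIndex, value);          // stored, but invisible to committed reads for now
    }

    void commitUpTo(long opIndex) {
        lastCommitted = Math.max(lastCommitted, opIndex);
    }

    String readCommitted() {                   // strongly consistent read
        Map.Entry<Long, String> e = versions.floorEntry(lastCommitted);
        return e == null ? null : e.getValue();
    }

    void rollbackAfter(long opIndex) {         // drop speculative versions that failed to commit
        versions.tailMap(opIndex, false).clear();
    }
}
```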

You can see my presentation of the paper on YouTube:

Discussion

1) Cost of pull-based replication. The performance cost of the pull-based solution was the first big discussion topic that I raised in my presentation. As I mentioned, there is an extra communication step needed to allow the primary to learn of quorum acceptance. This step either adds an additional message round/RPC call or adds latency if piggybacked onto the next pull request. Another concern is pulling data when there is no new data, as this wastes communication cycles. 

Luckily, we had MongoDB people, including Siyuan Zhou, one of the paper authors, to shed some light on this. To make things a little better and not waste the communication cycles, the pulls have a rather long “shelf life” — if the source has no data to ship, it will hold on to the pull request for up to 5 seconds before replying with a blank batch. 

Another big optimization in the MongoDB system is a gradual shift to a push-based style! This somewhat undermines the entire premise of the paper; however, this new “push-style” replication is still initiated by the syncing server with a pull. After the initial pull, the source can push the data to the syncing server as it becomes available. This allows building these complicated replication topologies while reducing the latency impact of a purely pull-based solution.

Another aspect of cost is the monetary cost, and this is where chained replication topologies can help a lot. Apparently, this was a big ask from clients initially and shaped a lot of the system architecture. 

2) Evolution vs. Revolution. So, since the original unproven replication approach was pull-based to allow chained topologies, the new, improved, and model-checked solution had to evolve from the original replication. One might think that it would have been easier to slap on a regular push-based Raft, but that would have been a drastic change for all other components (not to mention the features). It would have required a lot more engineering effort than trying to reuse as much of the existing code as possible. This brings up an interesting point about how production software gradually evolves and almost never undergoes drastic revolutions.

3) Evaluation. The evaluation is the biggest problem of the paper. It lacks any comparison with other replication approaches except MongoDB’s old primary backup scheme. This makes it hard to judge the impact of the changes. Of course, as we have seen from the discussion with the authors, the actual implementation is even more complicated and evolved. It tries to bridge the gap between pull and push replication styles, so a clear comparison based on MongoDB’s implementation may not have been possible at all. 

That being said, I had some questions even about the provided self-comparisons. See, I would have expected to see a bit of throughput increase from chaining, similar to what I observe in PigPaxos, but there is none. The paper explains it by saying that replication at the sync source takes only 5% of a single core per syncing server, which would amount to just 20% of a core at the star-topology leader. Roughly, given the VMs used, this is around 5% of the total CPU spent on replication, with the paper claiming that all remaining CPU is used to handle client requests. Assuming there is a sync every 2 ms, we have about 2,000 syncs per second at the leader for 12,000 client requests. Doing some napkin math, we can see that there are 6 times more requests than replications per second, yet requests use 19 times the CPU, making each request roughly 3 times more expensive than each replication. Given that replication messages are not super cheap and require querying the log and serializing the data, the performance cost difference sounds excessive to me, even considering that client requests involve a few writes (i.e., a write to the log and the operation execution).
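
For the record, here is that napkin math spelled out with the numbers above (roughly 95% of the CPU for 12,000 requests per second versus 5% of the CPU for 2,000 syncs per second):

```latex
\frac{\text{CPU per client request}}{\text{CPU per sync}}
  \approx \frac{0.95 / 12000}{0.05 / 2000}
  = \frac{0.95 \times 2000}{0.05 \times 12000}
  \approx 3.2
```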

Siyuan explained that there is a bit more work on the request path as well. Not only is the log written (and persisted for durability), but there are also some additional writes for session maintenance and indexes. Moreover, the writes in the underlying WiredTiger engine are transactional, costing some performance as well. 

4) PigPaxos? Throwing this out here, but there are push-based solutions that can have more interesting replication topologies than a star. Our PigPaxos, in fact, solves a similar issue of cross-regional replication at a low network cost. 

5) Other pull-based systems. Finally, we tried to look at other solutions that may use pull-based replication, but there are not many. Pub-sub systems fit the pull model naturally, as subscribers consume the data by pulling it from the queue/log/topic. Pull-based replication can also be handy in disaster recovery, when nodes try to catch up to the current state and must ask for updates/changes starting from some cursor or point in history. 

6) Reliability/Safety. As the protocol makes one important change of not rejecting the data from an old-termed source, it is important to look at the safety of this change. The paper claims to model the protocol in TLA+ and model check it. Intuitively, however, we can see that even though the node takes the data from the old source, it actively rejects it by sending back its higher term. This, intuitively, should be enough to ensure that the old leader does not reach a quorum of accepts (even though a majority of nodes may have copied the data) and does not acknowledge commitment to the client. The rest is taken care of by Raft’s commit procedure upon the term change.

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the paper discussions. Please join the Slack group to get involved!

Reading Group. Move Fast and Meet Deadlines: Fine-grained Real-time Stream Processing with Cameo

In the 68th reading group session, we discussed scheduling in dataflow-like systems with Cameo. The paper, titled “Move Fast and Meet Deadlines: Fine-grained Real-time Stream Processing with Cameo,” appeared at NSDI’21.

This paper discusses some scheduling issues in data processing pipelines. When a system answers a query, it breaks the query into several steps or operators, such as groupBys, aggregations, and various other processing steps. Each of these operators executes on some underlying serverless actor (in this work, the underlying platform is Orleans), so naturally, the operators must be scheduled to run on some actors when the actors become available. 

Scheduling is easy when we always have idle actors/workers that can pick up new tasks. But this can lead to overprovisioning of resources and becomes expensive for all but the most latency-critical applications. As such, resource-shared environments that allow processing multiple queries from different tenants are more cost-efficient and practical. Of course, the problem with shared environments is keeping all tenants happy and isolating them from each other. This means that scheduling needs to be aware of the SLAs the system must provide to tenants and try not to blow past their deadlines. Cameo is a dataflow processing system that aims for fine-grained scheduling of tasks to make sure all queries, each consisting of many tasks, finish within some deadline or SLA guarantee. 

Keeping a deadline is tricky, though. One complication arises from queries that require more than one operation to compute. Some of these query operations may be dependent on each other and need to happen in sequence. As a result, the query deadline must be broken down into sub-deadlines for all the sequential execution steps. Instead of enforcing the execution deadlines, Cameo enforces a start deadline for each task. This deadline is the latest time at which a task must begin to compute such that it will not impact the downstream tasks and the overall query deadline. 
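
As a back-of-the-envelope illustration (my own sketch, not Cameo's actual algorithm), the start deadline of each step in a sequential chain can be derived by walking backwards from the query deadline and subtracting each step's estimated duration:

```java
class StartDeadlines {
    // Given a chain of dependent steps with estimated durations (ms) and an overall
    // query deadline (ms, relative to now), compute the latest time each step may start
    // without endangering the downstream steps.
    static long[] compute(long queryDeadlineMs, long[] estimatedDurationsMs) {
        int n = estimatedDurationsMs.length;
        long[] startBy = new long[n];
        long latestFinish = queryDeadlineMs;
        for (int i = n - 1; i >= 0; i--) {
            startBy[i] = latestFinish - estimatedDurationsMs[i]; // start deadline of step i
            latestFinish = startBy[i];                           // step i-1 must finish by then
        }
        return startBy;
    }
}
```

For example, compute(100, new long[]{20, 30, 10}) yields start deadlines {40, 60, 90}: if an earlier step overruns its estimate, the remaining slack shrinks, which is exactly when the downstream steps need to be reprioritized.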

Things can get more complicated from here. If one step takes longer to compute than anticipated, then the start deadlines of downstream tasks/steps may be violated. The system, however, tries to adjust/reprioritize the consecutive steps to accelerate their execution and hopefully still meet the overall query deadline/SLA. But there is more. See, if the scheduler made a mistake once and misjudged the execution time of a task, this can happen for the same query again. To counter this problem, Cameo propagates the task execution information to the scheduler so that it can take better actions next time. This feedback mechanism allows the scheduler to learn from past executions of a query/task and improve in the future.

Now about the scheduler. It has two major components – the scheduling mechanism and the scheduling policy. The scheduling mechanism executes the scheduling policies, and the two parts are rather separate. Such separation allows the scheduling mechanism to take tasks with high priority (i.e., an earlier start deadline) regardless of how this priority was computed by the policy. The scheduling policy component is pluggable and can be changed for different applications, as long as it creates priorities that the execution component understands. This part must follow the Cameo API to allow rescheduling of downstream/consecutive tasks and learning the stats of step execution to adjust in the future.
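
A rough sketch of what that mechanism/policy split could look like; the interfaces and names here are mine and not the actual Cameo API:

```java
import java.util.Comparator;
import java.util.PriorityQueue;

interface Task extends Runnable {}

// The pluggable part: how priorities are computed and what is learned from completed tasks.
interface SchedulingPolicy {
    long priorityOf(Task t);                      // e.g., the task's start deadline
    void onTaskCompleted(Task t, long actualMs);  // feedback to improve future estimates
}

// The fixed part: simply runs whatever the policy ranks as most urgent.
class SchedulingMechanism {
    private final PriorityQueue<Task> ready;
    private final SchedulingPolicy policy;

    SchedulingMechanism(SchedulingPolicy policy) {
        this.policy = policy;
        this.ready = new PriorityQueue<>(Comparator.comparingLong(policy::priorityOf));
    }

    void submit(Task t) { ready.add(t); }

    void runNext() {                              // pick the task with the earliest start deadline
        Task t = ready.poll();
        if (t == null) return;
        long started = System.nanoTime();
        t.run();
        policy.onTaskCompleted(t, (System.nanoTime() - started) / 1_000_000);
    }
}
```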

The authors claim significant latency improvements of up to 4.6 times when using Cameo. 

As always, we had a presentation in our reading group. The video presentation by Ajit Yagaty is available on YouTube:

Discussion

1) Spark Streaming. The scenarios presented in the paper operate on dynamic data sets that continuously ingest new data. This is similar to other streaming dataflow solutions, such as Spark Streaming. The paper does not provide a direct comparison in the evaluation but claims that Cameo provides much more fine-grained scheduling at the per-task level.

2) Fault Tolerance. One natural question in the reading group was around fault tolerance. The paper does not touch on this topic at all. Since the system is built using the Orleans framework, we suspect that most fault tolerance is taken care of by the framework. 

3) Spiky Workloads. The feedback mechanism used to learn how long tasks may take and adjust the scheduling accordingly works great in a streaming system where we have a steady inflow of data and the same queries processing this new data. Adding a new query will likely create some problems (at least for that query) until the system learns the stats of all the tasks this query uses. Similarly, having a big spike in data ingestion can cause some mayhem. If a query operates on the premise that some task needs to process n events/messages but receives 10n, its execution time may become too large and cause the downstream start deadlines to be violated. And such momentary spikes in a steady flow of new data may happen rather often — all that is needed is some transient network glitch that delays some messages, causing a spiky delivery moments later. 

4) Query Optimizers? We are not query optimization experts here in the reading group, but we wondered if using some query optimization techniques could help here. Instead of relying on historical stats to predict how long different steps of a query may take, can we be smarter and optimize based on the actual data at hand? A simple and naive thing to do for the previous example in (3) is to account for data cardinalities when computing the start deadlines, so that a spike in data ingestion can be noticed and dealt with immediately. 

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and, most importantly, manage Zoom invites to the paper discussions. Please join the Slack group to get involved!