All posts by alekseyc

Reading Group Special Session: Building Distributed Systems With Stateright

This talk is part of the Distributed Systems Reading Group.

Stateright is a software framework for analyzing and systematically verifying distributed systems. Its name refers to its goal of verifying that a system’s collective state always satisfies a correctness specification, such as “operations invoked against the system are always linearizable.” Cloud service providers like AWS and Azure leverage verification software such as the TLA+ model checker to achieve the same goal, but whereas that solution can verify a high-level system design, Stateright is able to systematically verify the underlying system implementation in addition to the design.

In this talk, Stateright’s author Jonathan Nadal will introduce the project, demo its usage, and discuss upcoming improvements. Jonathan has been in the cloud space since 2012 when he joined Amazon Web Services to work on S3. There he met Chris Newcombe, who introduced TLA+ to the company, and was immediately hooked by that technology. Jonathan later joined Oracle to help build Oracle Cloud Infrastructure, where he is currently a director of engineering and a proponent of model checking techniques within and outside his own teams. Stateright is a personal project not affiliated with Oracle or Amazon.

Video Recording

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a Slack group where we post papers, hold discussions, and most importantly manage Zoom invites to the paper discussions. Please join the Slack group to get involved!

Reading Group. Heterogeneity-Aware Cluster Scheduling Policies for Deep Learning Workloads

On Wednesday we discussed scheduling in large distributed ML/AI systems. Our main paper was “Heterogeneity-Aware Cluster Scheduling Policies for Deep Learning Workloads” from OSDI’20. However, it was a bit outside of our group’s comfort zone (outside of my comfort zone for sure). Luckily we had an extensive presentation with a complete background overview of prior ML/AI scheduling works.

Scheduling problems are not new by any means. We have schedulers at the OS level to allocate CPU resources, we have more sophisticated schedulers for large clusters, HPC systems, etc. So scheduling for machine learning should not pose a big problem, given all the prior scheduling work in the literature and practice. However, this is not necessarily the case, since scheduling for ML/AI systems introduces a few additional challenges. One issue is unpredictable completion time. For instance, many similar jobs may be started with different hyperparameters and get killed off as they progress, making it harder to have a fair scheduler. Another issue is the non-uniformity of access to resources: information exchange between GPUs is faster within the confines of a single machine. When a job is scheduled on resources (GPUs, FPGAs, etc.) spread across many servers, the network latency/bandwidth may get in the way. And the difference in how fast or slow the data can be exchanged between the devices grows with the degree of network separation. I find this problem similar to NUMA.

In the context of today’s paper, the additional challenge is the heterogeneity of resources in the cluster. This is something more traditional OS schedulers started to consider only relatively recently on ARM with big.LITTLE CPU architectures (and will consider soon on x86). Here, however, we have more than just a few resource variations. The paper considers different types of GPUs, but the authors also mention other special-purpose hardware, like FPGAs. To make things even more complicated, faster hardware provides a non-uniform speedup for different types of ML tasks: one application may have a 3x speedup from a switch to a faster GPU, while some other task may have a ten-fold speedup from the same switch. Finally, different users of the same system may even have different scheduling goals or objectives, as for some (researchers?) the completion time may be more important, while for others (production users in the cloud?) the fairness may play a bigger role.

Gavel separates the scheduling into a set of distinct components or steps. The scheduling policy takes into account the scheduling objective, i.e. the fairness or completion time. Gavel implements various scheduling policies, such as LAS, FIFO, Shortest Job First, and hierarchical composition of these policies. The scheduling mechanism executes the policy on the cluster. The throughput estimator provides the estimation of the speed of different tasks, to be fed into the policy engine if the performance estimation is not provided by the user. This is an important piece since the performance impact of different hardware varies for different training tasks!

The scheduling itself is iterative. The policy engine provides the resource allocation for a task. For example, it may say that a task needs to run 60% of the time on GPU A and 40% of the time on GPU B. This of course is computed given the user objectives and other competing tasks. The scheduling mechanism takes the allocation and tries to enforce it in an iterative manner. I will leave the detailed discussion of scheduling to the paper. The intuition, however, is rather simple. Gavel keeps track of how many iterations have been done on each resource (i.e., GPU) type, compares it with the scheduling plan, and computes the priority score for the next iteration, potentially reshuffling the tasks. So for example, if a scheduling plan calls for 60% on GPU A, but so far the task used GPU A in only 20% of scheduling iterations, then GPU A will have a higher priority for being scheduled on the next iteration/round.
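To make the intuition concrete, here is a minimal sketch of such a priority computation for a single job. It assumes the priority of a GPU type is simply the planned share of time divided by the share received so far; the function name and this exact formula are my own illustration and not necessarily what Gavel implements.

```python
from typing import Dict


def round_priorities(target: Dict[str, float],
                     rounds_received: Dict[str, int]) -> Dict[str, float]:
    """Per-GPU-type priority of one job for the next scheduling round.

    Assumption (illustrative only): priority = planned share / received share,
    so a GPU type the job is "behind" on gets a priority above 1.
    """
    total = sum(rounds_received.values())
    priorities: Dict[str, float] = {}
    for gpu, share in target.items():
        if share == 0:
            priorities[gpu] = 0.0  # the plan never places the job here
            continue
        received = rounds_received.get(gpu, 0) / total if total else 0.0
        # A GPU type that was planned but never used gets the top priority.
        priorities[gpu] = float("inf") if received == 0 else share / received
    return priorities


# The plan says 60% on GPU A and 40% on GPU B, but only 2 of the
# 10 rounds so far (20%) ran on GPU A.
plan = {"A": 0.6, "B": 0.4}
history = {"A": 2, "B": 8}
priorities = round_priorities(plan, history)
next_gpu = max(priorities, key=priorities.get)  # GPU A is behind its plan
```

A real scheduler would compute these scores for all jobs at once and then pick the best feasible assignment for the round, which is where the reshuffling of tasks comes from.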

The paper evaluated the scheduling of training tasks both on a real cluster and with extensive simulations.


1) Type of jobs. The paper describes the scheduling of training jobs, but there are other types of jobs. For example, there are interactive jobs running in things like Jupyter notebooks. This type of job has to be scheduled all the time, and cannot be completely preempted or paused due to its interactive nature. Also, trained models are only good when they are used for something. Inference jobs may have different requirements than training jobs. For example, for a production inference task, it may be more important to schedule it on multiple different machines for fault tolerance, the opposite of the approach for training jobs, where scheduling on as few machines as possible improves the performance by minimizing the network bottlenecks. Taking into account these other types of tasks running in the same cluster may complicate scheduling further.

2) Task movement. We talked about the performance penalty of scheduling on many different machines due to the network latency and bandwidth. However, as tasks get scheduled against different resources, they have to migrate over the network, so a very frequent change of resources may not be very good for the performance.
The task movement itself is a tricky problem. At least it appears so to an uninitiated person like me. Do we move a task at some snapshot point? How to insert that snapshot if the task itself does not make it? Can we move the task just by dumping the process and (GPU) memory on one server and restoring it on another? Does this even work across different GPU designs/generations?

3) Fault tolerance. What is the granularity of fault tolerance in large cloud ML/AI systems? Obviously, if the process crashes for some reason, we want to restart at some previous checkpoint/snapshot. But whose responsibility is it to make these snapshots? The snapshots may be costly to make too frequently, so users may be reluctant to snapshot their progress too often. But since the snapshots may be needed for task movement, can we take advantage of that for fault tolerance? Again, the uninitiated minds are speaking here, so likely these are all solved questions.

4) Planetary-scale scheduling. We want to avoid (frequent) communication between GPUs over the network because of limited bandwidth. On a geo-scale, the bandwidth may become even more limited, and the “first-byte” latency even larger, further impacting the performance. So it is likely not a good idea to schedule a task across WAN (or maybe even across availability zones in the same region). However, moving the tasks on a planetary scale between regions may be something to consider, if this is done infrequently, and remote regions have more of the desired resource available. This requires longer-term planning though, as we would like to make sure that an expensive move can be offset by the task using as much of that resource as possible before being moved again.

5) User performance estimates. Gavel has a performance estimation component, but it can take performance estimates from the users. It would be interesting to see if these user-provided estimates are correct in the real world. Also, can Gavel adjust these user estimates based on its own measurements?

Reading Group. FlightTracker: Consistency across Read-Optimized Online Stores at Facebook

In the last DistSys Reading Group, we discussed “FlightTracker: Consistency across Read-Optimized Online Stores at Facebook.” This paper is about consistency in Facebook’s TAO caching stack. TAO is a large social graph storage system composed of many caches, indexes, and persistent storage backends. The sheer size of Facebook and TAO makes it difficult to enforce meaningful consistency guarantees, and TAO essentially operates as an eventually consistent system, despite having some stronger-consistency components in it. However, a purely eventually consistent system is very unpredictable and hard to program for, so TAO initially settled for providing a read-your-write (RYW) consistency property. The current way of enforcing RYW is FlightTracker. FlightTracker is a recency token (Facebook calls these tokens Tickets) system running in every Facebook datacenter. Tickets keep track of recent writes performed by a datacenter-sticky user session. In a sense, the ticket is a set of tuples <Key, Key-Progress>, where the Key-Progress is some value to designate the recency of the write on the Key, like a version, timestamp, or a partition sequence number. The reads then include the ticket and propagate it across the stack to the nodes that serve the requests. With the recency information in the ticket, a server can make a local decision on whether it is sufficiently up-to-date. If the node is stale, it can forward the request to a higher-level cache or durable store to retrieve the data.
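The local freshness decision can be sketched in a few lines. This is only an illustration: it assumes a ticket is a plain map from key to the minimum version the session must observe, and the names `is_fresh_enough` and `read`, as well as the dictionaries standing in for a cache and a durable store, are hypothetical and not FlightTracker's API.

```python
from typing import Dict, Tuple

Ticket = Dict[str, int]              # key -> minimum version the session must see
Replica = Dict[str, Tuple[str, int]]  # key -> (value, version)


def is_fresh_enough(ticket: Ticket, key: str, local_version: int) -> bool:
    """A server can answer locally if the ticket has no entry for the key,
    or if the local copy is at least as recent as the ticket requires."""
    required = ticket.get(key)
    return required is None or local_version >= required


def read(key: str, ticket: Ticket, cache: Replica, store: Replica) -> str:
    """Serve from the cache when it satisfies the ticket; otherwise forward
    the read to the authoritative store and refill the cache."""
    entry = cache.get(key)
    if entry is not None and is_fresh_enough(ticket, key, entry[1]):
        return entry[0]
    entry = store[key]  # cache miss or consistency miss
    cache[key] = entry
    return entry[0]


# The session wrote version 5 of "post:1", but the cache still holds version 3.
cache = {"post:1": ("draft", 3)}
store = {"post:1": ("published", 5)}
value = read("post:1", {"post:1": 5}, cache, store)  # forwarded to the store
```

A session with an empty ticket would have been served the cached copy, which is exactly the case where RYW imposes no extra cost.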

Many other systems use recency tokens, but they usually do not explicitly specify all writes done by the user. For example, a token may be a single number representing the last transaction id seen by the client. This is good for making sure the recency tokens are small, but it has a coarser resolution: such a token will enforce per-partition recency instead of per-key recency, and cause too many cache misses when only the per-key RYW guarantees are needed.

However, keeping and transferring the explicit set of all of a client’s writes is expensive, so FlightTracker uses a few compaction strategies. For one, it is sufficient to keep track of only the most recent write on each key. Secondly, in some workloads with a larger number of writes, FlightTracker may reduce the resolution and stop tracking individual writes and, for example, switch to partition-level tracking by transaction id or sequence number. Finally, the TAO stack enforces bounded staleness of about 60 seconds, so the writes older than 60 seconds can be purged from the ticket.
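Two of these compaction rules, keeping only the latest write per key and purging writes older than the staleness bound, can be sketched as follows. The representation (a map from key to a version and a write time in seconds) and the function names are assumptions made for illustration.

```python
from typing import Dict, Tuple

# key -> (version of the latest write, time of that write in seconds)
Ticket = Dict[str, Tuple[int, float]]


def record_write(ticket: Ticket, key: str, version: int, now: float) -> None:
    """Keep only the most recent write per key."""
    current = ticket.get(key)
    if current is None or version > current[0]:
        ticket[key] = (version, now)


def compact(ticket: Ticket, now: float, bound_seconds: float = 60.0) -> Ticket:
    """Drop writes older than the staleness bound; by then every replica is
    expected to have caught up, so the ticket no longer needs them."""
    return {key: (version, written_at)
            for key, (version, written_at) in ticket.items()
            if now - written_at < bound_seconds}


ticket: Ticket = {}
record_write(ticket, "post:1", 1, now=0.0)
record_write(ticket, "post:1", 2, now=10.0)   # supersedes version 1
record_write(ticket, "like:9", 7, now=50.0)
ticket = compact(ticket, now=75.0)            # the "post:1" entry is 65 s old
```

After the last call only the entry for "like:9" remains, which also shows why a session that has not written for a minute ends up with an empty ticket.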

FlightTracker stores the tickets in simple replicated systems based on distributed hashing. Upon reads, the ticket is first fetched from FlightTracker, and then included with all the read operations. Since one request typically makes many reads, the cost of ticket fetching is amortized over many read operations. Nevertheless, FlightTracker is fast to fetch tickets: it takes just ~0.3 ms. Whenever writes happen, a ticket for a particular user session is updated to include the new writes and exclude the compacted ones.

The paper has many details that I have left out of this summary and the presentation:


1) What can go wrong if RYW is broken? The paper discusses the RYW topic quite substantially. One important point here is that RYW enforcement is “relatively” cheap: it provides a useful consistency guarantee without making cache misses due to consistency (i.e., consistency misses) too frequent. So it appears to be a balance between the usefulness and the cost of consistency properties at the Facebook scale. However, the paper does not talk much about what can go wrong in Facebook (or more specifically in applications that rely on the social graph) if RYW does not hold. The paper mentions that it is a reasonable default consistency for developers and users. In our discussion, we thought it is more useful for the end-users. If a person posts something on the site, then refreshes the page and the post does not appear because of an RYW violation, the user may get confused about whether the site is broken, or whether they pressed the right button. We do not think that in most cases there will be serious consequences, but since Facebook is a user-centric application, providing intuitive behavior is very important. Actually, the paper suggests that RYW violations may still happen, by saying that the vast majority of servers (>99.99%) are within the 60-second staleness window. This means that in some rare cases it is possible to have a “clean” ticket after compaction, hit one of these <0.01% of servers, and get stale data.

2) Architectural style. So… this is a Facebook paper, and you can feel it. Just like many other Facebook papers, it describes a system that appears very ad-hoc relative to the rest of the stack. The original TAO is also a combination of ad-hoc components bolted together (does it still use MySQL in 2021?). FlightTracker was added to TAO later as an afterthought and improvement. Not a bad improvement by any means. And having all the components appear separate and independent serves its purpose: Facebook can build a very modular software stack. So anyway, this appears like a very “engineering” solution, bolted onto another set of bolt-on components. And it serves the purpose. Having 0.3 ms (a tiny fraction of an eye blink) of additional latency to retrieve a ticket and provide something useful to developers and users is not bad at all.

Another interesting point from this discussion is that Facebook is actually very conservative in its systems. Still using PHP/Hack? MySQL? They create systems to last and then bolt on more and more components to improve. I guess at some critical mass all the bolted-on parts may start to fall off, but that only means it is time to rethink that system with something groundbreaking and new. So what about “Move fast and break things?” Does it contradict some of that conservatism? Or does it augment it? I think the latter: if something needs to change/improve, then just add another part somewhere to make this happen.

3) Stronger consistency than RYW? The paper says that FlightTracker can be used to improve the consistency beyond RYW. The authors provide a few examples of systems manipulating the tokens to get more benefits, namely indexes and pub-sub systems. With indexes, they actually bolt on yet another component to make them work.

4) Cost of tickets. Since each ticket represents a set of recent writes, its cost is not static and depends on the number of writes done by the user session in the past 60 seconds. The main reason the cost of storing and transferring tickets does not explode is the 60-second global compaction, which keeps the average ticket size at 250 bytes. The median ticket size is 0 bytes, meaning that a lot of requests happen more than 60 seconds after the user’s last write. However, we do not think that a system like this will scale to a more write-heavy workload. TAO’s workload (at least in 2013 when the original paper came out) is 99.8% reads, so writes are rare. With more writes, a constant-size ticket may start to make more sense, and we have a feeling that cross-scope compaction, where writes on a ticket are replaced with a more comprehensive/encompassing progress marker, would then play a bigger role.

5) Cross-datacenter issues. One of the reasons for implementing FlightTracker was fault tolerance, as the prior RYW approach that relied on write-through caches could not handle some failures that require changes in the write’s route through the caches to the storage. With FlightTracker, any TAO datacenter/cluster can serve reads while enforcing RYW. In some rare cases, this even enables reads from another datacenter if the local cluster is not available. However, it appears that the users are still sticky to the datacenter, as the FlightTracker service lives independently in each datacenter. This means that a user’s request must come to the datacenter, retrieve the ticket, and only then can it cross the datacenter boundaries if there is a big outage locally. If the outage is so severe that the user cannot even reach their datacenter, then the request won’t get the ticket and may actually experience an RYW violation. Another nice Facebook paper talks in more detail about what happens to user requests before they reach datacenters.

Reading Group Paper List. Papers ##51-60.

With just four more papers to go in the DistSys Reading Group’s current batch, it is time to get the next set going. This round, we will have 10 papers that should last till the end of the spring semester. Our last batch was all about OSDI’20 papers, and this time around we will mix things around both in terms of the venues and paper recency. We will also start the batch with one foundational paper taken from Murat’s recent list of must-read classical papers in distributed systems. Without further ado, here is the list:

  1. Distributed Snapshots: Determining Global States of a Distributed System – April 7th
  2. Facebook’s Tectonic Filesystem: Efficiency from Exascale – April 14th
  3. New Directions in Cloud Programming – April 21st
  4. Paxos vs Raft: Have we reached consensus on distributed consensus? – April 28th
  5. Protocol-Aware Recovery for Consensus-Based Storage – May 5th
  6. chainifyDB: How to get rid of your Blockchain and use your DBMS instead – May 12th
  7. XFT: practical fault tolerance beyond crashes – May 19th
  8. Cerebro: A Layered Data Platform for Scalable Deep Learning – May 26th
  9. Multitenancy for Fast and Programmable Networks in the Cloud – June 2nd
  10. Exploiting Symbolic Execution to Accelerate Deterministic Databases – June 9th

Reading Group. Pegasus: Tolerating Skewed Workloads in Distributed Storage with In-Network Coherence Directories

Hard to imagine, but the reading group just completed its 45th session. We discussed “Pegasus: Tolerating Skewed Workloads in Distributed Storage with In-Network Coherence Directories,” again from OSDI’20. Pegasus is one of these systems that are very obvious in hindsight. However, this “obviousness” is deceptive: Dan Ports, one of the authors behind the paper who joined the discussion, mentioned that the project started in 2017, so it took quite a bit of time from start to publication, with a lot of things considered and tried before zeroing in on what is in the paper.

Pegasus is a replication system for load balancing and throughput scalability in heavily skewed workloads. Consider a workload with a handful of “hot” objects. These hot objects may have so much usage that they overwhelm their respective servers. Naturally, this limits the overall throughput because the system is now capped by servers at their maximum capacity/utilization. The solution is to replicate these hot objects to many servers and allow clients to access them from multiple replicas. However, as soon as we have a replicated scenario, we start running into consistency issues. Strongly consistent systems often degrade in performance with the addition of more replicas due to the synchronization overheads. This is what makes Pegasus rather unique: it scales for load balancing through replication while remaining strongly consistent. The key enabler of this is the smart Top of Rack (ToR) switch that handles all the traffic in the server rack. This switch acts as the “source of synchrony” in the rack, and it does so at the packet’s line speed.

In Pegasus, the data is assigned to servers in the rack using a consistent hashing mechanism, allowing clients to send the requests directly to servers that own the data. However, all these requests go through the ToR switch, which can inspect the packets and make some additional routing decisions for “hot” objects. Let’s consider a write request for such a high-demand object. The ToR switch inspects a packet, and if it is for a “hot” key, it sends the write message to some larger and potentially different set of servers in a rack, essentially increasing the replication factor and rotating the responsible servers. Once the servers ack the write completion, the ToR switch sees the acks and records these servers into its coherency directory as servers with the latest copy of the data. The read requests have a similar rerouting fate: if a read is for a hot object, instead of going to the default server, the ToR switch sends it to one of the replicas from its coherency directory. The paper has more details on implementing this coherency directory and keeping track of the recent progress using a simple versioning mechanism.
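The routing logic described above can be approximated in ordinary software to see how the pieces fit. The sketch below is a toy model, not the switch data-plane program from the paper: the class and method names are made up, the switch is assumed to stamp every write with an increasing version number, and the set of hot keys is given up front.

```python
import random
from typing import Dict, List, Optional, Set, Tuple


class CoherenceDirectory:
    """Toy model of the ToR switch state for hot keys (hypothetical names)."""

    def __init__(self, servers: List[str], hot_keys: Set[str],
                 fanout: int = 2, seed: Optional[int] = None) -> None:
        self.servers = servers
        self.hot_keys = hot_keys
        self.fanout = fanout
        self.rng = random.Random(seed)
        self.next_version = 0
        # hot key -> (latest acked version, servers holding that version)
        self.latest: Dict[str, Tuple[int, Set[str]]] = {}

    def route_write(self, key: str, default_server: str) -> Tuple[int, List[str]]:
        """Stamp the write with a version and choose its destination servers."""
        self.next_version += 1
        if key not in self.hot_keys:
            return self.next_version, [default_server]
        return self.next_version, self.rng.sample(self.servers, self.fanout)

    def on_write_ack(self, key: str, version: int, server: str) -> None:
        """Record which servers hold the newest acknowledged copy."""
        if key not in self.hot_keys:
            return
        known_version, holders = self.latest.get(key, (0, set()))
        if version > known_version:
            self.latest[key] = (version, {server})  # newer copy replaces the set
        elif version == known_version:
            holders.add(server)                     # one more up-to-date replica
        # acks for older versions are ignored

    def route_read(self, key: str, default_server: str) -> str:
        """Send reads of hot keys to any server with the latest version."""
        entry = self.latest.get(key)
        if key not in self.hot_keys or entry is None:
            return default_server
        return self.rng.choice(sorted(entry[1]))


directory = CoherenceDirectory(["s1", "s2", "s3", "s4"], hot_keys={"hot"}, seed=1)
version, targets = directory.route_write("hot", default_server="s1")
for server in targets:
    directory.on_write_ack("hot", version, server)
reader = directory.route_read("hot", default_server="s1")  # one of `targets`
```

Because reads only ever go to servers recorded at the latest acked version, adding replicas spreads the load without exposing stale copies, which is the property the paper relies on.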

The end result is awesome! Just by replicating a handful of objects in skewed workloads (~16 objects out of a million in the paper), Pegasus achieves load balancing and high throughput beating in-network caching in almost all scenarios. There are a few other advantages to Pegasus that are missing in other SOTA solutions: the ability to store larger objects (not evaluated), and tolerance of workloads with different read-write ratios (evaluated extensively).

Finally, I have not touched on a few other important pieces of the system: figuring out which keys are hot and fault-tolerance. For measuring the key temperature, the Pegasus statistics engine samples some packets and determines the frequency of keys in the samples to gauge how hot each key is. For fault-tolerance, the system uses chain replication across racks for durability.
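The sampling idea is easy to illustrate in software. The snippet below is a rough sketch that counts keys in a random sample of requests and reports the most frequent ones; the function name, the sampling rate, and the request list are assumptions for illustration, and the real statistics engine runs within the switch's hardware constraints rather than in Python.

```python
import random
from collections import Counter
from typing import Iterable, List, Optional


def estimate_hot_keys(requests: Iterable[str], sample_rate: float,
                      top_n: int, seed: Optional[int] = None) -> List[str]:
    """Inspect roughly `sample_rate` of the requests and return the
    `top_n` keys that appear most often in the sample."""
    rng = random.Random(seed)
    sampled = Counter(key for key in requests if rng.random() < sample_rate)
    return [key for key, _ in sampled.most_common(top_n)]


# A skewed workload: key "a" receives 90% of the requests.
workload = ["a"] * 9000 + ["b"] * 500 + ["c"] * 500
random.Random(0).shuffle(workload)
hottest = estimate_hot_keys(workload, sample_rate=0.1, top_n=1, seed=42)
```

With a skew this strong even a 10% sample identifies the hottest key reliably, which is why sampling is enough and the switch does not need to count every packet.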

As always, we have our presentation of the paper by A. Jesse Jiryu Davis:


This time around we had Dan Ports join us to answer the questions about the paper, so this turned out to be a nice discussion despite a slightly lower than expected attendance. 

1) Simple API. Currently, Pegasus supports a simple API with reads and simple destructive writes (i.e., each write is an unconditional overwrite of the previous copy of the data). The main reason for this is how the writes are structured in Pegasus. The system is very nimble and quickly adjustable; it picks write servers on the fly as the write request “goes through” the switch. This means that a write should be able to just complete on the new server. However, if the write operation is, for example, a conditional write or an update, then such an update also needs to have the previous version of the object, which may be missing on the new server. We spent some time discussing workarounds for this, and they surely seem possible. But the solution also introduces additional communication, which adds both more load to the servers and more latency for operations. And since we are dealing with a subset of objects that already generate the most load in the system, adding anything more to it must be avoided as much as possible. The cost of supporting a more complex API will also differ for various read-write ratios.

2) Comparison with caching. Another big discussion was around using caching to load balance the system. As Dan pointed out, caches are good when they are faster than storage, but for super-fast in-memory storage, it is hard to make a cache faster. NetCache (one of the systems used for comparison in the paper) does provide a faster cache by placing it in the network switch. It has several downsides: it handles only small objects, consumes significant switch resources, and does not work well for write workloads (this is a read-through cache, I think). Of course, it is possible to make it a write-through cache as well to optimize for write workloads, but it still does not solve other deficiencies and adds even more complexity to the system. We also touched on the more complicated fault-tolerance of cached systems. The disparity between the load that the cache and the underlying systems can take can create situations when the underlying systems get overrun upon cache failure or excessive cache misses.

3) Chain replication. Since Pegasus replicates for scalability, it needs a separate mechanism to handle fault-tolerance. The paper suggests using a chain replication approach, where racks are the chain nodes. Only the tail rack serves reads; however, the writes must be applied in all racks as the write operation propagates through the chain. One question we had is why not use something like CRAQ to allow other racks to serve reads, but the reality is that this is simply not needed. The chances that an object can become so skewed and hot that it needs more than a rack worth of servers are very slim, so there is no need to complicate the protocol. Another question I have now but forgot to ask during the discussion is what happens to hot writes as they go through the chain? If non-tail racks only use the default server for writes on “hot” keys (instead of, let’s say, a round-robin or random node), then this server may get overwhelmed. But I think it is trivial to pick a random server for the hot object on each write at the non-tail racks.

4) Zipfian distribution and workload skewness. Pegasus needs to load-balance fewer keys for a less skewed Zipfian distribution. This was an interesting and a bit counter-intuitive observation at first glance, since one can intuitively expect the more skewed distribution to require more load-balancing. However, a higher-alpha Zipf has objects that are more skewed, but not necessarily more of them than a lower-alpha Zipfian distribution. Fewer highly skewed objects mean less load-balancing.

5) Virtualization of top-of-rack switches. One important question about the technology that enables Pegasus is the virtualization of these smart ToR switches. Currently, these switches are not virtualized, so one needs to have bare-metal access to a switch to deploy the code. Virtualization of such a switch may make the technology available to cloud users. I think this would be a huge boost to the overall state of distributed computing at the datacenter level. I would be speculating here, but I think a lot depends on the willingness of manufacturers to provide such support, especially given the relatively limited capabilities of the hardware right now. Of course, the virtualization should not add a significant latency penalty to the users, and most importantly should not add any penalty to non-users (applications/systems that reside in the same rack but do not use the extended capabilities of the switches). Couple all these with the risks of running users’ code on the hardware that handles all the traffic in the rack, and we also need to worry about user isolation/security more than ever. However, as wishful as it is, it is quite probable that these smart switches will not make their way to the public cloud any time soon. This gives large cloud vendors an edge since they can benefit from the technology in their internal systems. Smaller service providers that rely on the cloud, however, will have to find a way to compete without access to this state-of-the-art technology. Aside from my extremely high-level speculations, some smart people actually go deeper into the topic.

6) There were a few other minor topics discussed, and jokes were thrown here and there. For example, Dan explains Pegasus with cat pictures.

Reading Group. Performance-Optimal Read-Only Transactions

Last meeting we looked at “Performance-Optimal Read-Only Transactions” from OSDI’20. This paper covers important topics of transactional reads in database/data-management systems. In particular, the paper discusses “one-shot” read-only transactions that complete in 1 network round-trip-time (RTT) without blocking and without bloated, expensive messages. If this sounds too good to be true, it is. Before presenting these types of transactions, the authors discuss why it is impossible to have Non-blocking, One round-trip, Constant size messaging, Strictly Serializable (NOCS) read-only transactions. This becomes a “pick 3 out of 4” kind of deal.

The Performance-Optimal Read-Only Transaction (PORT) system shows how to get away with NOC and try to get as close as possible to S. For the One round-trip constraint, the paper makes the clients coordinate their own read transactions and control the ordering of reads, all in one round of message exchange. This requires the clients to send some recency/progress metadata over to servers. In the case of PORT, the metadata is a Version Clock, a type of logical clock. It is just a number, so it is Constant-size. Finally, the servers can use the metadata to return the latest value that satisfies the recency constraint imposed by the Version Clock in a Non-blocking manner. The servers also avoid coordination to again satisfy the One round-trip requirement. To make sure the reads do not block, PORT never considers the in-progress operations. PORT separates the in-progress operations from the completed ones with a stable frontier time/version. Clients must request reads at what they know to be the latest stable, immutable state of the system and never try to request the state from the in-progress operations. Since different clients may have different and stale knowledge of the stable frontier, the system needs to support reading different versions of data, hence PORT relies on a multi-version store.

PORT also does some clever trickery to improve the consistency. For example, a promotion mechanism is used to block out a range of versions for writing in some cases. If some data was written with a version v=10 and then a read transaction has requested a value at version v=15, the v=10 value will be promoted to occupy the entire range of versions [10, 15], and servers will not be allowed to write anything in that range. This, however, does not cause a write in that version range to abort; instead, it will be written at version v=16.
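The promotion example can be replayed with a small single-key, multi-version store. This is a simplified sketch under my own assumptions (the class name and the rule that every write lands above both the latest version and the promoted range are illustrative), not the ScyllaDB or Eiger implementation from the paper.

```python
import bisect
from typing import Dict, List, Optional


class VersionedKey:
    """One key in a toy multi-version store with read promotion."""

    def __init__(self) -> None:
        self.versions: List[int] = []     # write versions, ascending
        self.values: Dict[int, str] = {}  # version -> value
        self.promoted_to = 0              # highest version a read has pinned

    def read(self, at_version: int) -> Optional[str]:
        """Return the latest value written at or below `at_version`."""
        i = bisect.bisect_right(self.versions, at_version)
        if i == len(self.versions):
            # The newest value was read: it now occupies every version up to
            # `at_version`, so later writes may not land in that range.
            self.promoted_to = max(self.promoted_to, at_version)
        if i == 0:
            return None
        return self.values[self.versions[i - 1]]

    def write(self, value: str, proposed_version: int) -> int:
        """Apply a write without aborting; bump it past the promoted range."""
        latest = self.versions[-1] if self.versions else 0
        version = max(proposed_version, latest + 1, self.promoted_to + 1)
        self.versions.append(version)
        self.values[version] = value
        return version


key = VersionedKey()
key.write("x", proposed_version=10)           # stored at v=10
key.read(at_version=15)                       # returns "x", promotes it to [10, 15]
moved = key.write("y", proposed_version=12)   # not aborted, stored at v=16
```

A later read at v=15 still returns "x", so the answer the earlier reader got stays valid, while a read at v=16 sees "y".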

The paper implements PORT in ScyllaDB and Eiger and shows nearly identical throughput in read-heavy workloads to that of non-transactional reads while also beating Eiger’s transactions. There are quite a few important details and nuances on implementing PORT. The implementation on top of Eiger is full of surprises as the promotion mechanism described above no longer works for transactional writes, and PORT uses another clever trick. 

The presentation by Alex Miller goes into a bit more detail than my summary:


1) SNOW theorem. The NOCS theorem the authors discuss sounds similar to SNOW. Well, it is by the same first author, so this makes some sense. Both are about read-only transactions, both concern the trade-offs between performance and latency. NOCS focuses on performance-optimal transactions, while SNOW talks about latency-optimal ones. Both talk about the impossibility of having the highest consistency and being “x-optimal” (fill in the “x”). Moreover, the NOC (non-blocking, one round trip, constant metadata) implies that performance here largely means latency as well. It just happens that if we stop doing all the extra work, then the throughput improves as well. In a sense, it appears that NOCS is a rebranding of SNOW to some extent. We can even map letters in both abbreviations to similar concepts. S = (strict) serializability in both cases. N = Non-blocking. O = one round trip (in SNOW it is coordinate/retry, which is pretty much whether we add more messages or not). So three letters are the same, W & C are the difference, but even there we can find some similarities. W in SNOW stands for write conflict avoidance, and one way to do so may require violating C in constant metadata. The paper itself mentions that NOCS is similar to SNOW.

2) Other causal systems. Occult and PaRiS were brought up in the discussion briefly. We did not spend too much time on this, though. Occult is a causal system that avoids “slowdown cascades” due to dependencies and the need to enforce causality. PORT with its one-RTT non-blocking mechanism seems to be similar in this regard, so a comparison would be interesting.

3) HLC for the logical clock? HLCs are used in the transactions in MongoDB and CockroachDB. HLCs are logical clocks, constant in size, and do help identify consistent cuts/snapshots for transactions. MongoDB uses HLCs for cross-partition causal transactions, and it seems to fit well within the NOC. CockroachDB is more involved, but it also uses HLC. Another important part of HLC is that it can provide a single serial order, but this is something PORT actually avoids in Eiger-PORT since it needs to provide a different serial order to different clients to enforce the read-your-write property without blocking.

4) On the importance of a stable frontier. A stable frontier is the time in the system’s execution that separates what is safe to read and what is not. Everything before the stable frontier is committed/executed and safe, any operation after the frontier may not have been fully written/committed yet, and is not safe. This separation is clear in Scylla-PORT, but gets blurred in Eiger-PORT and its read-your-write reordering. 

5) Replication. The paper does not address replication issues at all, so one has to wonder about how it handles replication and associated failures. For example, in Cassandra/Scylla, a read succeeds after being completed by some read-quorum that may be smaller than all replicas for the object. This means that you can promote the value on a subset of replicas, and then do a write on some quorum containing the un-promoted replicas and end up with the same write recorded under different versions on different replicas. This may or may not be a huge problem, but a conversation on replication/failures would be very useful. The code (which is open source) may help to shed some light on this, but we did not have a chance to look at it during the discussion.

6) Eiger-PORT. This one is very different from the ScyllaDB version. It is different from how the paper described PORT all along, since Eiger-PORT cannot promote the operations: writes are now in a transaction, and all writes from one transaction must be promoted to a higher version atomically. If you do that, you need to coordinate between servers, add messages, and lose the O part of NOC. The authors go into more detail describing the Eiger-PORT protocol, which is not the easiest thing to grasp on the first read. It is also mind-twisting when you start reordering operations for different clients. Actually, as of the time of this writing, we were still discussing some aspects of Eiger-PORT in our group’s slack channel.

7) Evaluation. We liked the choice and rationale for picking the baseline systems to evaluate against. PORT indeed showed low overhead in ScyllaDB while improving the database’s consistency semantics.

Reading Group. Microsecond Consensus for Microsecond Applications

Our 43rd reading group paper was about an extremely low-latency consensus using RDMA: “Microsecond Consensus for Microsecond Applications.” The motivation is pretty compelling — if you have a fast application, then you need fast replication to make your app reliable without holding it back. How fast are we talking here? Authors go for ~1 microsecond with their consensus system called Mu. That is one-thousandth of a millisecond. Of course, this is not achievable over a regular network and network protocols like TCP, so Mu relies on RDMA.

In my mind, Mu maps rather perfectly to Paxos/MultiPaxos, adjusted for the RDMA usage. The accept phase is pretty much Paxos phase-2. The leader directly writes to the follower’s memory. Mu does not use protocol-specific acks, but there is still an RDMA-level ack for successfully writing memory and thus completion of phase-2. Of course in Paxos, followers must check the ballot before accepting an operation in phase-2. This requires processing and would negate the benefits of direct memory access. To work around the problem, Mu uses RDMA permissions to control whose memory writes are accepted in phase-2. The bottom line, however, is that we have a single round trip phase-2 capable of rejecting messages from “wrong” leaders, just like in Paxos.

Paxos elects a leader in phase-1. In Mu, the equivalent of phase-1 consists of 2 sub-phases. First, a prospective leader contacts the quorum of followers and tells them to change the permissions from an old leader to itself. This prevents the old leader from writing to a quorum and makes it stop. This quorum becomes “the leader’s go-to quorum”, as it can only write to the nodes from that quorum due to permissions. In the second sub-phase, the prospective leader learns of the past proposal/ballot number and any past operations to recover. The leader then picks a higher proposal number and writes it back. Just like in Paxos/MultiPaxos, the leader must recover the learned commands.
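The permission hand-off described above can be sketched in a few lines. This is a simplified single-process model under my own assumptions, not Mu's code: `Follower`, `become_leader`, and `replicate` are hypothetical names, the `allowed_writer` field stands in for the RDMA write permission, the first majority of followers is assumed to always respond, and recovery of past operations is left out.

```python
class Follower:
    """Passive replica: memory plus one write permission, no protocol logic."""

    def __init__(self):
        self.allowed_writer = None   # stands in for the RDMA write permission
        self.proposal = 0
        self.log = []

    def grant(self, leader_id):
        # Switching the permission implicitly revokes it from the old leader.
        self.allowed_writer = leader_id

    def remote_write(self, leader_id, proposal, entry):
        """Models a one-sided write; it is rejected if the writer lacks permission."""
        if leader_id != self.allowed_writer:
            return False
        self.proposal = proposal
        self.log.append(entry)
        return True


def become_leader(leader_id, followers):
    """Sub-phase 1: take permissions on a majority. Sub-phase 2: pick a higher proposal."""
    majority = len(followers) // 2 + 1
    quorum = followers[:majority]    # assumption: these followers always respond
    for f in quorum:
        f.grant(leader_id)
    proposal = max(f.proposal for f in quorum) + 1
    return quorum, proposal


def replicate(leader_id, quorum, proposal, entry):
    """Accept phase: succeeds only if every write to the leader's quorum is acked."""
    return all(f.remote_write(leader_id, proposal, entry) for f in quorum)


followers = [Follower() for _ in range(3)]
quorum_a, prop_a = become_leader("A", followers)
replicate("A", quorum_a, prop_a, "x")            # accepted
quorum_b, prop_b = become_leader("B", followers)
print(replicate("A", quorum_a, prop_a, "y"))     # old leader is now rejected
```

The follower never compares ballots; the rejection of the old leader comes purely from the permission check, which is what lets the accept phase stay a single one-sided write.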

Another prominent part of the paper is the failure detector. The authors claim that it allows for fast leader failover. The detector operates by a pull mechanism: a leader maintains a heartbeat counter in its memory and increments it periodically, while the followers read the counter and adjust a “badness” score depending on the counter’s progress. If the counter moves too slowly or does not move (or is not readable at all?), the badness score becomes high, causing the follower to decide that the leader has failed and try to take over.
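A rough sketch of the follower side of such a pull-based detector is below. The scoring rule is an assumption on my part (one point up for a stalled or unreadable counter, one point down for progress, failure declared at a threshold); the paper's actual scoring and thresholds may differ, and `PullFailureDetector` is a hypothetical name.

```python
class PullFailureDetector:
    """Follower-side view of a leader's heartbeat counter (hypothetical scoring)."""

    def __init__(self, threshold=3):
        self.threshold = threshold   # assumed cut-off for declaring failure
        self.last_seen = None
        self.badness = 0

    def observe(self, counter):
        """counter is the value read from the leader, or None if the read failed.

        Returns True when the leader should be suspected.
        """
        if counter is None or counter == self.last_seen:
            self.badness += 1                         # no progress or unreadable
        else:
            self.badness = max(0, self.badness - 1)   # progress lowers the score
            self.last_seen = counter
        return self.badness >= self.threshold


fd = PullFailureDetector(threshold=3)
for reading in [1, 2, 3, 3, 3, None]:
    suspected = fd.observe(reading)
print(suspected)   # the counter stalled and then became unreadable
```

Treating a failed read the same as a stalled counter is my own choice here; it is one way to answer the "not readable at all?" question.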

As always, the paper has way more details than I can cover in a short summary. Our group’s presentation by Mohit Garg is available on YouTube:


1) Performance. Microsecond latency covers only replication and does not include any of the client interactions or request capture. These components may add a significant delay to the client-observed latency. Moreover, the throughput figure has latency that is at least somewhat close to 1 microsecond only at the low-end of the throughput curve. Pushing more operations degrades latency quite significantly — up to 15 microseconds. Of course, it is worth noting that this is with batching enabled, so still pretty impressive.

2) Use of RDMA permissions for leader enforcement. This looked familiar to me… Until I was reminded that in the 17th reading group meeting we looked at the “Impact of RDMA on agreement” paper by the same authors.

3) Quorums. Since the protocol relies on the permissions to be explicitly granted to a leader when it contacts a quorum, that leader cannot use any other quorum, as it won’t have permissions to access it. We were not very sure why a leader cannot contact all nodes and try to get permissions to all of them. It still needs only the majority to succeed, but having more than the quorum of nodes that can accept writes from the leader may be handy, since trying to write to more nodes than the minimal quorum can be useful for controlling the tail latency and tolerating stragglers.

4) Flexible Quorums. This continues the above point about quorums. Flexible quorums are quite useful in trading off fault tolerance and scalability. Since Mu is restricted to just one quorum that granted the write permissions, it cannot take advantage of flexible quorums, such as grids.

5) Failure detector. The failure detector is one of the most interesting and controversial features in Mu. We spent quite a bit of time discussing it. First of all, what does the pull model give us? Every follower keeps polling the leader and reading some counter. But what if the leader is actually totally and utterly down? How can you read the memory of the crashed server to learn its counter and compute the badness score from it? Of course, if a follower cannot read, then it can conclude that the leader is down and start the leader election, but this is not explicitly mentioned in the paper. So what is the purpose of reading a counter and having the counter increase then? Being able to read the counter clearly means the leader is up, at least in some capacity. The counter and the badness score computed from it are not so much a proxy of the node’s overall up/down status, but a proxy of the node’s health/performance. The paper briefly alludes to this when talking about replication being stuck, eventually causing the heartbeat counter to stop as well and trigger an election, despite the leader not being completely down.

In the discussion, we came up with a different heartbeat mechanism that avoids the “read from dead node” issue. If we make the leader write its counter to the followers’ memory, and followers read their local copy of the leader’s counter, then a leader crash will stop the counter progress, and followers can detect it by reading their local memory. Quite honestly, this scheme sounds cleaner to us than the follower pull/read approach used in the paper. The authors claim that the pull mechanism provides better detection latency, but this is not backed up experimentally in the paper.
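For comparison, the push variant from our discussion could look roughly like this. Again, this is only a sketch of the idea and not something from the paper: `LocalHeartbeat`, its `slot` field, and the `max_missed` parameter are names I made up for illustration.

```python
class LocalHeartbeat:
    """Follower-local slot that the leader overwrites remotely (hypothetical)."""

    def __init__(self, max_missed=2):
        self.slot = 0              # the leader writes its counter here
        self.last_checked = 0
        self.missed = 0
        self.max_missed = max_missed

    def leader_write(self, counter):
        # In a real deployment this would be a remote write by the leader.
        self.slot = counter

    def follower_check(self):
        """Purely local read; returns True if the leader is suspected."""
        if self.slot > self.last_checked:
            self.last_checked = self.slot
            self.missed = 0
        else:
            self.missed += 1
        return self.missed >= self.max_missed


hb = LocalHeartbeat(max_missed=2)
hb.leader_write(1)
hb.follower_check()         # progress seen, nothing suspected
hb.follower_check()         # first missed beat
print(hb.follower_check())  # second missed beat: suspect the leader
```

A crashed leader and a slow leader look the same to the follower here, and the follower never has to touch the memory of a possibly dead machine.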

6) “Dumb” acceptors. Mu is not the only protocol that assumes “dumb” Paxos acceptors/followers that simply provide a write/read interface with very little capacity to run any “logic”. Disk Paxos assumes separate sets of processors and disks. One processor can become a leader, and disks are the followers. Disk Paxos, of course, would not provide the same low latency, as in each phase a processor needs to both write and read remote disks/storage. The paper briefly mentions Disk Paxos. CPaxos is a WAN Paxos variant built using strongly consistent cloud storage services as acceptors. Similarly, the storage service provides limited ability to run any logic and the leader must jump through some hoops to maintain safety. Another one mentioned in the discussion was Zero-copy Paxos.

7) Ordered communication for correctness. We spent a bit of time talking about the importance of ordered communication (FIFO) for the correctness of the protocol. If not for FIFO, there could have been some interesting corner cases around the leader churn. I usually do not fully trust papers that just state the assumptions of the FIFO channels and move on, since traditionally you may have quite a few corner-cases with systems built on FIFO network protocols, like TCP, and have messages reordered. One common reason is that applications often have complex and multi-threaded logic, and may reorder messages internally after the messages have left the TCP stack. Here, however, there is no logic at the followers, and it makes the ordered network all you need (assuming there are no other corner-cases in the network, like dropped connections and re-connections).

Reading Group. Cobra: Making Transactional Key-Value Stores Verifiably Serializable.

This Wednesday, we were talking about serializability checking of production databases. In particular, we looked at the recent OSDI’20 paper: “Cobra: Making Transactional Key-Value Stores Verifiably Serializable.” The paper explores the problem of verifying serializability in a black-box production system from a client point of view. This makes sense as serializability is an operational, client-observable property. The tool, called Cobra, collects the history via a middle layer sitting between a client and the database and then uses the history to construct a polygraph representing all possible execution orders. The problem then becomes finding whether a serial execution order exists in the polygraph. This means that we need to find some graph that has no dependency/ordering cycles. For instance, this would represent a cycle: A depends on B, B depends on C, and C depends on A. In fact, quite a few tools take a similar approach for checking sequential equivalence properties. What makes Cobra different is that it aims to check serializability at scale, but the problem is NP-complete, so in the worst case adding more events to the graph makes checking exponentially slower. To mitigate the issue, Cobra uses a few tricks. It uses a few domain-specific heuristics to reduce the size of the polygraph. It also takes advantage of parallel hardware (GPUs) to speed up (by an order of magnitude!) some highly-parallel polygraph-pruning tasks. Finally, Cobra uses an SMT solver to perform the final satisfiability search on the pruned polygraph. I will leave the details of all these methods to the paper.
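To illustrate why the search is expensive, here is a brute-force sketch of the polygraph check. It is not Cobra's algorithm (Cobra prunes the polygraph and hands the rest to a solver); the function names and the encoding are my own assumptions. An edge (X, Y) means X must be ordered before Y, and each constraint is a pair of alternative edges of which exactly one must hold.

```python
from itertools import product


def has_cycle(nodes, edges):
    """Detect a cycle in a directed graph with depth-first search."""
    graph = {n: [] for n in nodes}
    for src, dst in edges:
        graph[src].append(dst)
    state = {n: 0 for n in nodes}   # 0 = unvisited, 1 = on the DFS stack, 2 = done

    def visit(n):
        state[n] = 1
        for m in graph[n]:
            if state[m] == 1 or (state[m] == 0 and visit(m)):
                return True
        state[n] = 2
        return False

    return any(state[n] == 0 and visit(n) for n in nodes)


def is_serializable(nodes, known_edges, constraints):
    """Try every resolution of the constraints; succeed if one is acyclic.

    With k constraints this explores up to 2**k graphs, which is the
    exponential blow-up that pruning is meant to tame.
    """
    for choice in product((0, 1), repeat=len(constraints)):
        chosen = [constraints[i][pick] for i, pick in enumerate(choice)]
        if not has_cycle(nodes, list(known_edges) + chosen):
            return True
    return False


txns = ["A", "B", "C"]
# A before B is known; the order of B and C is undetermined.
print(is_serializable(txns, [("A", "B")], [(("B", "C"), ("C", "B"))]))
# The three-way cycle from the text can never be serialized.
print(has_cycle(txns, [("A", "B"), ("B", "C"), ("C", "A")]))
```

Every constraint that the heuristics can resolve up front halves the number of candidate graphs, which is why shrinking the polygraph before the final search matters so much.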

Our video presentation by Akash Mishra is, as always, on YouTube:


This is a very nice and interesting paper that sparked some lively discussion. Here I list a few of the key discussion points.

Cobra Performance

1) Performance improvement. Cobra is significantly faster than the baselines in the paper. One concern during the discussion was about the optimizations of the baselines. For example, one baseline is Cobra’s approach minus all the optimizations. This is great in showing how much improvement the core contribution of the paper brings, but not so great at comparing against other state-of-the-art solutions. In the paper’s defense, they do provide other baselines as well, all of which perform worse than the Cobra-minus-the-optimizations one. So maybe there are no other domain-specific solutions like Cobra to compare against just yet.

2) Performance improvement part 2. Another performance discussion was around the use of GPUs. While the authors mention an order-of-magnitude improvement when using GPUs for graph pruning, it is not always clear how much of the overall improvement is due to the GPUs. Interestingly enough, the overall gains compared to the baseline are ten-fold. For this one, the paper provides a figure that breaks down the time spent in each phase/optimization of Cobra. In read-heavy workloads, polygraph pruning, which is GPU-optimized, dominates the entire computation, suggesting that a lot of the gains may come from the use of specialized hardware.

3) Better parallel hardware? Are GPUs the best hardware to accelerate pruning? Maybe some better alternative exists? FPGAs?

4) Is it fast enough? While Cobra is significantly faster than other approaches, it may still not be fast enough for use in some production workloads. While it can handle 10k transactions in ~15 seconds, real production workloads can produce more transactions in under one second. The paper claims that Cobra can sustain an average load of 2k requests per second, which is enough for many large services. That said, 2k transactions per second is the scale of systems from about 35 years ago.

5) Do we need this in production? We spent quite a bit of time discussing this. Aside from the concerns in point (4) above, there may be less utility in checking a production system. Achieving serializability in the happy case is not that difficult. There are plenty of databases out there that do just that. Testing systems in production is like testing a happy-case execution most of the time, so there may be little incentive to do that, especially given the cost of running Cobra.

Keeping the same guarantees under failures is more difficult, which is why tools like Jepsen stress test the system by introducing faults. Our thinking was that Cobra, when combined with fault injection, can be a powerful stress-test tool. And with the capacity to check larger histories, it may be useful for checking more involved scenarios and doing routine fuzz testing (maybe even check-in testing!) to try to prevent engineers from introducing bugs.

As far as continuous production use goes, we did not have many scenarios, aside from a service provider checking its own compliance with some consistency SLAs or a user trying to catch a service provider to get a discount (but at what cost?).

6) Limitations. Cobra works only on key-value stores and only with a subset of common operations. For instance, it does not support range operations. Checking the serializability of range commands requires knowing not only what keys exist but also what keys do not exist in the system. This is hard in a black-box production system. However, our thought was that if this is not a production system, and you start with a blank slate for testing purposes, the knowledge of what keys do not exist is there: initially, none of the keys exist. Maybe in such a scenario, Cobra can add support for range operations, especially since we think it will be more useful as a very powerful testing tool (point 5 above) rather than a production monitoring tool. There may be other reasons for the lack of range operation support that we do not know or understand.

Reading Group. A large scale analysis of hundreds of in-memory cache clusters at Twitter.

In the 41st distributed systems reading group meeting, we looked at in-memory caches through the lens of yet another OSDI’20 paper: “A large scale analysis of hundreds of in-memory cache clusters at Twitter.” This paper explores various cache usages at Twitter and distills the findings into a digestible set of figures. I found the paper rather educational. It starts by describing Twitter’s cache architecture, called Twemcache. Twemcache operates as a managed service, with cache clusters starting up and scaling up/down in a semi-automatic manner. After the brief Twemcache description, the paper starts to distill the findings of the cache usage itself. A few of the more interesting findings:

  • A sizeable number of cache clusters at Twitter are used for write-heavy workloads/applications.
  • 15% of key-value pairs have values smaller than the key. Of course, this may be specific to Twitter’s keying scheme, which the authors say seems to have rather large keys.
  • Objects of some sizes tend to be more popular. Side question here: does this correlate with some application or use case? Maybe some use cases favor certain sizes?
  • TTL usage: bounding inconsistency/staleness, implicit deletion of objects.
  • TTL usage: expirations are more efficient than evictions. This one kind of makes sense, but it shifts some of the burden of managing the object life cycle in the cache over to the applications that set TTLs.

The paper goes more in-depth about a variety of other important topics, such as eviction methods, object popularity, etc. Another important point, and potentially one of the most important ones for the academic community, is the dataset: the authors released their dataset, which is a super nice thing to do.

Our presentation is available here:


This paper was educational and shed light on cache usage at a big company, including some pretty interesting statistics. We did not have fundamental questions about the paper or its findings.

1) Dataset. One of the bigger discussion points that was mentioned multiple times is the public dataset. Having such a large dataset is very nice for many researchers in academia. Of course, the dataset applies to the cache usage, but we think it may be useful for database researchers as well. For example, taking into account all the cache misses may produce some real representation of read workloads against the storage systems. However, such access patterns may be less useful when applied to strongly consistent storage, as caches are less likely to be used there.

2) Dataset distribution. The dataset for this paper is huge (2.8 TB compressed), and distributing it is a challenge on its own, making it even more impressive that a dataset is available. 

3) Object Popularity. The authors note a few deviations from the expected: (1) unpopular objects are often even more unpopular than expected, and (2) popular objects are less popular than expected. One question we had here (and in many other places) is how specific such a finding is to Twitter. Maybe a better insight into the type of workloads that exhibit such deviations from the expected would have helped us understand this better, but it is likely that the authors simply could not disclose the details about specific workloads/tasks.

4) Write-heavy workloads. The biggest question here is what types of workloads use cache and are write-heavy. One suggestion was “counters” (i.e. likes, retweets, etc), which makes sense as it is a piece of data that may be accessed a lot, may be a bit stale, and gets updated relatively frequently. Counters of sorts are also used as rate limiters, according to the paper. Another idea is maybe some analytics workloads, where some weights/models are updated relatively frequently.

5) Miss ratio. It was rather interesting to see the miss ratio figures, especially the ratio of max miss ratio to min miss ratio. Ultimately, a cache miss must be followed by a read from the underlying storage system. A high miss ratio puts more stress on the storage system. A high ratio between max and min miss ratios over the course of a week may indicate a workload that spikes in the number of cache misses on occasion. Such a workload would require more robust/overprovisioned storage to tolerate the spikes in requests to it due to cache misses. For example, Facebook avoids sudden traffic migrations between datacenters to control cache misses and not overwhelm the underlying infrastructure.
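As a small worked example of the metric, the snippet below computes per-window miss ratios and the max-to-min ratio. The numbers are made up for illustration and are not taken from the paper or its dataset; the function names and the choice of window are also my own assumptions.

```python
def miss_ratio(hits, misses):
    """Fraction of requests in a window that missed the cache."""
    total = hits + misses
    return misses / total if total else 0.0


def max_min_miss_ratio(windows):
    """windows: non-empty list of (hits, misses) per time window, e.g. hourly.

    Returns how much worse the worst window is than the best one.
    """
    ratios = [miss_ratio(h, m) for h, m in windows]
    lowest = min(ratios)
    return max(ratios) / lowest if lowest > 0 else float("inf")


# Hypothetical counts: two quiet windows and one spike.
windows = [(990, 10), (980, 20), (900, 100)]
print(max_min_miss_ratio(windows))
```

Here the worst window misses 10% of requests against 1% in the best one, so the backing store has to absorb roughly ten times the usual miss traffic during the spike.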

Reading Group. Virtual Consensus in Delos

We are continuing through the OSDI 2020 paper list in our reading group. This time we discussed “Virtual Consensus in Delos,” a consensus paper (Delos is yet another Greek island to continue the consensus naming tradition). Delos relies on the log abstraction to keep track of all commands/operations and their order. Traditionally, some consensus protocol, like Raft or MultiPaxos, would maintain such a log. Delos, however, separates the log from consensus and exposes a virtual log to the higher application levels. The virtual log is composed of many underlying log fragments or Loglets, and each Loglet is supported by some replication protocol. The cool part about Loglets is that they do not need to share the same hardware, configuration, or even the same protocol. This allows Delos to seamlessly switch from one Loglet type or configuration to another to support the ongoing workload or scale the system. Just like in many other strongly consistent systems, there is Paxos in Delos. The virtual log configuration is supported by a Paxos-backed Metastore. This makes Paxos the source of consistency in the system while keeping it away from the “data-path.” Since Delos can switch Loglets supporting the virtual log, each individual Loglet does not need to be as fault-tolerant as MultiPaxos or Raft. The NativeLoglet implements a stripped-down replication scheme that lacks leader election, and upon a leader failure, Delos can simply switch to a different Loglet. This switching process is a bit involved, so please read the paper for the details.

We had a very nice presentation of the paper, although it is a bit longer than our usual presentations:


After the presentation, we spent another 40 minutes as a group discussing the paper. A few of the key discussion points:

1) Similarity with WormSpace paper. Delos appears similar to the WormSpace paper we discussed in one of the first reading group sessions. This makes sense, as both papers include Mahesh Balakrishnan as an author. WormSpace also proposes a virtual log kind of system, but it does not talk about changing protocols on the fly; instead, it focuses on allocating chunks of the log to different clients/applications/leaders. So in contrast to Delos, which uses a Loglet for some undetermined amount of time (i.e. until it fails), WormSpace allocates fixed chunks of a virtual log ahead of time. Both systems use single-shot Paxos as the source of consistency to allocate different portions of the virtual log, and both systems have similar problems to solve, like a failure of a party responsible for a virtual log segment.

2) Use case for mixing protocols. One of the motivations/threads in the paper is switching from one Loglet implementation to another. This use case largely motivates the ability to switch protocols “on the fly”, but it is a pretty rare use case. We were brainstorming other scenarios where such an ability is useful. One of the possibilities is adjusting to the workloads better, as some protocols may handle certain workloads better than the others. A similar idea is described in this paper, although it talks about switching between leader-full and leader-less protocols on the fly to provide better performance depending on the conflict-rate. 

3) Loglet sealing. The mechanism for switching between Loglets is quite complicated. It involves a seal operation that tells a Loglet to stop accepting operations; however, it is not trivial in the NativeLoglet. The nuances come from the fact that an operation may still get into the log after the Loglet is sealed and before the Loglet tail is determined and written to the Metastore. The presentation describes this situation pretty nicely. The seal may be a misleading name, as it does not fully prevent data from being written into the log. One analogy we had is that sealing is akin to starting to close a valve on a pipe: as we are closing it, some water may still go through, and we need to make sure to collect all that water with the checkTail operation.

4) Availability delay when switching Loglets. Although not apparent from the figures/evaluation, we believe that there is a small delay in the system when it switches from one Loglet to another. This is related to point (3) above: since sealing is not an instantaneous operation, we need to use checkTail to figure out the last log position of the old Loglet to start the new one. As such, until this position is figured out and written to the Metastore, a new Loglet may not accept operations just yet (also, a new Loglet is not known until the Metastore updates anyway). On this subject, we also had quite a long discussion on whether we can improve the seal-checkTail functionality to make the transition faster, but it seems that quite a bit of the challenge comes from the simplified notion of NativeLoglet and the need to seal it without writing the “seal” operation in the log itself.
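The switch between Loglets and the unavailability window around it can be sketched with a toy, single-process virtual log. This is a simplification under my own assumptions, not the Delos API: the class and method names are hypothetical (with `check_tail` standing in for checkTail), the `chain` list plays the role of the Metastore, and the model does not capture the race where an append still lands after the seal.

```python
class Loglet:
    """Toy in-memory Loglet with the seal / check-tail pair."""

    def __init__(self):
        self.entries = []
        self.sealed = False

    def append(self, entry):
        if self.sealed:
            return None                  # sealed Loglets reject new entries
        self.entries.append(entry)
        return len(self.entries) - 1

    def seal(self):
        self.sealed = True

    def check_tail(self):
        return len(self.entries)         # first unused local position


class VirtualLog:
    """Chains Loglets; the chain stands in for the Metastore mapping."""

    def __init__(self):
        self.chain = [(0, Loglet())]     # (start position, Loglet) pairs

    def append(self, entry):
        start, active = self.chain[-1]
        local = active.append(entry)
        return None if local is None else start + local

    def begin_switch(self):
        self.chain[-1][1].seal()         # step 1: stop the old Loglet

    def finish_switch(self, new_loglet):
        start, old = self.chain[-1]
        tail = start + old.check_tail()  # step 2: learn where the old one ends
        self.chain.append((tail, new_loglet))  # step 3: record the new mapping
        return tail

    def read(self, position):
        for start, loglet in reversed(self.chain):
            if position >= start:
                return loglet.entries[position - start]


vlog = VirtualLog()
vlog.append("a")
vlog.append("b")
vlog.begin_switch()
print(vlog.append("c"))      # rejected: the log is unavailable mid-switch
vlog.finish_switch(Loglet())
print(vlog.append("c"))      # accepted at the next virtual position
```

The gap between `begin_switch` and `finish_switch` is the small availability delay we suspected: appends fail until the tail of the old Loglet is known and the new mapping is recorded.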


Reading Group

Our reading group takes place over Zoom every Wednesday at 3:30pm EST. We have a slack group where we post papers, hold discussions and most importantly manage Zoom invites to the papers. Please join the slack group to get involved!