Reading Group Paper List. Papers ##51-60.

With just four more papers to go in the DistSys Reading Group’s current batch, it is time to get the next set going. This round, we will have 10 papers that should last till the end of the spring semester. Our last batch was all about OSDI’20 papers, and this time around we will mix things up both in terms of venues and paper recency. We will also start the batch with one foundational paper taken from Murat’s recent list of must-read classical papers in distributed systems. Without further ado, here is the list:

  1. Distributed Snapshots: Determining Global States of a Distributed System – April 7th
  2. Facebook’s Tectonic Filesystem: Efficiency from Exascale – April 14th
  3. New Directions in Cloud Programming – April 21st
  4. Paxos vs Raft: Have we reached consensus on distributed consensus? – April 28th
  5. Protocol-Aware Recovery for Consensus-Based Storage – May 5th
  6. chainifyDB: How to get rid of your Blockchain and use your DBMS instead – May 12th
  7. XFT: practical fault tolerance beyond crashes – May 19th
  8. Cerebro: A Layered Data Platform for Scalable Deep Learning – May 26th
  9. Multitenancy for Fast and Programmable Networks in the Cloud – June 2nd
  10. Exploiting Symbolic Execution to Accelerate Deterministic Databases – June 9th

Reading Group

Our reading group takes place over Zoom every Wednesday at 3:30pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the meetings. Please join the Slack group to get involved!

Reading Group. Pegasus: Tolerating Skewed Workloads in Distributed Storage with In-Network Coherence Directories

Hard to imagine, but the reading group just completed its 45th session. We discussed “Pegasus: Tolerating Skewed Workloads in Distributed Storage with In-Network Coherence Directories,” again from OSDI’20. Pegasus is one of those systems that seem very obvious in hindsight. However, this “obviousness” is deceptive — Dan Ports, one of the authors behind the paper who joined the discussion, mentioned that the project started in 2017, so quite a bit of time passed from the start to publication, with a lot of things considered and tried before zeroing in on what is in the paper.

Pegasus is a replication system for load balancing and throughput scalability in heavily skewed workloads. Consider a workload with a handful of “hot” objects. These hot objects may see so much usage that they overwhelm their respective servers. Naturally, this limits the overall throughput because the system is now capped by servers running at their maximum capacity/utilization. The solution is to replicate these hot objects to many servers and allow clients to access them from multiple replicas. However, as soon as we have a replicated scenario, we start running into consistency issues. Strongly consistent systems often degrade in performance with the addition of more replicas due to the synchronization overheads. This is what makes Pegasus rather unique — it scales for load balancing through replication while remaining strongly consistent. The key enabler is the smart Top of Rack (ToR) switch that handles all the traffic in the server rack. This switch acts as the “source of synchrony” in the rack, and it does so at line speed.

In Pegasus, the data is assigned to servers in the rack using a consistent hashing mechanism, allowing clients to send requests directly to the servers that own the data. However, all these requests go through the ToR switch, which can inspect the packets and make additional routing decisions for “hot” objects. Let’s consider a write request for such a high-demand object. The ToR switch inspects the packet, and if it is for a “hot” key, it sends the write message to some larger and potentially different set of servers in the rack, essentially increasing the replication factor and rotating the responsible servers. Once the servers ack the write completion, the ToR switch sees the acks and records these servers in its coherency directory as servers with the latest copy of the data. The read requests have a similar rerouting fate — if a read is for a hot object, instead of going to the default server, the ToR switch sends it to one of the replicas from its coherency directory. The paper has more details on implementing this coherency directory and keeping track of the recent progress using a simple versioning mechanism.
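To make the switch’s role more concrete, below is a minimal Python sketch of the rerouting logic described above. All the names here (CoherencyDirectory, route_write, and so on) are mine, and the replication factor of 3 is an arbitrary illustration; the real Pegasus implements this logic in the switch data plane at line speed rather than in software.

```python
import random

class CoherencyDirectory:
    """Toy model of the ToR switch logic for hot keys (illustrative only, not the paper's code)."""

    def __init__(self, home_server_of, all_servers, hot_keys, replication=3):
        self.home_server_of = home_server_of   # consistent-hashing assignment: key -> default server
        self.all_servers = list(all_servers)
        self.hot_keys = set(hot_keys)
        self.replication = replication
        self.replicas = {}                     # hot key -> set of servers holding the latest version
        self.version = {}                      # hot key -> version number of the latest completed write

    def route_write(self, key):
        if key not in self.hot_keys:
            return [self.home_server_of[key]]  # cold keys simply follow consistent hashing
        # For a hot key, pick a larger (and rotating) write set to spread future reads.
        return random.sample(self.all_servers, k=self.replication)

    def on_write_ack(self, key, server, version):
        # The switch observes the acks and records which servers hold the newest copy.
        if key not in self.hot_keys:
            return
        if version > self.version.get(key, -1):
            self.version[key] = version
            self.replicas[key] = {server}
        elif key in self.version and version == self.version[key]:
            self.replicas[key].add(server)

    def route_read(self, key):
        if key in self.hot_keys and self.replicas.get(key):
            return random.choice(sorted(self.replicas[key]))  # any replica with the latest version
        return self.home_server_of[key]
```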

The end result is awesome! Just by replicating a handful of objects in skewed workloads (~16 objects out of a million in the paper), Pegasus achieves load balancing and high throughput, beating in-network caching in almost all scenarios. There are a few other advantages to Pegasus that are missing in other SOTA solutions: the ability to store larger objects (not evaluated), and tolerance of workloads with different read-write ratios (evaluated extensively).

Finally, I have not touched on a few other important pieces of the system: figuring out which keys are hot, and fault tolerance. For measuring key temperature, the Pegasus statistics engine samples some packets and determines the frequency of keys in the samples to gauge how hot each key is. For fault tolerance, the system uses chain replication across racks for durability.
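For intuition, a sampling-based hotness estimate can be as simple as the sketch below. This is my own toy version, not the paper’s statistics engine, and the sampling rate and top-k cutoff are made-up parameters.

```python
import random
from collections import Counter

def estimate_hot_keys(observed_keys, sample_rate=0.01, top_k=16):
    """Sample a small fraction of requests and rank keys by how often they appear in the sample.
    The keys that show up most often in the sample are treated as 'hot'."""
    counts = Counter()
    for key in observed_keys:
        if random.random() < sample_rate:   # keep roughly 1% of the observed requests
            counts[key] += 1
    return [key for key, _ in counts.most_common(top_k)]
```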

As always, we have our presentation of the paper by A. Jesse Jiryu Davis:

Discussion

This time around we had Dan Ports join us to answer questions about the paper, so this turned out to be a nice discussion despite slightly lower-than-expected attendance.

1) Simple API. Currently, Pegasus supports a simple API with reads and simple destructive writes (i.e. each write is an unconditional overwrite of the previous copy of the data). The main reason for this is how the writes are structured in Pegasus. The system is very nimble and quickly adjustable; it picks write servers on the fly as the write request “goes through” the switch. This means that a write should be able to just complete on the new server. However, if the write operation is, for example, a conditional write or an update, then such an update also needs to have the previous version of the object, which may be missing on the new server. We spent some time discussing workarounds for this, and they surely seem possible. But these workarounds also introduce additional communication, which means both more load on the servers and more latency for operations. And since we are dealing with a subset of objects that already generate the most load in the system, adding anything more must be avoided as much as possible. The cost of supporting this more complex API will also differ for various read-write ratios.

2) Comparison with caching. Another big discussion was around using caching to load-balance the system. As Dan pointed out, caches are good when they are faster than storage, but for super-fast in-memory storage, it is hard to make a cache faster. NetCache (one of the systems used for comparison in the paper) does provide a faster cache by placing it in the network switch. It has several downsides: it handles only small objects, consumes significant switch resources, and does not work well for write workloads (it is a read-through cache, I think). Of course, it is possible to make it a write-through cache as well to optimize for write workloads, but that still does not solve the other deficiencies and adds even more complexity to the system. We also touched on the more complicated fault tolerance of cached systems. The disparity between the load that the cache and the underlying system can take can create situations where the underlying system gets overrun upon a cache failure or excessive cache misses.

3) Chain replication. Since Pegasus replicates for scalability, it needs a separate mechanism to handle fault tolerance. The paper suggests using a chain replication approach, where racks are the chain nodes. Only the tail rack serves reads; the writes, however, must be applied in all racks as the write operation propagates through the chain. One question we had is why not use something like CRAQ to allow other racks to serve reads, but the reality is that this is simply not needed. The chances that an object becomes so skewed and hot that it needs more than a rack’s worth of servers are very slim, so there is no need to complicate the protocol. Another question I have now but forgot to ask during the discussion is what happens to hot writes as they go through the chain. If non-tail racks only use the default server for writes on “hot” keys (instead of, let’s say, a round-robin or random node), then this server may get overwhelmed. But I think it is trivial to pick a random server for the hot object on each write at the non-tail racks, as in the sketch below.
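A minimal sketch of this workaround idea (ours, not the paper’s design) could look as follows; the rack and server representations are hypothetical.

```python
import random

def chain_write_targets(key, racks, hot_keys, home_server_of):
    """Pick which server applies a write in each rack of the chain (head -> ... -> tail).
    For hot keys, writes are spread over random servers within each rack so that no single
    default server gets overwhelmed by the hot-key write traffic."""
    targets = []
    for rack in racks:
        if key in hot_keys:
            targets.append(random.choice(rack))          # spread hot-key writes within the rack
        else:
            targets.append(home_server_of(rack, key))    # cold keys go to their default server
    return targets
```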

4) Zipfian distribution and workload skewness. Pegasus needs to load-balance fewer keys for a less skewed Zipfian distribution. This was an interesting and somewhat counter-intuitive observation at first glance, since one might intuitively expect the more skewed distribution to require more load-balancing. However, what matters is the number of objects that are skewed enough to need balancing: a lower-alpha (less skewed) Zipf distribution has fewer such highly skewed objects, and fewer highly skewed objects mean less load-balancing.

5) Virtualization of top-of-rack switches. One important question about the technology that enables Pegasus is the virtualization of these smart ToR switches. Currently, such virtualization does not exist, so one needs bare-metal access to a switch to deploy the code. Virtualization of such a switch may make the technology available to cloud users. I think this would be a huge boost to the overall state of distributed computing at the datacenter level. I would be speculating here, but I think a lot depends on the willingness of manufacturers to provide such support, especially given the relatively limited capabilities of the hardware right now. Of course, the virtualization should not add a significant latency penalty to the users, and most importantly should not add any penalty to non-users (applications/systems that reside in the same rack but do not use the extended capabilities of the switches). Couple all this with the risks of running users’ code on the hardware that handles all the traffic in the rack, and we also need to worry about user isolation/security more than ever. However, as wishful as this thinking is, it is quite probable that these smart switches will not make their way to the public cloud any time soon. This gives large cloud vendors an edge, since they can benefit from the technology in their internal systems. Smaller service providers that rely on the cloud, however, will have to find a way to compete without access to this state-of-the-art technology. Aside from my extremely high-level speculations, some smart people actually go deeper into the topic.

6) There were a few other minor topics discussed, and jokes thrown here and there. For example, Dan explains Pegasus with cat pictures.

Reading Group

Our reading group takes place over Zoom every Wednesday at 3:30pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the meetings. Please join the Slack group to get involved!

Reading Group. Performance-Optimal Read-Only Transactions

Last meeting we looked at “Performance-Optimal Read-Only Transactions” from OSDI’20. This paper covers the important topic of transactional reads in database/data-management systems. In particular, the paper discusses “one-shot” read-only transactions that complete in one network round-trip time (RTT) without blocking and without bloated, expensive messages. If this sounds too good to be true, it is. Before presenting these types of transactions, the authors discuss why it is impossible to have Non-blocking, One round-trip, Constant size messaging, Strictly Serializable (NOCS) read-only transactions. This becomes a “pick 3 out of 4” kind of deal.

The Performance-Optimal Read-Only Transaction (PORT) system shows how to get NOC and come as close as possible to S. For the One round-trip constraint, the paper makes the clients coordinate their own read transactions and control the ordering of reads, all in one round of message exchange. This requires the clients to send some recency/progress metadata over to the servers. In the case of PORT, the metadata is a Version Clock, a type of logical clock. It is just a number, so it is Constant-size. Finally, the servers can use this metadata to return the latest value that satisfies the recency constraint imposed by the Version Clock in a Non-blocking manner. The servers also avoid coordination, again to satisfy the One round-trip requirement. To make sure the reads do not block, PORT never considers in-progress operations. PORT separates the in-progress operations from the completed ones with a stable frontier time/version. Clients must request reads at what they know to be the latest stable, immutable state of the system and never try to read the state of in-progress operations. Since different clients may have different and stale knowledge of the stable frontier, the system needs to support reading different versions of data, hence PORT relies on a multi-version store.
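Below is a rough Python sketch of the server-side read path as I understand it. The class and field names are mine, it models a single server, and it leaves out the write path and the promotion trick (the promotion idea is sketched after the next paragraph); treat it as an illustration of “answer from committed versions only, in one round trip,” not as PORT’s actual implementation.

```python
class MultiVersionStore:
    """Toy multi-version store for PORT-style one-round reads (my sketch, not PORT's code).
    Only completed writes are stored here, so reads never wait on in-progress operations."""

    def __init__(self):
        self.committed = {}       # key -> sorted list of (version, value) of completed writes
        self.version_clock = 0    # this server's view of the stable frontier

    def complete_write(self, key, version, value):
        self.committed.setdefault(key, []).append((version, value))
        self.committed[key].sort()
        self.version_clock = max(self.version_clock, version)

    def read(self, key, client_clock):
        """One-round read at the snapshot given by the client's version clock: return the
        latest completed version that does not exceed it, plus this server's clock so the
        client can advance its own knowledge of the stable frontier."""
        candidates = [(v, val) for v, val in self.committed.get(key, []) if v <= client_clock]
        value = candidates[-1][1] if candidates else None
        return value, self.version_clock
```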

PORT also does some clever trickery to improve consistency. For example, a promotion mechanism is used to block out a range of versions for writing in some cases. If some data was written with a version v=10 and then a read transaction has requested a value at version v=15, the v=10 value will be promoted to occupy the entire range of versions [10, 15], and servers will be disallowed from writing anything in that range. This, however, does not cause a write in that version range to abort; instead, it will be written at version v=16.
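The example above maps to a small sketch like this (my own toy code for a single key on a single server; real PORT does this inside the storage engine and across partitions):

```python
class PromotingStore:
    """Toy sketch of PORT-style version promotion for a single key.
    If a read arrives at version 15 and the latest completed write is at version 10,
    the value is 'promoted' to cover [10, 15]; a later write then lands above that range."""

    def __init__(self):
        self.latest_value = None
        self.promoted_up_to = 0     # highest version the current value occupies

    def read(self, read_version):
        # Promote the current value so no future write can sneak in below read_version.
        self.promoted_up_to = max(self.promoted_up_to, read_version)
        return self.latest_value, self.promoted_up_to

    def write(self, write_version, value):
        # Writes are never aborted; they are shifted above the promoted range instead.
        effective_version = max(write_version, self.promoted_up_to + 1)
        self.latest_value = value
        self.promoted_up_to = effective_version
        return effective_version

store = PromotingStore()
store.write(10, "a")
store.read(15)               # "a" is promoted to cover versions [10, 15]
print(store.write(12, "b"))  # the write lands at version 16, not 12
```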

The paper implements PORT in ScyllaDB and Eiger and shows nearly identical throughput in read-heavy workloads to that of non-transactional reads, while also beating Eiger’s transactions. There are quite a few important details and nuances to implementing PORT. The implementation on top of Eiger is full of surprises, as the promotion mechanism described above no longer works for transactional writes, and PORT uses another clever trick.

Here is the presentation by Alex Miller, which goes into a bit more detail than my summary:

Discussion

1) SNOW theorem. The NOCS theorem the authors discuss sounds similar to SNOW. Well, it is by the same first author, so this makes some sense. Both are about read-only transactions, and both concern the trade-offs between performance/latency and consistency. NOCS focuses on performance-optimal transactions, while SNOW talks about latency-optimal ones. Both talk about the impossibility of having the highest consistency and being “x-optimal” (fill in the “x”). Moreover, NOC (non-blocking, one round trip, constant metadata) implies that performance here largely means latency as well. It just happens that if we stop doing all the extra work, then the throughput improves too. In a sense, it appears that NOCS is a rebranding of SNOW to some extent. We can even map letters in both abbreviations to similar concepts. S = (strict) serializability in both cases. N = non-blocking. O = one round trip (in SNOW it is coordinate/retry, which is pretty much whether we add more messages or not). So three letters are the same; W & C are the difference, but even there we can find some similarities. W in SNOW stands for write conflict avoidance, and one way to do so may require violating C in constant metadata. The paper itself mentions that NOCS is similar to SNOW.

2) Other causal systems. Occult and PaRiS were brought up in the discussion briefly. We did not spend too much time on these, though. Occult is a causal system that avoids “slowdown cascades” due to dependencies and the need to enforce causality. PORT, with its one-RTT non-blocking mechanism, seems to be similar in this regard, so a comparison would be interesting.

3) HLC for the logical clock? HLCs are used in the transactions of MongoDB and CockroachDB. HLCs are logical clocks, constant in size, and do help identify consistent cuts/snapshots for transactions. MongoDB uses HLCs for cross-partition causal transactions, and it seems to fit well within NOC. CockroachDB is more involved, but it also uses HLC. Another important aspect of HLC is that it can provide a single serial order, but this is something PORT actually avoids in Eiger-PORT, since it needs to provide a different serial order to different clients to enforce the read-your-writes property without blocking.

4) On the importance of a stable frontier. A stable frontier is the point in the system’s execution that separates what is safe to read from what is not. Everything before the stable frontier is committed/executed and safe; any operation after the frontier may not have been fully written/committed yet and is not safe. This separation is clear in Scylla-PORT, but gets blurred in Eiger-PORT and its read-your-writes reordering.

5) Replication. The paper does not address replication issues at all, so one has to wonder how it handles replication and the associated failures. For example, in Cassandra/Scylla, a read succeeds after being completed by some read quorum that may be smaller than all replicas for the object. This means that you can promote the value on a subset of replicas, then do a write on some quorum containing the un-promoted replicas and end up with the same write recorded under different versions on different replicas. This may or may not be a huge problem, but a conversation on replication/failures would be very useful. The code (which is open source) may help shed light on this, but we have not had a chance to look at it during the discussion.

6) Eiger-PORT. This one is very different from the ScyllaDB version. It also differs from how the paper describes PORT up to that point, since Eiger-PORT cannot promote operations: writes are now in a transaction, and all writes from one transaction must be promoted to a higher version atomically. If you do that, you need to coordinate between servers and add messages, losing the O part of NOC. The authors go into more detail describing the Eiger-PORT protocol, which is not the easiest thing to grasp on a first read. It is also mind-twisting when you start reordering operations for different clients. Actually, as of the time of this writing, we were still discussing some aspects of Eiger-PORT in our group’s Slack channel.

7) Evaluation. We liked the choice and rationale for picking the baseline systems to evaluate against. PORT indeed shows low overhead in ScyllaDB while improving the database’s consistency semantics.

Reading Group

Our reading group takes place over Zoom every Wednesday at 3:30pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the meetings. Please join the Slack group to get involved!

Reading Group. Microsecond Consensus for Microsecond Applications

Our 43rd reading group paper was about extremely low-latency consensus using RDMA: “Microsecond Consensus for Microsecond Applications.” The motivation is pretty compelling — if you have a fast application, then you need fast replication to make your app reliable without holding it back. How fast are we talking here? The authors go for ~1 microsecond with their consensus system called Mu. That is one-thousandth of a millisecond. Of course, this is not achievable over a regular network and network protocols like TCP, so Mu relies on RDMA.

In my mind, Mu maps rather neatly to Paxos/MultiPaxos, adjusted for RDMA usage. The accept phase is pretty much Paxos phase-2. The leader directly writes to the follower’s memory. Mu does not use protocol-specific acks, but there is still an RDMA-level ack for successfully writing the memory, and thus for the completion of phase-2. Of course, in Paxos, followers must check the ballot before accepting an operation in phase-2. This requires processing and would negate the benefits of direct memory access. To work around the problem, Mu uses RDMA permissions to control whose memory writes are accepted in phase-2. The bottom line, however, is that we have a single-round-trip phase-2 capable of rejecting messages from “wrong” leaders, just like in Paxos.

Paxos elects a leader in phase-1. In Mu, the equivalent of phase-1 consists of two sub-phases. First, a prospective leader contacts a quorum of followers and tells them to change the permissions from the old leader to itself. This prevents the old leader from writing to a quorum and makes it stop. This quorum becomes “the leader’s go-to quorum,” as the leader can only write to the nodes from that quorum due to the permissions. In the second sub-phase, the prospective leader learns of the past proposal/ballot number and any past operations to recover. The leader then picks a higher proposal number and writes it back. Just like in Paxos/MultiPaxos, the leader must recover the learned commands.
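Here is a highly simplified, non-RDMA Python sketch of that leader-change flow. I model the permissions as a field the follower checks in software and recover state by naively adopting the longest log seen, whereas real Mu enforces permissions in the RDMA NIC and has a more careful recovery procedure; all names are mine.

```python
class Follower:
    """Toy follower: a log, a single writer permission, and the highest proposal seen."""
    def __init__(self):
        self.log = []
        self.writer = None          # which leader currently holds the write permission
        self.max_proposal = 0

    def grant_permission(self, leader_id):
        self.writer = leader_id     # implicitly revokes the previous leader's permission

    def write(self, leader_id, entry):
        # Real Mu rejects this in hardware via RDMA permissions, not with a software check.
        if leader_id != self.writer:
            return False
        self.log.append(entry)
        return True


def become_leader(me, followers, quorum_size):
    """Sub-phase 1: flip permissions on a quorum. Sub-phase 2: learn state, bump the proposal, recover."""
    quorum = followers[:quorum_size]                  # this becomes the new leader's go-to quorum
    for f in quorum:
        f.grant_permission(me)                        # the old leader can no longer write to a quorum
    highest = max(f.max_proposal for f in quorum)
    learned = max((f.log for f in quorum), key=len)   # naive recovery: adopt the longest log seen
    new_proposal = highest + 1
    for f in quorum:
        f.max_proposal = new_proposal
        f.log = list(learned)                         # recover learned commands before issuing new writes
    return new_proposal, quorum
```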

Another prominent part of the paper is the failure detector. The authors claim that it allows for fast leader failover. The detector operates by a pull mechanism — the leader maintains a heartbeat counter in its memory and increments it periodically; the followers read the counter and, depending on the counter’s progress, adjust a “badness” score. If the counter moves too slowly or does not move at all (or is not readable at all?), the badness score becomes high, causing the follower to decide that the leader has failed and to try to take over.
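A follower-side version of this pull-based detector could look roughly like the sketch below; the scoring function and threshold are my own stand-ins, not the paper’s.

```python
class PullFailureDetector:
    """Toy follower-side detector: read the leader's heartbeat counter (over RDMA in Mu)
    and track a 'badness' score based on whether the counter is making progress."""

    def __init__(self, threshold=5):
        self.last_counter = None
        self.badness = 0
        self.threshold = threshold

    def observe(self, read_counter):
        """read_counter is the leader's counter value, or None if the read itself failed."""
        if read_counter is None or read_counter == self.last_counter:
            self.badness += 1       # no progress (or unreachable memory): leader looks unhealthy
        else:
            self.badness = 0        # progress observed: leader looks alive
        self.last_counter = read_counter
        return self.badness >= self.threshold   # True -> suspect the leader and start an election
```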

As always, the paper has way more details than I can cover in a short summary. Our group’s presentation by Mohit Garg is available on YouTube:

Discussion

1) Performance. The microsecond latency covers only replication and does not include any of the client interactions or request capture. These components may add a significant delay to the client-observed latency. Moreover, the throughput figure has latency that is at least somewhat close to 1 microsecond only at the low end of the throughput curve. Pushing more operations degrades latency quite significantly — up to 15 microseconds. Of course, it is worth noting that this is with batching enabled, so it is still pretty impressive.

2) Use of RDMA permissions for leader enforcement. This looked familiar to me… until I was reminded that in the 17th reading group meeting we looked at the “Impact of RDMA on agreement” paper by the same authors.

3) Quorums. Since the protocol relies on the permissions being explicitly granted to a leader when it contacts a quorum, that leader cannot use any other quorum, as it won’t have permissions to access it. We were not very sure why a leader cannot contact all nodes and try to get permissions from all of them. It still needs only a majority to succeed, but having more than a quorum of nodes that can accept writes from the leader may be handy, since trying to write to more nodes than the minimal quorum can be useful for controlling the tail latency and tolerating stragglers.

4) Flexible Quorums. This continues the above point about quorums. Flexible quorums are quite useful in trading off fault tolerance and scalability. Since Mu is restricted to just one quorum that granted the write permissions, it cannot take advantage of flexible quorums, such as grids.

5) Failure detector. The failure detector is one of the most interesting and controversial features in Mu. We spent quite a bit of time discussing it. First of all, what does the pull model give us? Every follower keeps pegging the leader and reading some counter. But what if the leader is actually totally and utterly down? How can you read the memory of the crashed server to learn its counter and compute the badness score from it? Of course, if a follower cannot read, then it can conclude that the leader is down and start the leader election, but this is not explicitly mentioned in the paper. So what is the purpose of reading a counter and having the counter increase then? Being able to read the counter clearly means the leader is up, at least in some capacity. The counter, and the badness score computed from it, are not so much a proxy for the node’s overall up/down status as a proxy for the node’s health/performance. The paper briefly alludes to this when talking about replication being stuck, eventually causing the heartbeat counter to stop as well and trigger an election, despite the leader not being completely down.

In the discussion, we came up with a different heartbeat mechanism that avoids the “read from a dead node” issue. If we make the leader write its counter to the followers’ memory, and the followers read their local copy of the leader’s counter, then a leader crash will stop the counter’s progress, and the followers can detect it by reading their local memory. Quite honestly, this scheme sounds cleaner to us than the follower pull/read approach used in the paper. The authors claim that the pull mechanism provides better detection latency, but this is not backed up experimentally in the paper.

6) “Dumb” acceptors. Mu is not the only protocol that assumes “dumb” Paxos acceptors/followers that simply provide a write/read interface with very little capacity to run any “logic”. Disk Paxos assumes separate sets of processors and disks. One processor can become a leader, and disks are the followers. Disk Paxos, of course, would not provide the same low latency, as in each phase a processor needs to both write and read remote disks/storage. The paper briefly mentions Disk Paxos. CPaxos is a WAN Paxos variant built using strongly consistent cloud storage services as acceptors. Similarly, the storage service provides limited ability to run any logic and the leader must jump through some hoops to maintain safety. Another one mentioned in the discussion was Zero-copy Paxos.

7) Ordered communication for correctness. We spent a bit of time talking about the importance of ordered (FIFO) communication for the correctness of the protocol. If not for FIFO, there could have been some interesting corner cases around leader churn. I usually do not fully trust papers that just state the assumption of FIFO channels and move on, since traditionally you can still run into quite a few corner cases in systems built on FIFO network protocols, like TCP, and have messages reordered. One common reason is that applications often have complex and multi-threaded logic and may reorder messages internally after the messages have left the TCP stack. Here, however, there is no logic at the followers, which makes an ordered network all you need (assuming there are no other corner cases in the network, like dropped connections and re-connections).

Reading Group

Our reading group takes place over Zoom every Wednesday at 3:30pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the meetings. Please join the Slack group to get involved!

Reading Group. Cobra: Making Transactional Key-Value Stores Verifiably Serializable.

This Wednesday, we were talking about serializability checking of production databases. In particular, we looked at the recent OSDI’20 paper: “Cobra: Making Transactional Key-Value Stores Verifiably Serializable.” The paper explores the problem of verifying serializability in a black-box production system from a client’s point of view. This makes sense, as serializability is an operational, client-observable property. The tool, called Cobra, collects the history via a middle layer sitting between a client and the database and then uses the history to construct a polygraph representing all possible execution orders. The problem then becomes finding whether a serial execution order exists in the polygraph. This means that we need to find some compatible graph that has no dependency/ordering cycles. For instance, this would represent a cycle: A depends on B, B depends on C, and C depends on A. In fact, quite a few tools take a similar approach for checking sequential equivalence properties. What makes Cobra different is that it aims to check serializability at scale, but the problem is NP-complete, so adding more events to the graph makes checking exponentially slower. To mitigate the issue, Cobra uses a few tricks. It uses a few domain-specific heuristics to reduce the size of the polygraph. It also takes advantage of parallel hardware (GPUs) to speed up (by an order of magnitude!) some highly parallel polygraph-pruning tasks. Finally, Cobra uses an SMT solver to perform the final satisfiability search on the pruned polygraph. I will leave the details of all these methods to the paper.
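At its core, checking a fully resolved dependency graph for a serial order is just cycle detection, as in the generic sketch below. This is the easy part; Cobra’s heavy lifting is in resolving the polygraph’s undecided ordering constraints, pruning it, and handing what remains to the SMT solver.

```python
def has_ordering_cycle(edges):
    """Detect a cycle in a dependency graph given as {txn: set(of txns it depends on)}.
    For a fully resolved graph, a serial order exists iff there is no cycle."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {node: WHITE for node in edges}

    def visit(node):
        color[node] = GRAY
        for dep in edges.get(node, ()):
            if color.get(dep, WHITE) == GRAY:
                return True                     # back edge: found a dependency cycle
            if color.get(dep, WHITE) == WHITE and visit(dep):
                return True
        color[node] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in edges)

# The example from the text: A -> B -> C -> A is a cycle, so no serial order exists.
print(has_ordering_cycle({"A": {"B"}, "B": {"C"}, "C": {"A"}}))   # True
```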

Our video presentation by Akash Mishra is, as always, on YouTube:

Discussion

This is a very nice and interesting paper that sparked some lively discussion. Here I list a few of the key discussion points.

Cobra Performance

1) Performance improvement. Cobra is significantly faster than the baselines in the paper. One concern during the discussion was about the optimizations of the baselines. For example, one baseline is Cobra’s approach minus all the optimizations. This is great for showing how much improvement the core contribution of the paper brings, but not so great for comparing against other state-of-the-art solutions. In the paper’s defense, they do provide other baselines as well, all of which perform worse than the “Cobra minus the optimizations” one. So maybe there are no other domain-specific solutions like Cobra to compare against just yet.

2) Performance improvement, part 2. Another performance discussion was around the use of the GPU. While the authors mention an order-of-magnitude improvement when using the GPU for graph pruning, it is not always clear how much of the overall improvement is due to the GPU. Interestingly enough, the overall gains compared to the baseline are ten-fold. For this one, the paper provides a figure that breaks down the time spent in each phase/optimization of Cobra. In read-heavy workloads, polygraph pruning, which is GPU-optimized, dominates the entire computation, suggesting that a lot of the gains may come from the use of specialized hardware.

3) Better parallel hardware? Are GPUs the best hardware to accelerate pruning? Maybe some better alternative exists? FPGAs?

4) Is it fast enough? While Cobra is significantly faster than other approaches, it may still not be fast enough for use in some production workloads. While it can handle 10k transactions in ~15 seconds, real production workloads can produce more transactions in under one second. The paper claims that Cobra can sustain an average load of 2k requests per second, which is enough for many large services. Still, 2k transactions per second is the scale of systems from about 35 years ago.

5) Do we need this in production? We spent quite a bit of time discussing this. Aside from the concerns in point (4) above, there may be less utility in checking a production system. Achieving serializability in the happy case is not that difficult. There are plenty of databases out there that do just that. Testing a system in production is like testing a happy-case execution most of the time, so there may be little incentive to do that, especially given the cost of running Cobra.

Keeping the same guarantees under failures is more difficult; this is why tools like Jepsen stress-test systems by introducing faults. Our thinking was that Cobra, when combined with fault injection, can be a powerful stress-test tool. And with the capacity to check larger histories, it may be useful for checking more involved scenarios and doing routine fuzz testing (maybe even check-in testing!) to try to prevent engineers from introducing bugs.

As far as continuous production use goes, we did not come up with many scenarios, aside from a service provider checking its own compliance with some consistency SLAs, or a user trying to catch a service provider in a violation to get a discount (but at what cost?).

6) Limitations. Cobra works only on key-value stores and only with a subset of common operations. For instance, it does not support range operations. Checking the serializability of range commands requires knowing not only what keys exist but also what keys do not exist in the system. This is hard in a black-box production system. However, our thought was, if this is not a production system, and you start with a blank state for testing purposes, the knowledge of what keys do not exist is there — initially, none of the keys exist. Maybe in such a scenario, Cobra can add support for range operations, especially since we think it will be more useful as a very powerful testing tool (point 5 above) rather than a production monitoring tool. There may be other reasons for the lack of range operation support that we do not know or understand.

Reading Group

Our reading group takes place over Zoom every Wednesday at 3:30pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the meetings. Please join the Slack group to get involved!

Reading Group. A large scale analysis of hundreds of in-memory cache clusters at Twitter.

In the 41st distributed systems reading group meeting, we looked at in-memory caches through the lens of yet another OSDI’20 paper: “A large scale analysis of hundreds of in-memory cache clusters at Twitter.” This paper explores various cache usages at Twitter and distills the findings into a digestible set of figures. I found the paper rather educational. It starts by describing Twitter’s cache architecture, called Twemcache. Twemcache operates as a managed service, with cache clusters starting up and scaling up/down in a semi-automatic manner. After the brief Twemcache description, the paper starts to distill the findings on the cache usage itself. A few of the more interesting findings:

  • A sizeable number of cache clusters at Twitter are used for write-heavy workloads/applications
  • 15% of key-value pairs have values smaller than the key. Of course, this may be specific to Twitter’s keying scheme, which the authors say tends to produce rather large keys.
  • Objects of certain sizes tend to be more popular. A side question here: does this correlate with some application or use case? Maybe some use cases favor certain object sizes?
  • TTL usage: bounding inconsistency/staleness, implicit deletion of objects.
  • TTL usage: expirations are more efficient than evictions. This one kind of makes sense, but it shifts some of the burden of managing an object’s life-cycle in the cache over to the applications that set the TTLs.

The paper goes more in-depth on a variety of other important topics, such as eviction methods, object popularity, etc. Another important point, and potentially one of the most important ones for the academic community, is the dataset — the authors released their dataset, which is a super nice thing to do.

Our presentation is available here:

Discussion


This paper was educational and shed light on cache usage at a big company, including some pretty interesting statistics. We did not have fundamental questions about the paper or its findings.

1) Dataset. One of the bigger discussion points that came up multiple times is the public dataset. Having such a large dataset is very nice for many researchers in academia. Of course, the dataset captures cache usage, but we think it may be useful for database researchers as well. For example, taking into account all the cache misses may produce a realistic representation of read workloads against the storage systems. However, such access patterns may be less useful when applied to strongly consistent storage, as caches are less likely to be used there.

2) Dataset distribution. The dataset for this paper is huge (2.8 TB compressed), and distributing it is a challenge on its own, making it even more impressive that a dataset is available. 


3) Object Popularity. The authors note a few deviations from the expected popularity distribution: (1) unpopular objects are often even more unpopular than expected, and (2) popular objects are less popular than expected. One question we had here (and in many other places) is how specific such a finding is to Twitter. A better insight into the types of workloads that exhibit such deviations would have helped understand this better, but it is likely that the authors simply could not disclose the details about specific workloads/tasks.


4) Write-heavy workloads. The biggest question here is what types of workloads use a cache and are write-heavy. One suggestion was “counters” (i.e. likes, retweets, etc.), which makes sense, as a counter is a piece of data that may be accessed a lot, can be a bit stale, and gets updated relatively frequently. Counters of sorts are also used as rate limiters, according to the paper. Another idea is maybe some analytics workloads, where some weights/models are updated relatively frequently.

5) Miss ratio. It was rather interesting to see the miss ratio figures, especially the ratio of the maximum miss ratio to the minimum miss ratio. Ultimately, a cache miss must be followed by a read from the underlying storage system. A high miss ratio puts more stress on the storage system. A high ratio between the max and min miss ratios over the course of a week may indicate a workload that occasionally spikes in the number of cache misses. Such a workload would require more robust/overprovisioned storage to tolerate the spikes in requests due to cache misses. For example, Facebook avoids sudden traffic migrations between datacenters to control cache misses and not overwhelm the underlying infrastructure.

Reading Group

Our reading group takes place over Zoom every Wednesday at 3:30pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the meetings. Please join the Slack group to get involved!

Reading Group. Virtual Consensus in Delos

We are continuing through the OSDI 2020 paper list in our reading group. This time we have discussed “Virtual Consensus in Delos,” a consensus paper (Delos is yet another Greek island, continuing the consensus naming tradition). Delos relies on the log abstraction to keep track of all commands/operations and their order. Traditionally, some consensus protocol, like Raft or MultiPaxos, would maintain such a log. Delos, however, separates the log from consensus and exposes a virtual log to the higher application levels. The virtual log is composed of many underlying log fragments, or Loglets, and each Loglet is supported by some replication protocol. The cool part about Loglets is that they do not need to share the same hardware, configuration, or even the same protocol. This allows Delos to seamlessly switch from one Loglet type or configuration to another to support the ongoing workload or scale the system. Just like in many other strongly consistent systems, there is Paxos in Delos. The virtual log configuration is supported by a Paxos-backed Metastore. This makes Paxos the source of consistency in the system while keeping it away from the “data path.” Since Delos can switch the Loglets supporting the virtual log, each individual Loglet does not need to be as fault-tolerant as MultiPaxos or Raft. The NativeLoglet implements a stripped-down replication scheme that lacks leader election, and upon a leader failure, Delos can simply switch to a different Loglet. This switching process is a bit involved, so please read the paper for the details.
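Here is a rough Python sketch of the virtual-log idea: a chain of Loglets plus a metastore (Paxos-backed in Delos) that records where each Loglet starts. All the names are mine, and the real Delos API, quorum logic, and sealing protocol are considerably more involved.

```python
class Loglet:
    """Toy loglet: an append-only segment that can be sealed."""
    def __init__(self, name):
        self.name = name
        self.entries = []
        self.sealed = False

    def append(self, entry):
        if self.sealed:
            raise RuntimeError(f"loglet {self.name} is sealed")
        self.entries.append(entry)

    def seal(self):
        self.sealed = True

    def check_tail(self):
        return len(self.entries)          # how many entries this loglet ended up with


class VirtualLog:
    """Toy virtual log: the metastore maps starting virtual positions to loglets."""
    def __init__(self, first_loglet):
        self.metastore = [(0, first_loglet)]   # list of (starting virtual position, loglet)

    @property
    def active(self):
        return self.metastore[-1][1]

    def append(self, entry):
        self.active.append(entry)

    def reconfigure(self, new_loglet):
        old_start, old = self.metastore[-1]
        old.seal()                               # stop new appends to the old loglet
        tail = old_start + old.check_tail()      # find where the old loglet really ends
        self.metastore.append((tail, new_loglet))  # record the switch; new appends go to new_loglet
```

A reconfiguration here is just seal, then checkTail, then a metastore update; switching from, say, a ZooKeeper-backed Loglet to the NativeLoglet would only change which class sits behind the same append/seal/checkTail interface.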

We had a very nice presentation of the paper, although it is a bit longer than our usual presentations:

Discussion

After the presentation, we spent another 40 minutes as a group discussing the paper. A few of the key discussion points:

1) Similarity with the WormSpace paper. Delos appears similar to the WormSpace paper we discussed in one of the first reading group sessions. This makes sense, as both papers include Mahesh Balakrishnan. WormSpace also proposes a virtual-log kind of system, but it does not talk about changing protocols on the fly; instead, it focuses on allocating chunks of the log to different clients/applications/leaders. So in contrast to Delos, which uses a Loglet for some undetermined amount of time (i.e. until it fails), WormSpace allocates fixed chunks of a virtual log ahead of time. Both systems use single-shot Paxos as the source of consistency to allocate different portions of the virtual log, and both systems have similar problems to solve, like the failure of a party responsible for a virtual log segment.

2) Use case for mixing protocols. One of the motivations/threads in the paper is switching from one Loglet implementation to another. This use case largely motivates the ability to switch protocols “on the fly,” but it is a pretty rare use case. We were brainstorming other scenarios where such an ability is useful. One of the possibilities is adjusting to the workload better, as some protocols may handle certain workloads better than others. A similar idea is described in this paper, although it talks about switching between leader-based and leaderless protocols on the fly to provide better performance depending on the conflict rate.

3) Loglet sealing. The mechanism for switching between Loglets is quite complicated. It involves a seal operation that tells a Loglet to stop accepting operations; however, it is not entirely trivial in the NativeLoglet. The nuances come from the fact that an operation may still get into the log after the Loglet is sealed and before the Loglet tail is determined and written to the Metastore. The presentation describes this situation pretty nicely. The seal may be a misleading name, as the operation does not fully prevent data from being written into the log. One analogy we had is that sealing is akin to starting to close a valve on a pipe — as we are closing it, some water may still go through, and we need to make sure to collect all that water with the checkTail operation.
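The valve analogy can be illustrated with a toy example like the one below. This is not the NativeLoglet’s actual quorum protocol (I take a simple maximum over all servers instead of the real quorum rules); it only shows why a write can still land while the seal propagates, and why checkTail is needed to find the true tail.

```python
class LogServer:
    """Toy log server of a NativeLoglet-like segment (illustration only, not Delos's protocol)."""
    def __init__(self):
        self.tail = 0          # number of entries stored locally
        self.sealed = False

    def append(self):
        if self.sealed:
            return False       # sealed servers reject new entries
        self.tail += 1
        return True

    def seal(self):
        self.sealed = True


servers = [LogServer(), LogServer(), LogServer()]

servers[0].seal()              # sealing starts: the "valve" begins to close...
servers[1].append()            # ...but an in-flight append still lands on a not-yet-sealed server
servers[1].seal()
servers[2].seal()

# checkTail must now look across the servers and take the largest tail it finds,
# catching the write that slipped in while the seal was propagating.
print(max(s.tail for s in servers))   # 1, even though sealing had already begun
```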

4) Availability delay when switching Loglets. Although not apparent from the figures/evaluation, we believe that there is a small delay in the system when it switches from one Loglet to another. This is related to point (3) above: as sealing is not an instantaneous operation, we need to use checkTail to figure out the last log position of the old Loglet before starting the new one. As such, until this position is figured out and written to the Metastore, the new Loglet may not accept operations just yet (also, the new Loglet is not known until the Metastore updates anyway). On this subject, we also had quite a long discussion on whether the seal-checkTail functionality can be improved to make the transition faster, but it seems that quite a bit of the challenge comes from the simplified nature of the NativeLoglet and the need to seal it without writing the “seal” operation into the log itself.

 

Reading Group

Our reading group takes place over Zoom every Wednesday at 3:30pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the meetings. Please join the Slack group to get involved!

Planetary-Scale Systems Seminar Spring 2021

This spring semester I am teaching an exciting seminar class: “Planetary-Scale Systems.” I will start the seminar with a four-lecture crash course to get my students on the same page, but the bulk of the class will be paper presentations and discussions. The format is similar to the Zoom reading group I am running.

The class meets twice a week, on Tuesdays and Thursdays. On day one, we will have a paper presentation, followed by a class discussion. On day two, we take the discussion up a notch and dive deeper into 2-3 select topics/questions from day one. The time between the two meetings should also allow students to prepare for the in-depth discussion.

Speaking of preparing, all students should read the paper and ask questions before day one to help drive the discussion. The same applies to the in-depth discussion, as each student is expected to contribute.

Overall, we will cover 11 papers, roughly broken down into 3 groups. The papers are as follows:

Planetary Scale Storage

Planetary-Scale Analytics & ML

Blockchain

Reading Group. hXDP: Efficient Software Packet Processing on FPGA NICs

In the last reading group meeting, we discussed “hXDP: Efficient Software Packet Processing on FPGA NICs.” This paper talks about using FPGA NICs to offload certain routine packet-processing tasks from the CPU. In particular, the paper implements XDP purely in the FPGA and achieves performance similar to that of a single x86 CPU core. One of the bigger points in the paper is the need to keep a small footprint on the FPGA in order to save FPGA resources for other potential uses, and the authors deliver on that front as well, using under 10% of the logic resources. The paper surely was a lot of work. It has a significant compiler component, a hardware component, and a comprehensive evaluation. The compiler is very important for the performance of the system, as it includes lots of optimizations, and many of these optimizations have a corresponding implementation in the FPGA.

Below you can find a video of the presentation. This paper was a bit out of my comfort/expertise zone, so I may have oversimplified things at times.

 

Discussion

1) Benefit for distributed systems. One of the discussion points was the potential benefit of this work to distributed applications. One immediate candidate we saw is the partial partitions paper we discussed before. Nifty relies on the forwarding layer to mask such partitions, and having the packet inspection/forwarding done away from the CPU can be very useful in preserving the performance of the distributed application. To go a bit more general, maybe any application that heavily relies on forwarding can take advantage of similar technology.

2) Performance evaluation. According to the paper, hXDP has similar throughput to a ~2.1 GHz x86 core. However, the CPU used in the evaluation was a 6.5-year-old Xeon, not exactly state-of-the-art silicon. That said, it is worth noting that the development board used is also from the same time period as the CPU.

3) FPGA & CPU? Will we see FPGAs integrated into PCs? Or server CPUs? Unfortunately, none of us at the meeting had any real expertise with FPGAs, so this is a tough one. But given that CPU manufacturers have been buying FPGA manufacturers (Intel + Altera, AMD + Xilinx), there is a possibility of these two technologies coming closer, especially in servers, many of which already have FPGAs.

4) Cost. The development board is $7000, which is not cheap, and hXDP’s performance compares to just one core of a server-grade CPU, so paying $7k for one core seems a bit steep. But it is important to understand a few things. First, this was a development board, so it has more features than the hardware that would actually be used in production. Second, because this is a dev board, the economy of scale is not there. Third, hXDP uses very few of the FPGA’s resources, so a lot of the FPGA can be used for other tasks, making even the $7k cost viable.

 

Our reading group takes place over Zoom every Wednesday at 3:30pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the meetings. Please join the Slack group to get involved!

Reading Group. Toward a Generic Fault Tolerance Technique for Partial Network Partitioning

Short Summary

We have resumed the distributed systems reading group after a short holiday break. Yesterday we discussed the “Toward a Generic Fault Tolerance Technique for Partial Network Partitioning” paper from OSDI 2020. The paper studies a particular type of network partitioning – partial network partitioning. Normally, we expect that every node can reach every other node; under network partitions, we often assume that a node or a group of nodes is “disconnected” from the rest of the cluster and cannot be reached. Partial partitioning assumes that some communication remains: some nodes outside the partition can still communicate with the partitioned servers. The paper studies several public bug reports from popular systems and finds quite a few issues attributed to the partial partitioning problem. A simple example of the issue is a system having one leader and multiple follower nodes. If the leader is partitioned away from the followers, progress may stop. At the same time, if the leader can still communicate with a configuration manager (ZooKeeper) responsible for picking the next leader, that configuration manager won’t see a problem and will not elect a new leader, causing the entire system to stall. The paper has many insights about partial partition failures, and it is worth taking a look at the paper itself for these details. Aside from the extensive exploration of the problem, the paper proposes a simple solution to mask such partial partitions, called Nifty. Nifty constructs a connectivity graph and uses it to relay/reroute messages between nodes when a direct communication link fails.
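The core of Nifty’s approach can be sketched in a few lines of Python: build a connectivity graph from pairwise reachability and route around failed direct links. This is my simplification; the real Nifty plugs this into the operating system’s forwarding path and keeps the graph updated by monitoring pairwise connectivity.

```python
from collections import deque

def find_relay_path(connectivity, src, dst):
    """BFS over the connectivity graph: connectivity[a] is the set of nodes a can reach directly.
    Returns a path src -> ... -> dst, or None if the nodes are fully partitioned."""
    if dst in connectivity.get(src, set()):
        return [src, dst]                      # the direct link works: no relaying needed
    queue = deque([[src]])
    visited = {src}
    while queue:
        path = queue.popleft()
        for nxt in connectivity.get(path[-1], set()):
            if nxt in visited:
                continue
            if nxt == dst:
                return path + [nxt]            # relay the message through the intermediate nodes
            visited.add(nxt)
            queue.append(path + [nxt])
    return None                                # a full partition: no amount of relaying helps

# Partial partition: A and C cannot talk directly, but both can reach B.
links = {"A": {"B"}, "B": {"A", "C"}, "C": {"B"}}
print(find_relay_path(links, "A", "C"))        # ['A', 'B', 'C']
```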

Video presentation

Discussion

We had a pretty lively discussion of this paper.

1) Monitoring. One of the issues identified in the paper is that such failures caused by partial partitions often go silent. However, the proposed solution does not seem to include the monitoring/notification service itself. We speculate that it should be trivial to add it to Nifty.

2) System Design. Another important find from the paper is that many of the bugs were due to design problems. We spent considerable time discussing how better design with model checking can help avoid such issues. One of the issues, of course, is that model checking is only as good as the assumptions you put in. So if the design was checked, but the designer did not take into account, for example, message loss, it may not have caught the problem. The paper raises awareness for the problem, so hopefully, the next generation of systems can be model-checked with partial network partitions in mind.

3) Nifty performance. We had two major questions with regard to performance. The first one is whether Nifty can scale to large clusters, given its all-to-all communication requirement for constructing the connectivity graph. This all-to-all communication gives quadratic complexity, but we also think that collecting this connectivity information can be done relatively infrequently, given that the application can tolerate some message delay/loss. The second question is whether Nifty can handle large messages. Data-driven systems, like databases, may transmit a lot of data between nodes, and having all this data flow through some “relay” nodes may put more stress on them. Also, both the topology of connected nodes and the state of the application matter. If two partial partitions are connected by just one server, it will put more stress on that one machine. Similarly, if the system is in a state that requires more data transfer, such as building a new replica, it may cause a lot of traffic/data to flow through Nifty, putting more stress on the “relay” nodes.

4) Other types of network partitions. The paper talks about (mainly) partial symmetrical partitions. Another interesting type of network partition is an asymmetrical or one-way partition, when messages flow in one direction but not the other. However, one can argue that this is still a partial partition, and Nifty can handle such a case with its connectivity graph approach, assuming the graph is directed.

5) Failure Masking. Nifty masks failures, which is great when we need the applications to continue working. However, what happens when/if Nifty fails? Will it create an even bigger, more disastrous failure? Another question is whether the partial partition may exist as a transient state on the way to a full network partition. In such a case, Nifty only gives a little bit of extra time to react to the issue as it emerges and cannot be the only solution for dealing with the problem (hence discussion point #1 and the need for monitoring).

6) We also noted some similarities with the recent Cloudflare issue.

Our reading group takes place over Zoom every Wednesday at 3:30pm EST. We have a Slack group where we post papers, hold discussions, and most importantly, manage Zoom invites to the meetings. Please join the Slack group to get involved!