Reading Group. In Reference to RPC: It’s Time to Add Distributed Memory

Our 70th meeting covered the “In Reference to RPC: It’s Time to Add Distributed Memory” paper by Stephanie Wang, Benjamin Hindman, and Ion Stoica. This paper proposes some improvements to remote procedure call (RPC) frameworks. In current RPC implementations, the frameworks pass parameters to functions by value. The same happens to the function return values. Passing data by value through copying works well for small parameters and return values. Furthermore, since the caller and callee have independent copies of the data, there are no side effects when one copy is modified or destroyed. Things become less than ideal when the data is large and gets reused in multiple RPC calls.

For example, if a return value of one function needs to propagate to a handful of consecutive calls, we can end up performing many wasteful data copies. The waste occurs as the data transfers back to the caller and later gets copied again into each new RPC. The obvious solution is to avoid unnecessary copying and pass the data within the RPC framework by reference using shared memory. In fact, according to the paper, many applications resort to doing just this with distributed key-value stores. Before invoking a remote function, a caller can save the parameters to the shared KV-store and pass the set of keys as parameters to the RPC. Managing memory at the application level, however, has some problems. For example, the application must manage memory reclamation and clean up objects that are no longer needed. Another concern is breaking the semantics of RPCs, as shared mutable memory can have side effects between different components of the system.
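To make the application-level workaround concrete, here is a minimal sketch in Python. The `KVStore` class is a hypothetical in-process stand-in for a distributed key-value store, and `produce`/`consume` are made-up functions playing the role of remote procedures; none of these names come from the paper.

```python
import uuid


class KVStore:
    """Hypothetical in-process stand-in for a distributed key-value store."""

    def __init__(self):
        self._data = {}

    def put(self, value):
        key = uuid.uuid4().hex  # generate a unique key for the stored value
        self._data[key] = value
        return key

    def get(self, key):
        return self._data[key]

    def delete(self, key):
        self._data.pop(key, None)

    def contains(self, key):
        return key in self._data


store = KVStore()


def produce():
    """Pretend RPC: builds a large result and returns only its key."""
    big_result = list(range(1000))
    return store.put(big_result)


def consume(key):
    """Pretend RPC: receives a key and fetches the data from the store."""
    return sum(store.get(key))


key = produce()
# The same key is passed to three calls; the data itself is never copied
# back to the caller.
totals = [consume(key) for _ in range(3)]
# The application has to remember to reclaim the memory on its own.
store.delete(key)
```

Note the explicit `store.delete(key)` at the end: forgetting it leaks memory, and calling it too early breaks later consumers, which is exactly the bookkeeping burden described above.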

Instead of an ad-hoc application-level solution, the paper proposes a managed approach to shared memory in RPCs. The improved framework preserves the original RPC semantics while enabling shared memory space and facilitating automatic memory management and reclamation. The paper suggests having immutable shared memory where values remain read-only after an initial write. This shared memory system will have a centralized scheduler for RPCs and a memory manager to keep track of memory usage and active memory references. With a centralized memory management approach, the RPC framework can do a few cool things. For instance, the memory manager knows when a reference is passed between nodes and when it goes out of scope on all functions/clients. This global memory knowledge allows the memory manager to perform garbage collection on values no longer needed. The paper describes this in a bit more detail, including the APIs to facilitate global memory tracking. Another benefit of the RPC framework with a centralized scheduler and memory manager is locality awareness. With information about data location in the cluster, the system can collocate function invocation on the machines that already have the values needed. 
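As a rough illustration of these two ideas (reference tracking for garbage collection, and locality-aware placement), here is a toy sketch in Python. The `MemoryManager` class, its method names, and the node names are all assumptions made for this example; the paper's actual API and any real system such as Ray will differ.

```python
from collections import Counter


class MemoryManager:
    """Toy centralized manager: tracks values, reference counts, locations."""

    def __init__(self):
        self._values = {}     # reference id -> value (never overwritten)
        self._refcount = {}   # reference id -> number of live references
        self._location = {}   # reference id -> node holding the value
        self._next_id = 0

    def put(self, value, node):
        """Write a value once; there is deliberately no update method."""
        ref = self._next_id
        self._next_id += 1
        self._values[ref] = value
        self._refcount[ref] = 1
        self._location[ref] = node
        return ref

    def get(self, ref):
        return self._values[ref]

    def add_ref(self, ref):
        """Called when a reference is handed to another function or client."""
        self._refcount[ref] += 1

    def release(self, ref):
        """Called when a holder's reference goes out of scope."""
        self._refcount[ref] -= 1
        if self._refcount[ref] == 0:
            # No one can reach the value anymore, so reclaim it.
            del self._values[ref], self._refcount[ref], self._location[ref]

    def is_live(self, ref):
        return ref in self._values

    def pick_node(self, refs, default):
        """Choose the node that already holds the most arguments."""
        counts = Counter(self._location[r] for r in refs)
        return counts.most_common(1)[0][0] if counts else default


mm = MemoryManager()
a = mm.put((1, 2, 3), node="node-A")
b = mm.put((4, 5), node="node-A")
c = mm.put((6,), node="node-B")

# Locality: two of the three arguments live on node-A.
target = mm.pick_node([a, b, c], default="node-C")

# Garbage collection: the value survives until the last reference is released.
mm.add_ref(a)
mm.release(a)
still_live = mm.is_live(a)
mm.release(a)
collected = not mm.is_live(a)
```

This sketch counts arguments per node; a real scheduler would more likely weigh the size of each value and the load on each machine.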

The paper mentions that the proposed vision is already somewhat used in many specialized systems, such as Ray (Ray is from the same team as the paper) or Distributed TensorFlow. However, these systems are far from general and often used as parts of larger software stacks. The challenge with this new improved RPC framework is to make it general enough to span across the systems to allow better integration and more efficient resource use. I think of this as an RPC-as-a-service kind of vision that facilitates efficiency, interoperability and provides some blanket guarantees on communication reliability. 

As always, the paper has a lot more details than I can cover in the summary. We also have the presentation video from the reading group:


1) Other Distributed Systems Frameworks. Frameworks for building distributed systems seem to gain more and more popularity these days. We were wondering how these frameworks address some of the same issues, such as copying data between components and managing locality. This is especially relevant for actor-style frameworks. We had some feedback on the Orleans framework. Since locality is important for performance, frameworks tend to move actors to the data. It often seems to be cheaper to move a small executable component than large chunks of data. More specialized data processing systems have been using these tricks for some time and can move both data and compute tasks around for optimal performance. 

2) Fault Tolerance. Fault tolerance is always a big question when it comes to backbone distributed systems like this one. With stateful components and shared memory subsystems, the improved RPC framework may be more susceptible to failures. While well-known and traditional techniques can be used to protect the state, such as replicating the shared memory, these will make the system more complicated and may have performance implications. Some components, like the scheduler and memory manager, are on the critical path of all function invocations. Having strongly consistent replication there may not be the best choice for scalability and performance. The other option is to expose errors to the application and let it deal with the issues, as in more traditional RPC scenarios or lower-level communication approaches. However, due to the size and complexity of the improved system, it would have been nice to have some failure masking. We were lucky to ask Stephanie Wang, one of the paper’s authors, about fault tolerance. She suggested that RPC clients handle errors by retrying; however, she also mentioned that how much overall fault tolerance and fault masking a system like this needs is still an open question.

3) Programming Model vs. Implementation. Another important discussion question was on the vision itself. The paper mentions a few specialized systems that already have many of the features described. This made us wonder: what does the paper propose, then, and what are the challenges? It appeared to us that the paper’s vision is for a general and interoperable RPC system to act as a glue for larger software stacks. Stephanie provided some first-hand feedback here that I will quote directly:

“In my opinion, Ray already has (most of) the right features for the programming model but not the implementation. One of the main implementation areas that the paper discusses is distributed memory management. Actually, I don’t think that any of the systems mentioned, including Ray, do this super well today. For example, I would say that Distributed TensorFlow is quite sophisticated in optimizing memory usage, but it’s not as flexible as pure RPC because it requires a static graph. The second thing that I think is still missing is interoperability, as you said. I believe that this will require further innovations in both programming model and implementation.”

Reading Group

Our reading group takes place over Zoom every Wednesday at 2:00 pm EST. We have a slack group where we post papers, hold discussions, and most importantly manage Zoom invites to paper discussions. Please join the slack group to get involved!

Reading Group. Multitenancy for Fast and Programmable Networks in the Cloud.

We discussed “Multitenancy for Fast and Programmable Networks in the Cloud” in the 59th DistSys Reading Group meeting. In a sense, this was a continuation of a previous discussion we had a few months ago when covering the Pegasus paper.

Pegasus and many other new protocols rely on specialized programmable network hardware that is not yet available to regular cloud users. This is because such hardware has no support for multitenancy and cannot be efficiently shared and virtualized to allow cloud users to get a virtual slice of the hardware pie. This paper discusses enabling multitenancy on today’s smart network hardware, such as smart switches.

It is not an easy feat to pull off: these switches (in the paper, the authors used a switch based on Barefoot Tofino logic) have limited programmability, limited resources, and can run just one program. Of course, if you can have only one running program, it is hard to run code from different users/tenants. The paper works around this by compiling the code from all tenants together into one “jumbo-program” that can run on the switch. This tenant packing also solves an issue with resource sharing and isolation, as the switch’s hardware resources must be defined at compile time. Of course, there are also issues with isolating the tenants’ code. This is done at compile time too, by renaming the tenant program’s fields, such as tenant-defined headers and table names, to tenant-unique names. A few restrictions are placed on tenants for the sake of isolation, and the paper goes into more detail. One important assumption is that the tenants are expected to be isolated based on their virtual networks (VLANs).
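The renaming step can be pictured with a small Python sketch. This is only an illustration of the idea, not the paper's compiler: the dictionary layout of a “program”, the `t<VLAN>_` prefix scheme, and the function names are all assumptions made for this example.

```python
def rename_tenant_program(vlan_id, program):
    """Prefix every tenant-defined name with a tenant-unique tag."""
    prefix = f"t{vlan_id}_"
    return {
        "headers": [prefix + name for name in program["headers"]],
        "tables": [prefix + name for name in program["tables"]],
    }


def merge_programs(programs_by_vlan):
    """Combine all renamed tenant programs into one jumbo-program."""
    merged = {"headers": [], "tables": []}
    for vlan_id, program in sorted(programs_by_vlan.items()):
        renamed = rename_tenant_program(vlan_id, program)
        merged["headers"] += renamed["headers"]
        merged["tables"] += renamed["tables"]
    return merged


# Two tenants that happen to pick the same header and table names.
tenants = {
    10: {"headers": ["kv_hdr"], "tables": ["cache"]},
    20: {"headers": ["kv_hdr"], "tables": ["cache", "stats"]},
}
jumbo = merge_programs(tenants)
```

After merging, `jumbo["tables"]` holds `t10_cache`, `t20_cache`, and `t20_stats`, so the two tenants' identically named tables no longer collide.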

In addition to being the glue that holds all the tenants together, the “jumbo-program” incorporates some “system” code to provide common functionality. For instance, the “system” program can process common packet headers. This allows it to read the VLAN ID and then invoke the proper tenant so that the tenant code can do its own header matching/parsing.

The “system” program of a “jumbo-program” also helps us deal with one tricky part here. The program on a switch needs to run at the network line speed and cannot delay any packets. This means that the program runs as a pipeline in multiple stages with no way of returning to a previous one. The “system” program “sandwiches” the tenant program and in a sense establishes some rules and bounds for the tenants.

I already mentioned that the resources are defined/allocated ahead of time. This is also true for memory, as programmers must hardcode the array sizes of stateful memory they want to use. However, the paper works around this by kind of building a virtual memory in the system program of the “jumbo-program”. The “jumbo-program” just allocates a huge array of memory in each pipeline stage and uses a configuration stage to store the memory boundaries for each tenant. Tenants use the boundaries as index offsets into the big array, essentially implementing virtual memory within the “jumbo-program”, and allowing for dynamic memory reallocation by adjusting these memory offsets.
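Here is a minimal Python sketch of this offset-based scheme for a single pipeline stage. The `StageMemory` class and its methods are hypothetical names for this example; a real switch program would express this in P4 with register arrays, and this sketch leaves out checks such as overlapping tenant ranges.

```python
class StageMemory:
    """One big preallocated array shared by all tenants in a stage."""

    def __init__(self, size):
        self.cells = [0] * size  # fixed at "compile time"
        self.bounds = {}         # tenant -> (base offset, number of cells)

    def set_bounds(self, tenant, base, length):
        """Configuration step: assign or reassign a tenant's slice."""
        if base < 0 or length < 0 or base + length > len(self.cells):
            raise ValueError("slice does not fit in the stage memory")
        self.bounds[tenant] = (base, length)

    def _translate(self, tenant, index):
        """Map a tenant-local index to a position in the big array."""
        base, length = self.bounds[tenant]
        if not 0 <= index < length:
            raise IndexError("index outside the tenant's slice")
        return base + index

    def write(self, tenant, index, value):
        self.cells[self._translate(tenant, index)] = value

    def read(self, tenant, index):
        return self.cells[self._translate(tenant, index)]


stage = StageMemory(size=8)
stage.set_bounds("tenant-a", base=0, length=4)
stage.set_bounds("tenant-b", base=4, length=4)

# Both tenants use local index 0 but land in different cells.
stage.write("tenant-a", 0, 111)
stage.write("tenant-b", 0, 222)
a_value = stage.read("tenant-a", 0)
b_value = stage.read("tenant-b", 0)
```

Resizing a tenant's share only requires another `set_bounds` call, with no change to the size of the underlying array, which mirrors how the paper adjusts offsets instead of recompiling array sizes.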

The paper has its HotCloud presentation. And as usual, we had our own presentation, this time by David Correa:


1) Other approaches? The paper mentions some alternatives to the switches they have used. For example, software switches are a lot easier to program. FPGAs can be used for handling some network functions as well. But, according to the paper, these are all slower. ASICs, like Barefoot (now owned by Intel) Tofino, are a lot faster, but with such speed, they are more limited. The programmability constraints, the pipeline processing nature, memory limitations (22 MB of SRAM), etc. are big limiting factors though.   

2) Hardware-specific solution. So the solution presented here is rather specific to the hardware available now. This is cool, but I’d want to imagine what can be done in the near future. Can we improve the hardware to reduce these limitations? The authors conjecture that the pipelines will stay: “we expect ASIC-based pipelines—both for switches and NICs—to be more effective than other programmable architectures in terms of both cost and power consumption.” However, maybe better support for multi-tenancy can be baked into a new generation of devices at the hardware level, along with greater flexibility for adding/removing tenants.

3) Not quite cloud-ready? The major pain point of the solution seems to be the need to “mush together” all the tenants before compiling the program and deploying it on the switch. If the set of tenants changes (or a tenant’s requirements for some hardware resources change), the whole thing needs to be repackaged and re-deployed. This sounds like a cumbersome process that may result in some downtime while the new “jumbo-program” is deployed on the network. Exposing this process as some kind of cloud service seems quite infeasible due to these issues. It would have been nice if more resources could have been reallocated between tenants dynamically (of course, we are not experts here, but there may be some real hardware/performance limitation for this very static approach). It would also be nice to have the ability to “side-load” new tenant programs into the “jumbo-program” runtime.

4) Incentives for the cloud providers? Smart network devices can greatly benefit those who have access to them. We are starting to see many systems relying on these. However, right now, cloud providers are the ones who have access to such hardware, and it may give an advantage to these providers and their native services. As such, there may be little incentive to offer these resources in the cloud. That being said, this may also play the other way around. If this becomes a highly desired feature, the first cloud provider to offer these virtualized multi-tenant switches stands to benefit, and other providers will follow in a chain reaction of chasing the market.

Reading Group. Toward a Generic Fault Tolerance Technique for Partial Network Partitioning

Short Summary

We have resumed the distributed systems reading group after a short holiday break. Yesterday we discussed the “Toward a Generic Fault Tolerance Technique for Partial Network Partitioning” paper from OSDI 2020. The paper studies a particular type of network partitioning: partial network partitioning. Normally, we expect that every node can reach every other node; under network partitions, we often assume that a node or a group of nodes is “disconnected” from the rest of the cluster and cannot be reached. Partial partitioning assumes that some communication remains, and some nodes outside the partition can still communicate with the partitioned servers. The paper studies several public bug reports from popular systems and finds quite a few issues attributed to the partial partitioning problem. A simple example of the issue is a system having one leader and multiple follower nodes. If the leader is partitioned away from the followers, progress may stop. At the same time, if the leader can still communicate with a configuration manager (ZooKeeper) responsible for picking the next leader, that configuration manager won’t see a problem and won’t elect a new leader, causing the entire system to stall. The paper has many insights about partial partition failures, and it is worth taking a look at the paper itself for these details. Aside from an extensive exploration of the problem, the paper proposes a simple solution to mask such partial partitions, called Nifty. Nifty constructs a connectivity graph and uses it to relay/reroute messages between nodes when the direct communication link between them fails.
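The rerouting idea can be sketched in a few lines of Python. This is not Nifty's actual implementation; it is a plain breadth-first search over a hypothetical connectivity graph, where `links` maps each node to the set of nodes it can currently reach directly. The graph is treated as directed, so one-way links can be represented too.

```python
from collections import deque


def find_route(links, src, dst):
    """Return a list of hops from src to dst, or None if unreachable."""
    parents = {src: None}  # also serves as the visited set
    queue = deque([src])
    while queue:
        node = queue.popleft()
        if node == dst:
            # Walk the parent pointers back to the source.
            path = []
            while node is not None:
                path.append(node)
                node = parents[node]
            return path[::-1]
        for neighbor in sorted(links.get(node, ())):
            if neighbor not in parents:
                parents[neighbor] = node
                queue.append(neighbor)
    return None


# Partial partition: n1 and n2 cannot talk directly, but both reach n3.
links = {
    "n1": {"n3"},
    "n2": {"n3"},
    "n3": {"n1", "n2"},
}
route = find_route(links, "n1", "n2")
unreachable = find_route(links, "n1", "n4")
```

Here `route` goes through `n3`, which acts as the relay for traffic between `n1` and `n2`, while `n4` is absent from the graph and therefore has no route.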

Video presentation


We had a pretty lively discussion of this paper.

1) Monitoring. One of the issues identified in the paper is that such failures caused by partial partitions often go silent. However, the proposed solution does not seem to include the monitoring/notification service itself. We speculate that it should be trivial to add it to Nifty.

2) System Design. Another important finding from the paper is that many of the bugs were due to design problems. We spent considerable time discussing how better design with model checking can help avoid such issues. One of the issues, of course, is that model checking is only as good as the assumptions you put in. So if the design was checked, but the designer did not take into account, for example, message loss, the checker may not have caught the problem. The paper raises awareness of the problem, so hopefully, the next generation of systems can be model-checked with partial network partitions in mind.

3) Nifty performance. We had two major questions with regard to performance. The first one is whether Nifty can scale to large clusters, given its all-to-all communication requirement for constructing the connectivity graph. This all-to-all communication gives a quadratic complexity, but we also think that collecting this connectivity information can be done relatively infrequently, given that the application can tolerate some message delay/loss. The second question is whether Nifty can handle large messages. Data-driven systems, like databases, may transmit a lot of data between nodes, and having all this data flow through some “relay” nodes may put more stress on those nodes. Also, both the topology of connected nodes and the state of the application matter. If two partial partitions are connected by just one server, it will put more stress on that one machine. Similarly, if the system is in a state that requires more data transfer, such as building a new replica, it may cause a lot of traffic/data to flow through Nifty, putting more stress on the “relay” nodes.

4) Other types of network partitions. The paper talks about (mainly) partial symmetrical partitions. Another interesting type of network partition is an asymmetrical or one-way partition, when messages flow in one direction but not the other. However, one can argue that this is still a partial partition, and Nifty can handle such a case with its connectivity graph approach, assuming the graph is directed.

5) Failure Masking. Nifty masks failures, which is great when we need the applications to continue working. However, what happens when/if Nifty fails? Will it create an even bigger/disastrous failure? Another question is whether the partial partition may exist as a transient state towards a full network partition. In such a case, Nifty only gives a little bit of extra time to react to the issue as it emerges and cannot be the only solution to dealing with the problem (hence discussion point #1 and the need for monitoring).

6) We also noted some similarities with the recent Cloudflare issue.

Our reading group takes place over Zoom every Wednesday at 3:30 pm EST. We have a slack group where we post papers, hold discussions, and most importantly manage Zoom invites to the paper discussions. Please join the slack group to get involved!