IPC in Rust

on 2024-09-26

At work, our team recently found itself in need of a high-performance IPC solution in Rust. Our search led us to an insightful article by 3tilley, "IPC in Rust - a Ping Pong Comparison," which provided a great starting point for our exploration.

Inspired by this work, we decided to dive deeper and run our own performance measurements, with a particular interest in the promising Iceoryx framework. Building on the original article, we explore IPC performance between processes running on the same machine using UNIX Domain Sockets (both stream and datagram), Memory Mapped Files, and Shared Memory via the iceoryx crate.

Setup


We will measure the time taken to complete a ping-pong cycle between two processes communicating over IPC. All the experiments are set up similarly to the original work:

  1. A producer process sends a ping
  2. A consumer process replies with a pong
  3. We measure the round-trip time for this exchange

and we will be using the same benchmarking tool, Divan.
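
To give an idea of how the measurements are driven, here is a minimal sketch of a Divan benchmark entry. It assumes the UnixStreamRunner type shown in Approach 1 below; the actual harness in our repository handles more runners and parameters.

// A minimal Divan harness sketch, not the exact benchmark code used for the
// numbers below. It assumes the `UnixStreamRunner` from Approach 1.
fn main() {
    // Collects and runs every function annotated with #[divan::bench]
    divan::main();
}

#[divan::bench]
fn unix_stream(bencher: divan::Bencher) {
    // Spawn the consumer process once and reuse it across iterations;
    // each timed iteration performs a single ping-pong round trip.
    let mut runner = ipc::unix_stream::UnixStreamRunner::new(true);
    bencher.bench_local(move || runner.run(1, false));
}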

Approach 1 - UNIX Domain Stream Socket


UNIX Domain Sockets (UDS) are typically considered a better option than IP sockets when data needs to be exchanged between processes running on the same UNIX host. They are said to be faster and lighter than IP sockets because they avoid the operations required by the network stack.

Here's a simplified implementation of the UNIX Domain Stream Socket approach:

...
// Producer
const UNIX_SOCKET_PATH: &str = "/tmp/unix_stream.sock";

impl UnixStreamRunner {
    pub fn new(start_child: bool) -> Self {
        let unix_listener = UnixListener::bind(UNIX_SOCKET_PATH).unwrap();
        let exe = crate::executable_path("unix_stream_consumer");
        let child_proc = if start_child {
            Some(Command::new(exe).spawn().unwrap())
        } else {
            None
        };
        let wrapper = UnixStreamWrapper::from_listener(unix_listener);
        Self {
            child_proc,
            wrapper,
        }
    }

    pub fn run(&mut self, n: usize, print: bool) {
        let mut buf = [0u8; 4];
        for _ in 0..n {
            self.wrapper.stream.write(b"ping").unwrap();
            self.wrapper.stream.read_exact(&mut buf).unwrap();
            if !buf.eq(b"pong") {
                panic!("Sent ping didn't get pong")
            }
        }
    }
}
...

// Consumer
fn main() {
    let mut wrapper = ipc::unix_stream::UnixStreamWrapper::unix_connect();
    let mut buf = [0u8; 4];
    // read_exact fills the entire 4-byte buffer and returns Err on EOF,
    // which ends the loop once the producer closes the connection
    while wrapper.stream.read_exact(&mut buf).is_ok() {
        if buf.eq(b"ping") {
            wrapper.stream.write_all(b"pong").unwrap();
        } else {
            panic!("Received unknown value {:?}", buf)
        }
    }
}

For a domain stream socket, we provide a path on the file system, which is created and used as the socket's address. Once the sockets are connected, the producer writes a ping to the socket; the consumer reads the message into a buffer and responds with a pong.
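
The UnixStreamWrapper used above is not shown in the snippet; a minimal version might look like the following, with the producer accepting on the listener and the consumer connecting to the UNIX_SOCKET_PATH constant defined above (the actual repository code may differ).

// Hypothetical sketch of the wrapper referenced above
use std::os::unix::net::{UnixListener, UnixStream};

pub struct UnixStreamWrapper {
    pub stream: UnixStream,
}

impl UnixStreamWrapper {
    // Producer side: block until the consumer connects to the listener
    pub fn from_listener(listener: UnixListener) -> Self {
        let (stream, _addr) = listener.accept().unwrap();
        Self { stream }
    }

    // Consumer side: connect to the socket path bound by the producer
    pub fn unix_connect() -> Self {
        let stream = UnixStream::connect(UNIX_SOCKET_PATH).unwrap();
        Self { stream }
    }
}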

Approach 2 - UNIX Domain Datagram Socket


While implementing the UNIX domain datagram socket, I initially tried to use a single socket path, /tmp/unix_datagram.sock, for the communication, similar to the Unix stream socket. Binding both processes to the same socket path resulted in an "Address already in use" error at runtime.

We then found that, for UNIX domain datagram sockets, each process has to bind to its own socket path. To communicate between the two sockets, we connect() each socket to the other, which sets its peer address.

Here's a simplified implementation:

...
// Producer
const UNIX_DATAGRAM_SOCKET_1: &str = "/tmp/unix_datagram1.sock";
const UNIX_DATAGRAM_SOCKET_2: &str = "/tmp/unix_datagram2.sock";

impl UnixDatagramWrapper {
    pub fn new(is_child: bool) -> Self {
        let (socket_path, peer_socket_path) = if is_child {
            (UNIX_DATAGRAM_SOCKET_1, UNIX_DATAGRAM_SOCKET_2)
        } else {
            (UNIX_DATAGRAM_SOCKET_2, UNIX_DATAGRAM_SOCKET_1)
        };
        let socket = UnixDatagram::bind(socket_path).unwrap();
        Self { socket, peer_socket_path, }
    }

    pub fn connect_to_peer(&self) {
        self.socket.connect(&self.peer_socket_path).unwrap();
    }
}

impl UnixDatagramRunner {
    pub fn new(start_child: bool) -> Self {
        let is_child = false;
        let wrapper = UnixDatagramWrapper::new(is_child);

        let exe = crate::executable_path("unix_datagram_consumer");
        let child_proc = if start_child {
            Some(Command::new(exe).spawn().unwrap())
        } else {
            None
        };
        // Awkward sleep to make sure the child proc is ready
        sleep(Duration::from_millis(500));
        wrapper.connect_to_peer();

        Self { child_proc, wrapper, }
    }

    pub fn run(&mut self, n: usize, print: bool) {
        let start = Instant::now();
        let mut buf = [0u8; 4];
        for _ in 0..n {
            self.wrapper.socket.send(b"ping").unwrap();
            self.wrapper.socket.recv_from(&mut buf).unwrap();
            if !buf.eq(b"pong") {
                panic!("Sent ping didn't get pong")
            }
        }
    }
}
...

// Consumer
fn main() {
    let is_child = true;
    let wrapper = ipc::unix_datagram::UnixDatagramWrapper::new(is_child);
    wrapper.connect_to_peer();

    let mut buf = [0u8; 4];
    while let Ok(_) = wrapper.socket.recv(&mut buf) {
        if buf.eq(b"ping") {
            wrapper.socket.send(b"pong").unwrap();
        } else {
            panic!("Received unknown value {:?}", buf)
        }
    }
}

In each process, we bind() the socket to its socket_path and connect() it to its peer_socket_path.

Approach 3 - Memory Mapped Files


Memory-mapped files are a method of accessing file contents by mapping them into the virtual address space of the calling process. To make changes made by one process visible to the other, we call mmap() with the MAP_SHARED flag. Synchronisation between the processes is handled as in the original work, using the raw_sync crate.

// Shared memory layout
//|    0    |    1    |    2    |    3    |    4    |    5    |    6    |    7    |
//|   producer lock   |   consumer lock   |      data buffer (ping or pong)       |
pub struct MmapWrapper {
    pub mmap: MmapMut,
    pub owner: bool,
    pub our_event: Box<dyn EventImpl>,
    pub their_event: Box<dyn EventImpl>,
    pub data_start: usize,
}

impl MmapWrapper {
    pub fn new(owner: bool) -> Self {
        let path: PathBuf = "/tmp/mmap_data.txt".into();
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&path)
            .unwrap();
        file.set_len(8).unwrap();

        let mut mmap = unsafe { MmapMut::map_mut(&file).unwrap() };
        let bytes = mmap.as_mut();

        // The two events are locks - one for each side. Each side activates the lock while it's
        // writing, and then unlocks when the data can be read
        let ((our_event, lock_bytes_ours), (their_event, lock_bytes_theirs)) = unsafe {
            if owner {
                (
                    BusyEvent::new(bytes.get_mut(0).unwrap(), true).unwrap(),
                    BusyEvent::new(bytes.get_mut(2).unwrap(), true).unwrap(),
                )
            } else {
                (
                    // If we're not the owner, the events have been created already
                    BusyEvent::from_existing(bytes.get_mut(2).unwrap()).unwrap(),
                    BusyEvent::from_existing(bytes.get_mut(0).unwrap()).unwrap(),
                )
            }
        };

        // Confirm that we've correctly indexed two bytes for each lock
        assert!(lock_bytes_ours <= 2);
        assert!(lock_bytes_theirs <= 2);
        if owner {
            our_event.set(EventState::Clear).unwrap();
            their_event.set(EventState::Clear).unwrap();
        }

        Self {
            mmap,
            owner,
            our_event,
            their_event,
            data_start: 4,
        }
    }

    pub fn signal_start(&mut self) {
        self.our_event.set(EventState::Clear).unwrap()
    }

    pub fn signal_finished(&mut self) {
        self.our_event.set(EventState::Signaled).unwrap()
    }

    pub fn write(&mut self, data: &[u8]) {
        let bytes = self.mmap.as_mut();

        for i in 0..data.len() {
            bytes[i + self.data_start] = data[i];
        }
    }

    pub fn read(&self) -> &[u8] {
        &self.mmap.as_ref()[self.data_start..self.data_start + 4]
    }
}

The MmapMut::map_mut() function creates a shared, writable memory map backed by a file. The first 4 bytes of the mapping are reserved for event control, and the remaining 4 bytes store the data.

pub fn run(&mut self, n: usize, print: bool) {
    let instant = Instant::now();
    for _ in 0..n {
        // Activate our lock in preparation for writing
        self.wrapper.signal_start();
        self.wrapper.write(b"ping");
        // Unlock after writing
        self.wrapper.signal_finished();
        // Wait for their lock to be released so we can read
        if self.wrapper.their_event.wait(Timeout::Infinite).is_ok() {
            let str = self.wrapper.read();
            if str != b"pong" {
                panic!("Sent ping didn't get pong")
            }
        }
    }
}

Each side takes its lock before writing, releases it when the data is ready, and reads only after the other side's lock has been released.
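
The consumer binary for this approach isn't shown above; it mirrors the same handshake from the other side. Here is a rough sketch, assuming the MmapWrapper lives in an ipc::mmap module and using the same raw_sync event API:

// Consumer sketch (not the exact repository code): wait for the producer's
// event, read the ping, then write the pong under our own event.
use raw_sync::Timeout;

fn main() {
    let mut wrapper = ipc::mmap::MmapWrapper::new(false);
    loop {
        // Busy-wait until the producer signals that a ping has been written
        if wrapper.their_event.wait(Timeout::Infinite).is_ok() {
            if wrapper.read() == b"ping" {
                wrapper.signal_start(); // take our lock before writing
                wrapper.write(b"pong");
                wrapper.signal_finished(); // release it so the producer can read
            }
        }
    }
}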

Approach 4 - Shared Memory using iceoryx


The iceoryx library provides zero-copy, lock-free inter-process communication. At the time of writing, the library does not support a request-response mechanism, so we use its publish-subscribe mechanism to measure the ping-pong round-trip time.

Here is a simplified implementation:

...
// Producer
impl IceoryxWrapper {
    pub fn new(is_producer: bool) -> IceoryxWrapper {
        const REQUEST_SERVICE_NAME: &'static str = "Request";
        const RESPONSE_SERVICE_NAME: &'static str = "Response";

        let request_name = ServiceName::new(REQUEST_SERVICE_NAME).unwrap();
        let request_service = zero_copy::Service::new(&request_name)
            .publish_subscribe()
            .open_or_create()
            .unwrap();

        let response_name = ServiceName::new(RESPONSE_SERVICE_NAME).unwrap();
        let response_service = zero_copy::Service::new(&response_name)
            .publish_subscribe()
            .open_or_create()
            .unwrap();

        let (publisher, subscriber) = if is_producer {
            (
                request_service.publisher().create().unwrap(),
                response_service.subscriber().create().unwrap(),
            )
        } else {
            (
                response_service.publisher().create().unwrap(),
                request_service.subscriber().create().unwrap(),
            )
        };

        IceoryxWrapper {
            publisher,
            subscriber,
        }
    }
}

impl IceoryxRunner {
    pub fn run(&mut self, n: usize, print: bool) {
        let start = Instant::now();
        for _ in 0..n {
            let sample = self.wrapper.publisher.loan_uninit().unwrap();
            let send_payload = sample.write_payload((*b"ping").into());
            send_payload.send().unwrap();
            // Wait for the response
            loop {
                if let Some(recv_payload) = self.wrapper.subscriber.receive().unwrap() {
                    if !recv_payload.eq(b"pong") {
                        panic!("Received unexpected payload")
                    }
                    break;
                }
            }
        }
    }
}
...

// Consumer
fn main() {
    let wrapper = ipc::iceoryx::IceoryxWrapper::new(false);
    loop {
        while let Some(recv_payload) = wrapper.subscriber.receive().unwrap() {
            if !recv_payload.eq(b"ping") {
                panic!("Received unexpected payload")
            }

            let sample = wrapper.publisher.loan_uninit().unwrap();
            let payload = sample.write_payload(*b"pong");
            payload.send().unwrap();
        }
    }
}

We have defined two publish-subscribe services: one to send the ping messages and another to send the pong responses. After a ping message is published, the child process reads it by subscribing to the request service, then publishes a pong message on the response service, which the parent process subscribes to and reads.

Experimental Setup


Hardware

  • Device: MacBook Air M1
  • CPU: 8 cores
  • Memory: 8GB

Testing Environments

  1. macOS (native)
  2. Linux (virtual machine on the same MacBook)

Results


macOS

| # | Approach | Time per Operation (µs) | Ops per Second | Time Comparison to Shared Memory |
|---|----------|-------------------------|----------------|----------------------------------|
| 1 | stdin_stdout | 5.741 | 174k | 77.6x |
| 2 | tcp_nodelay | 12.02 | 83k | 162.4x |
| 3 | tcp_yesdelay | 11.59 | 86k | 156.6x |
| 4 | udp | 11.80 | 80k | 159.4x |
| 5 | unix_datagram | 3.955 | 253k | 53.4x |
| 6 | unix_stream | 3.830 | 261k | 51.8x |
| 7 | iceoryx | 0.310 | 3521k | 4.2x |
| 8 | shared_memory | 0.075 | 13310k | 1.0x |
| 9 | memory_mapped_file | 0.074 | 13430k | 1.0x |

[Figure: macOS performance comparison]

Linux

| # | Approach | Time per Operation (µs) | Ops per Second | Time Comparison to Shared Memory |
|---|----------|-------------------------|----------------|----------------------------------|
| 1 | stdin_stdout | 53.58 | 18k | 491.5x |
| 2 | tcp_nodelay | 64.65 | 15k | 593.1x |
| 3 | tcp_yesdelay | 64.86 | 15k | 595.0x |
| 4 | udp | 59.78 | 17k | 548.4x |
| 5 | unix_datagram | 56.58 | 18k | 519.0x |
| 6 | unix_stream | 57.05 | 18k | 523.3x |
| 7 | iceoryx | 0.485 | 2692k | 4.4x |
| 8 | shared_memory | 0.106 | 10910k | 1.0x |
| 9 | memory_mapped_file | 0.109 | 10220k | 1.0x |

[Figure: Linux performance comparison]

Here are some observations based on the performance data for macOS and Linux:

  1. Shared memory and memory-mapped file approaches are consistently the fastest on both platforms, with the lowest time per operation and highest operations per second.
  2. Iceoryx performs remarkably well and is significantly faster than traditional IPC methods like pipes, sockets, and Unix domain sockets.
  3. While UNIX domain sockets are about 3x faster than IP sockets on macOS, it is interesting to see minimal performance difference between them in the Linux environment.

Conclusion


Iceoryx emerges as a very attractive option for high-performance inter-process communication. While it's not as fast as raw shared memory or memory-mapped files, it abstracts away the complexity of the manual synchronization those approaches require.

Full source code is available on GitHub.

We've also explored performance with larger IPC payload sizes. Details of our findings will be shared in an upcoming post 😉