Rust - Message Passing

Overview

Estimated time: 40–50 minutes

Master message passing concurrency in Rust using channels for safe communication between threads. Learn about multiple-producer, single-consumer (mpsc) channels, bounded and unbounded channels, and the crossbeam library for advanced patterns.

Learning Objectives

Prerequisites

Message Passing Philosophy

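Rust embraces a motto popularized by the Go language documentation: "Do not communicate by sharing memory; instead, share memory by communicating." Message passing provides a safer alternative to shared state: instead of several threads touching the same data, ownership of each value is transferred from one thread to another through a channel.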

Basic Channels (mpsc)

The standard library provides Multiple Producer, Single Consumer (mpsc) channels:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Streams a fixed list of words from a background thread to the main thread.
///
/// The sending half of the channel is moved into the spawned thread; once that
/// thread finishes and the sender is dropped, the receiving loop ends.
fn main() {
    // `sender` travels to the worker thread, `receiver` stays here.
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        let words = ["Hello", "from", "the", "spawned", "thread"];

        for word in words {
            sender.send(word).unwrap();
            // Pause so the messages visibly arrive one at a time.
            thread::sleep(Duration::from_millis(100));
        }
        // `sender` is dropped when this closure returns, closing the channel.
    });

    // `recv` blocks until a message arrives and fails once the channel is
    // closed, which is what terminates this loop.
    while let Ok(word) = receiver.recv() {
        println!("Received: {}", word);
    }

    println!("All messages received");
}

Expected Output:

Received: Hello
Received: from
Received: the
Received: spawned
Received: thread
All messages received

Multiple Producers

Multiple threads can send to the same receiver by cloning the transmitter:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Fans in messages from three producer threads to a single receiver.
///
/// Every producer owns its own clone of the sender. The receiving loop ends
/// only after *all* senders, including the original, have been dropped.
fn main() {
    let (tx, rx) = mpsc::channel();

    for producer_id in 0..3 {
        // Each thread needs its own handle to the sending side.
        let producer_tx = tx.clone();

        thread::spawn(move || {
            for seq in 0..3 {
                let text = format!("Thread {} says: Message {}", producer_id, seq);
                producer_tx.send(text).unwrap();
                thread::sleep(Duration::from_millis(100));
            }
        });
    }

    // The original sender was only used as a template for the clones; if it
    // stayed alive the channel would never close and the loop below would hang.
    drop(tx);

    // Blocks per message; stops when the last producer has exited.
    while let Ok(line) = rx.recv() {
        println!("{}", line);
    }
}

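Expected Output (the interleaving of messages from different threads is not deterministic and may vary between runs):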

Thread 0 says: Message 0
Thread 1 says: Message 0
Thread 2 says: Message 0
Thread 0 says: Message 1
Thread 1 says: Message 1
Thread 2 says: Message 1
Thread 0 says: Message 2
Thread 1 says: Message 2
Thread 2 says: Message 2

Non-blocking Operations

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Polls a channel with `try_recv` so the main thread can keep working while
/// it waits for a message that arrives after a delay.
fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(500));
        tx.send("Hello after delay").unwrap();
    });

    // How long the main thread "works" between two polls.
    let poll_interval = Duration::from_millis(100);

    loop {
        match rx.try_recv() {
            // Nothing queued yet: do some work and poll again.
            Err(mpsc::TryRecvError::Empty) => {
                println!("No message yet, doing other work...");
                thread::sleep(poll_interval);
                continue;
            }
            Ok(msg) => println!("Received: {}", msg),
            Err(mpsc::TryRecvError::Disconnected) => println!("Sender disconnected"),
        }

        // Both a delivered message and a disconnected sender end the polling.
        break;
    }
}

Expected Output (the number of "No message yet" lines may vary slightly with thread timing):

No message yet, doing other work...
No message yet, doing other work...
No message yet, doing other work...
No message yet, doing other work...
No message yet, doing other work...
Received: Hello after delay

Synchronous Channels (Bounded)

Synchronous channels block the sender when the buffer is full:

use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Demonstrates back-pressure on a bounded (`sync_channel`) channel: a fast
/// producer is throttled by a consumer that takes 300 ms per message.
fn main() {
    // At most two messages can be queued; further sends block until the
    // consumer makes room.
    let (tx, rx) = mpsc::sync_channel(2);

    // `Instant` is `Copy`, so each thread captures its own copy of the start time.
    let start = Instant::now();

    let producer = thread::spawn(move || {
        (0..5).for_each(|i| {
            println!("Sending {} at {:?}", i, start.elapsed());
            tx.send(i).unwrap();
            println!("Sent {} at {:?}", i, start.elapsed());
        });
    });

    let consumer = thread::spawn(move || {
        let mut remaining = 5;
        while remaining > 0 {
            // Simulate slow processing before taking the next message.
            thread::sleep(Duration::from_millis(300));
            let received = rx.recv().unwrap();
            println!("Received {} at {:?}", received, start.elapsed());
            remaining -= 1;
        }
    });

    // Wait for both sides, producer first.
    for handle in [producer, consumer] {
        handle.join().unwrap();
    }
}

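Expected Output (timings are approximate; a "Received" line and the "Sent" line it unblocks may appear in either order). The buffer holds two messages, so the third send blocks until the consumer takes the first message at about 300 ms:

Sending 0 at 1ms
Sent 0 at 1ms
Sending 1 at 1ms
Sent 1 at 2ms
Sending 2 at 2ms
Received 0 at 301ms
Sent 2 at 301ms
Sending 3 at 301ms
Received 1 at 602ms
Sent 3 at 602ms
Sending 4 at 602ms
Received 2 at 903ms
Sent 4 at 903ms
Received 3 at 1204ms
Received 4 at 1505ms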

Error Handling and Disconnection

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Commands understood by a worker thread.
#[derive(Debug)]
enum WorkerMessage {
    /// A unit of work, described by its name.
    Task(String),
    /// Ask the worker to stop, even if more messages are queued behind it.
    Shutdown,
}

/// Runs a worker loop on the calling thread.
///
/// Processes `Task` messages from `rx` one at a time and returns when either a
/// `Shutdown` message is received or every sender has been dropped. `id` is
/// only used to label the log output.
fn worker(id: u32, rx: mpsc::Receiver<WorkerMessage>) {
    println!("Worker {} started", id);

    // Iterating over the receiver blocks for each message and ends only when
    // the channel is disconnected.
    for message in &rx {
        match message {
            WorkerMessage::Task(task) => {
                println!("Worker {} processing: {}", id, task);
                // Stand-in for real work.
                thread::sleep(Duration::from_millis(100));
                println!("Worker {} completed: {}", id, task);
            }
            WorkerMessage::Shutdown => {
                println!("Worker {} shutting down", id);
                return;
            }
        }
    }

    // Reaching this point means the iterator ended without a `Shutdown`.
    println!("Worker {} detected sender disconnection", id);
}

/// Drives one worker through three tasks and a shutdown, then shows that
/// sending on the channel fails once the worker (and its receiver) is gone.
fn main() {
    let (tx, rx) = mpsc::channel();

    // The worker takes ownership of the receiving half.
    let worker_handle = thread::spawn(move || worker(1, rx));

    // Queue the tasks, followed by the shutdown request.
    (1..=3)
        .map(|i| WorkerMessage::Task(format!("Task {}", i)))
        .for_each(|task| tx.send(task).unwrap());
    tx.send(WorkerMessage::Shutdown).unwrap();

    // Once the worker has returned, its receiver has been dropped.
    worker_handle.join().unwrap();

    // A send attempted after that point is reported as an error, not a panic.
    let late_sender = thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        if let Err(e) = tx.send(WorkerMessage::Task("Late task".to_string())) {
            println!("Failed to send late task: {}", e);
        } else {
            println!("Sent late task");
        }
    });
    late_sender.join().unwrap();
}

Expected Output:

Worker 1 started
Worker 1 processing: Task 1
Worker 1 completed: Task 1
Worker 1 processing: Task 2
Worker 1 completed: Task 2
Worker 1 processing: Task 3
Worker 1 completed: Task 3
Worker 1 shutting down
Failed to send late task: sending on a closed channel

Crossbeam Channels

The crossbeam library provides more advanced channel types. First, add to Cargo.toml:

[dependencies]
crossbeam = "0.8"

Unbounded Channels

use crossbeam::channel;
use std::thread;
use std::time::Duration;

/// Collects messages from three sender threads over an unbounded crossbeam
/// channel and prints them until every sender has gone away.
fn main() {
    // Unbounded: `send` never blocks waiting for the receiver.
    let (tx, rx) = channel::unbounded();

    for sender_id in 0..3 {
        let sender = tx.clone();
        thread::spawn(move || {
            for seq in 0..3 {
                let text = format!("Message {}-{}", sender_id, seq);
                sender.send(text).unwrap();
                thread::sleep(Duration::from_millis(50));
            }
        });
    }

    // Only the clones should keep the channel open from here on.
    drop(tx);

    // The iterator blocks per message and ends when the channel disconnects.
    for msg in rx.iter() {
        println!("Received: {}", msg);
    }
}

Bounded Channels with Selection

use crossbeam::channel;
use crossbeam::select;
use std::thread;
use std::time::Duration;

/// Merges two bounded channels with `select!`, printing whichever message is
/// ready first, until both producers have finished.
///
/// A disconnected channel is *always* ready in `select!` (its `recv` returns
/// `Err` immediately). Selecting on it again would therefore spin the loop at
/// full speed and print "Channel N closed" over and over while the other
/// producer is still running. To avoid that, the loop stops selecting as soon
/// as one side closes and drains the remaining channel with a plain blocking
/// loop. Termination is driven by disconnection rather than by a hard-coded
/// message count.
fn main() {
    let (tx1, rx1) = channel::bounded(1);
    let (tx2, rx2) = channel::bounded(1);

    // Producer 1
    thread::spawn(move || {
        for i in 0..3 {
            thread::sleep(Duration::from_millis(100));
            tx1.send(format!("Source 1: {}", i)).unwrap();
        }
        // `tx1` is dropped here, which disconnects channel 1.
    });

    // Producer 2
    thread::spawn(move || {
        for i in 0..3 {
            thread::sleep(Duration::from_millis(150));
            tx2.send(format!("Source 2: {}", i)).unwrap();
        }
        // `tx2` is dropped here, which disconnects channel 2.
    });

    let mut rx1_open = true;
    let mut rx2_open = true;

    // Phase 1: both producers are alive, so wait on whichever is ready first.
    while rx1_open && rx2_open {
        select! {
            recv(rx1) -> msg => {
                match msg {
                    Ok(msg) => println!("From channel 1: {}", msg),
                    Err(_) => {
                        println!("Channel 1 closed");
                        rx1_open = false;
                    }
                }
            }
            recv(rx2) -> msg => {
                match msg {
                    Ok(msg) => println!("From channel 2: {}", msg),
                    Err(_) => {
                        println!("Channel 2 closed");
                        rx2_open = false;
                    }
                }
            }
        }
    }

    // Phase 2: exactly one channel is still open; drain it with a blocking
    // iterator, which ends when that channel disconnects too.
    if rx1_open {
        for msg in rx1.iter() {
            println!("From channel 1: {}", msg);
        }
        println!("Channel 1 closed");
    }
    if rx2_open {
        for msg in rx2.iter() {
            println!("From channel 2: {}", msg);
        }
        println!("Channel 2 closed");
    }
}

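Expected Output (messages that become ready at about the same time may appear in either order; in addition, a "Channel N closed" line is printed whenever `select!` observes a channel whose producer has finished):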

From channel 1: Source 1: 0
From channel 2: Source 2: 0
From channel 1: Source 1: 1
From channel 2: Source 2: 1
From channel 1: Source 1: 2
From channel 2: Source 2: 2

Actor Pattern Implementation

use std::sync::mpsc;
use std::thread;
use std::collections::HashMap;

/// Requests understood by the bank actor.
#[derive(Debug)]
enum BankMessage {
    /// Add `amount` to `account`, creating the account if necessary.
    Deposit { account: String, amount: u32 },
    /// Remove `amount` from `account` if the balance is sufficient.
    Withdraw { account: String, amount: u32 },
    /// Report the balance of `account` (0 if unknown) on the `response` channel.
    Balance { account: String, response: mpsc::Sender<u32> },
    /// Stop the actor loop.
    Shutdown,
}

/// Actor that owns all account balances and mutates them only in response to
/// messages, so no locking is needed.
struct BankActor {
    accounts: HashMap<String, u32>,
    receiver: mpsc::Receiver<BankMessage>,
}

impl BankActor {
    /// Creates an actor with no accounts that will read requests from `receiver`.
    fn new(receiver: mpsc::Receiver<BankMessage>) -> Self {
        BankActor {
            accounts: HashMap::new(),
            receiver,
        }
    }

    /// Processes requests until `Shutdown` arrives or every sender is dropped.
    ///
    /// Invalid requests never bring the actor down: a withdrawal larger than
    /// the balance and a deposit that would overflow `u32` are both refused
    /// with a log line and leave the balance untouched.
    fn run(mut self) {
        println!("Bank actor started");

        while let Ok(message) = self.receiver.recv() {
            match message {
                BankMessage::Deposit { account, amount } => {
                    let current = self.accounts.get(&account).copied().unwrap_or(0);
                    // `checked_add` instead of `+=`: an overflow must not panic
                    // (debug builds) or silently wrap (release builds).
                    match current.checked_add(amount) {
                        Some(updated) => {
                            println!(
                                "Deposited {} to {}. New balance: {}",
                                amount, account, updated
                            );
                            self.accounts.insert(account, updated);
                        }
                        None => println!(
                            "Deposit of {} to {} rejected: balance would overflow. Balance: {}",
                            amount, account, current
                        ),
                    }
                }
                BankMessage::Withdraw { account, amount } => {
                    let current = self.accounts.get(&account).copied().unwrap_or(0);
                    // `checked_sub` yields `None` exactly when funds are insufficient.
                    match current.checked_sub(amount) {
                        Some(remaining) => {
                            // An unknown account can only get here with a zero
                            // withdrawal, so there is nothing to store for it.
                            if let Some(balance) = self.accounts.get_mut(&account) {
                                *balance = remaining;
                            }
                            println!(
                                "Withdrew {} from {}. New balance: {}",
                                amount, account, remaining
                            );
                        }
                        None => println!(
                            "Insufficient funds in {}. Balance: {}",
                            account, current
                        ),
                    }
                }
                BankMessage::Balance { account, response } => {
                    let balance = self.accounts.get(&account).copied().unwrap_or(0);
                    println!("Balance inquiry for {}: {}", account, balance);
                    // The requester may have stopped waiting; that is not an
                    // error for the actor, so a failed send is ignored.
                    let _ = response.send(balance);
                }
                BankMessage::Shutdown => {
                    println!("Bank actor shutting down");
                    break;
                }
            }
        }
    }
}

// Thin handle that hides the message protocol behind ordinary method calls.
struct BankClient {
    sender: mpsc::Sender<BankMessage>,
}

impl BankClient {
    /// Wraps the sending half of the actor's request channel.
    fn new(sender: mpsc::Sender<BankMessage>) -> Self {
        Self { sender }
    }

    /// Queues a deposit of `amount` into `account`; does not wait for it to be applied.
    ///
    /// Panics if the request channel is closed.
    fn deposit(&self, account: &str, amount: u32) {
        let request = BankMessage::Deposit {
            account: account.to_string(),
            amount,
        };
        self.sender.send(request).unwrap();
    }

    /// Queues a withdrawal of `amount` from `account`; does not wait for it to be applied.
    ///
    /// Panics if the request channel is closed.
    fn withdraw(&self, account: &str, amount: u32) {
        let request = BankMessage::Withdraw {
            account: account.to_string(),
            amount,
        };
        self.sender.send(request).unwrap();
    }

    /// Asks for the balance of `account` and blocks until the reply arrives.
    ///
    /// Panics if the request channel is closed or no reply is ever sent.
    fn balance(&self, account: &str) -> u32 {
        // A one-off channel carries the reply back to this caller.
        let (reply_to, reply) = mpsc::channel();
        let query = BankMessage::Balance {
            account: account.to_string(),
            response: reply_to,
        };
        self.sender.send(query).unwrap();
        reply.recv().unwrap()
    }

    /// Queues a shutdown request.
    ///
    /// Panics if the request channel is closed.
    fn shutdown(&self) {
        let request = BankMessage::Shutdown;
        self.sender.send(request).unwrap();
    }
}

/// Starts the bank actor on its own thread and exercises it through a client:
/// two deposits, two withdrawals (one of them refused), two balance queries,
/// then a shutdown.
fn main() {
    let (tx, rx) = mpsc::channel();
    let bank_client = BankClient::new(tx);

    // The actor owns the receiving half and runs until it is told to stop.
    let bank_handle = thread::spawn(move || BankActor::new(rx).run());

    bank_client.deposit("Alice", 1000);
    bank_client.deposit("Bob", 500);
    bank_client.withdraw("Alice", 200);
    // Bob only has 500, so this withdrawal is expected to be refused.
    bank_client.withdraw("Bob", 600);

    // Each query blocks until the actor has replied.
    let alice_balance = bank_client.balance("Alice");
    println!("Alice balance: {}", alice_balance);
    let bob_balance = bank_client.balance("Bob");
    println!("Bob balance: {}", bob_balance);

    bank_client.shutdown();
    bank_handle.join().unwrap();
}

Expected Output:

Bank actor started
Deposited 1000 to Alice. New balance: 1000
Deposited 500 to Bob. New balance: 500
Withdrew 200 from Alice. New balance: 800
Insufficient funds in Bob. Balance: 500
Balance inquiry for Alice: 800
Alice balance: 800
Balance inquiry for Bob: 500
Bob balance: 500
Bank actor shutting down

Performance Considerations

use std::sync::mpsc;
use std::thread;
use std::time::Instant;

/// Times how long it takes to push one million `usize` values through an
/// unbounded channel and then through a bounded channel with a 1000-slot
/// buffer, printing one result line for each.
///
/// Each measurement covers channel creation, thread start-up and the full
/// transfer.
fn benchmark_channel_performance() {
    const NUM_MESSAGES: usize = 1_000_000;

    // --- Unbounded channel ---
    let unbounded_timer = Instant::now();
    let (tx, rx) = mpsc::channel();

    let producer = thread::spawn(move || {
        (0..NUM_MESSAGES).for_each(|i| tx.send(i).unwrap());
    });

    // Counts messages and stops at NUM_MESSAGES, or earlier if the channel closes.
    let consumer = thread::spawn(move || rx.iter().take(NUM_MESSAGES).count());

    producer.join().unwrap();
    let received = consumer.join().unwrap();

    println!(
        "Unbounded channel: {} messages in {:?}",
        received,
        unbounded_timer.elapsed()
    );

    // --- Bounded channel ---
    let bounded_timer = Instant::now();
    let (tx, rx) = mpsc::sync_channel(1000);

    let producer = thread::spawn(move || {
        (0..NUM_MESSAGES).for_each(|i| tx.send(i).unwrap());
    });

    let consumer = thread::spawn(move || rx.iter().take(NUM_MESSAGES).count());

    producer.join().unwrap();
    let received = consumer.join().unwrap();

    println!(
        "Bounded channel: {} messages in {:?}",
        received,
        bounded_timer.elapsed()
    );
}

/// Entry point: runs the channel benchmark once and exits.
fn main() {
    benchmark_channel_performance();
}

Best Practices

1. Choose the Right Channel Type

2. Handle Disconnections Gracefully

3. Consider Performance

Summary

Message passing provides safe concurrency in Rust:

Message passing is ideal for loose coupling between concurrent components and avoiding shared state complexity.


← Previous | Next →