Rust - Message Passing
Overview
Estimated time: 40–50 minutes
Master message passing concurrency in Rust using channels for safe communication between threads. Learn about multiple-producer, single-consumer channels, bounded and unbounded channels, and the crossbeam library for advanced patterns.
Learning Objectives
- Understand message passing as an alternative to shared state concurrency.
- Master std::sync::mpsc channels for thread communication.
- Learn bounded vs unbounded channels and their trade-offs.
- Use crossbeam channels for more advanced patterns.
- Handle errors and disconnections in channel communication.
- Design concurrent systems using the actor pattern.
Prerequisites
Message Passing Philosophy
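Rust's approach to concurrency borrows a slogan from the Go language documentation: "Do not communicate by sharing memory; instead, share memory by communicating." Message passing provides a safer alternative to shared state: threads send data to each other through channels, and ownership of each value moves from the sender to the receiver.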
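To make the idea concrete, here is a minimal sketch of how sending a value through a channel hands ownership to the receiving side. The helper name `send_owned_greeting` is hypothetical and exists only for this illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: builds a String on a worker thread and hands
// ownership of it to the caller through a channel.
fn send_owned_greeting() -> String {
    let (tx, rx) = mpsc::channel();

    let handle = thread::spawn(move || {
        let greeting = String::from("hello");
        // `send` moves `greeting` into the channel.
        tx.send(greeting).unwrap();
        // Using `greeting` after this point would be a compile error:
        // the value has been moved and belongs to whoever receives it.
    });

    // The receiving thread becomes the sole owner of the String.
    let received = rx.recv().unwrap();
    handle.join().unwrap();
    received
}

fn main() {
    println!("Got: {}", send_owned_greeting());
}
```

Because the value is moved rather than shared, no lock is needed: at any moment exactly one thread can touch the data.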
Basic Channels (mpsc)
The standard library provides Multiple Producer, Single Consumer (mpsc) channels:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// Create a channel
let (tx, rx) = mpsc::channel();
// Spawn a thread to send data
thread::spawn(move || {
let messages = vec![
"Hello",
"from",
"the",
"spawned",
"thread",
];
for msg in messages {
tx.send(msg).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
// Receive messages in the main thread
for received in rx {
println!("Received: {}", received);
}
println!("All messages received");
}
Expected Output:
Received: Hello
Received: from
Received: the
Received: spawned
Received: thread
All messages received
Multiple Producers
Multiple threads can send to the same receiver by cloning the transmitter:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// Create multiple producers
for thread_id in 0..3 {
let tx_clone = tx.clone();
thread::spawn(move || {
for i in 0..3 {
let message = format!("Thread {} says: Message {}", thread_id, i);
tx_clone.send(message).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
}
// Drop the original transmitter so the receiver knows when all senders are gone
drop(tx);
// Receive all messages
for received in rx {
println!("{}", received);
}
}
Expected Output (the order of lines from different threads may vary between runs):
Thread 0 says: Message 0
Thread 1 says: Message 0
Thread 2 says: Message 0
Thread 0 says: Message 1
Thread 1 says: Message 1
Thread 2 says: Message 1
Thread 0 says: Message 2
Thread 1 says: Message 2
Thread 2 says: Message 2
Non-blocking Operations
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(500));
tx.send("Hello after delay").unwrap();
});
// Non-blocking receive
loop {
match rx.try_recv() {
Ok(msg) => {
println!("Received: {}", msg);
break;
}
Err(mpsc::TryRecvError::Empty) => {
println!("No message yet, doing other work...");
thread::sleep(Duration::from_millis(100));
}
Err(mpsc::TryRecvError::Disconnected) => {
println!("Sender disconnected");
break;
}
}
}
}
Expected Output (the number of "No message yet" lines may vary slightly with timing):
No message yet, doing other work...
No message yet, doing other work...
No message yet, doing other work...
No message yet, doing other work...
No message yet, doing other work...
Received: Hello after delay
Synchronous Channels (Bounded)
Synchronous channels block the sender when the buffer is full:
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
fn main() {
// Create a synchronous channel with buffer size 2
let (tx, rx) = mpsc::sync_channel(2);
let start = Instant::now();
// Producer thread
let producer = thread::spawn(move || {
for i in 0..5 {
println!("Sending {} at {:?}", i, start.elapsed());
tx.send(i).unwrap();
println!("Sent {} at {:?}", i, start.elapsed());
}
});
// Consumer thread (slower than producer)
let consumer = thread::spawn(move || {
for _ in 0..5 {
thread::sleep(Duration::from_millis(300));
let received = rx.recv().unwrap();
println!("Received {} at {:?}", received, start.elapsed());
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
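Expected Output (timings are approximate and vary between runs; with a buffer of 2, the third send blocks until the consumer takes the first item, and a "Received" line may swap places with the "Sent" line it unblocks):
Sending 0 at 1ms
Sent 0 at 1ms
Sending 1 at 1ms
Sent 1 at 1ms
Sending 2 at 1ms
Received 0 at 301ms
Sent 2 at 301ms
Sending 3 at 301ms
Received 1 at 602ms
Sent 3 at 602ms
Sending 4 at 602ms
Received 2 at 903ms
Sent 4 at 903ms
Received 3 at 1204ms
Received 4 at 1505ms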
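When blocking the producer is not acceptable, a bounded channel can also be used without waiting. The sketch below uses `try_send` on a `SyncSender`, which returns an error immediately if the buffer is full. The helper name `fill_without_blocking` and its counters are hypothetical, introduced only to show the behavior.

```rust
use std::sync::mpsc::{self, TrySendError};

// Hypothetical helper: tries to push `count` items into a channel with
// room for `capacity` items while nobody is receiving, and reports how
// many were accepted and how many were rejected because the buffer was full.
fn fill_without_blocking(capacity: usize, count: u32) -> (u32, u32) {
    let (tx, rx) = mpsc::sync_channel(capacity);
    let mut accepted = 0;
    let mut rejected = 0;

    for i in 0..count {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The rejected value is handed back inside the error,
            // so the caller can retry, log, or drop it.
            Err(TrySendError::Full(_value)) => rejected += 1,
            Err(TrySendError::Disconnected(_value)) => break,
        }
    }

    // The receiver is kept alive until here so the channel stays connected.
    drop(rx);
    (accepted, rejected)
}

fn main() {
    let (accepted, rejected) = fill_without_blocking(2, 5);
    // prints "accepted 2, rejected 3"
    println!("accepted {}, rejected {}", accepted, rejected);
}
```

Whether to drop, retry, or queue rejected items elsewhere is an application decision; the point is that the producer learns about backpressure without stalling.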
Error Handling and Disconnection
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
#[derive(Debug)]
enum WorkerMessage {
Task(String),
Shutdown,
}
fn worker(id: u32, rx: mpsc::Receiver<WorkerMessage>) {
println!("Worker {} started", id);
loop {
match rx.recv() {
Ok(WorkerMessage::Task(task)) => {
println!("Worker {} processing: {}", id, task);
thread::sleep(Duration::from_millis(100));
println!("Worker {} completed: {}", id, task);
}
Ok(WorkerMessage::Shutdown) => {
println!("Worker {} shutting down", id);
break;
}
Err(mpsc::RecvError) => {
println!("Worker {} detected sender disconnection", id);
break;
}
}
}
}
fn main() {
let (tx, rx) = mpsc::channel();
// Spawn worker thread
let worker_handle = thread::spawn(move || {
worker(1, rx);
});
// Send some tasks
for i in 1..=3 {
tx.send(WorkerMessage::Task(format!("Task {}", i))).unwrap();
}
// Send shutdown signal
tx.send(WorkerMessage::Shutdown).unwrap();
// Wait for worker to finish
worker_handle.join().unwrap();
// Demonstrate error when trying to send after receiver is dropped
thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
match tx.send(WorkerMessage::Task("Late task".to_string())) {
Ok(_) => println!("Sent late task"),
Err(e) => println!("Failed to send late task: {}", e),
}
}).join().unwrap();
}
Expected Output:
Worker 1 started
Worker 1 processing: Task 1
Worker 1 completed: Task 1
Worker 1 processing: Task 2
Worker 1 completed: Task 2
Worker 1 processing: Task 3
Worker 1 completed: Task 3
Worker 1 shutting down
Failed to send late task: sending on a closed channel
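A failed `send` does not lose the message: `SendError` wraps the value that could not be delivered. The following sketch shows one way to get it back. The helper `send_or_reclaim` is a hypothetical name used for illustration.

```rust
use std::sync::mpsc::{self, SendError};

// Hypothetical helper: attempts to send `task`; if the receiver is gone,
// returns the task to the caller instead of losing it.
fn send_or_reclaim(tx: &mpsc::Sender<String>, task: String) -> Result<(), String> {
    match tx.send(task) {
        Ok(()) => Ok(()),
        // `SendError` is a tuple struct wrapping the unsent value.
        Err(SendError(unsent)) => Err(unsent),
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // While the receiver exists, the send succeeds.
    assert!(send_or_reclaim(&tx, "first".to_string()).is_ok());

    // Dropping the receiver closes the channel.
    drop(rx);

    match send_or_reclaim(&tx, "second".to_string()) {
        Ok(()) => println!("sent"),
        Err(task) => println!("receiver gone, reclaimed task: {}", task),
    }
}
```

The reclaimed task could be rerouted to another worker or persisted, which is often preferable to calling `unwrap()` on the send result.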
Crossbeam Channels
The crossbeam library provides more advanced channel types. First, add it to Cargo.toml:
[dependencies]
crossbeam = "0.8"
Unbounded Channels
use crossbeam::channel;
use std::thread;
use std::time::Duration;
fn main() {
// Unbounded channel
let (tx, rx) = channel::unbounded();
// Multiple senders
for i in 0..3 {
let tx = tx.clone();
thread::spawn(move || {
for j in 0..3 {
tx.send(format!("Message {}-{}", i, j)).unwrap();
thread::sleep(Duration::from_millis(50));
}
});
}
// Drop original sender
drop(tx);
// Receive all messages
while let Ok(msg) = rx.recv() {
println!("Received: {}", msg);
}
}
Bounded Channels with Selection
use crossbeam::channel;
use crossbeam::select;
use std::thread;
use std::time::Duration;
fn main() {
let (tx1, rx1) = channel::bounded(1);
let (tx2, rx2) = channel::bounded(1);
// Producer 1
thread::spawn(move || {
for i in 0..3 {
thread::sleep(Duration::from_millis(100));
tx1.send(format!("Source 1: {}", i)).unwrap();
}
});
// Producer 2
thread::spawn(move || {
for i in 0..3 {
thread::sleep(Duration::from_millis(150));
tx2.send(format!("Source 2: {}", i)).unwrap();
}
});
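// A receiver that never yields a message; it stands in for a channel that has closed
let never = channel::never::<String>();
let mut open1 = true;
let mut open2 = true;
// Consumer using select to receive from either channel until both have closed
while open1 || open2 {
// Skip closed channels; otherwise select! would report the disconnection again and again in a busy loop
let r1 = if open1 { &rx1 } else { &never };
let r2 = if open2 { &rx2 } else { &never };
select! {
recv(r1) -> msg => {
match msg {
Ok(msg) => println!("From channel 1: {}", msg),
Err(_) => {
println!("Channel 1 closed");
open1 = false;
}
}
}
recv(r2) -> msg => {
match msg {
Ok(msg) => println!("From channel 2: {}", msg),
Err(_) => {
println!("Channel 2 closed");
open2 = false;
}
}
}
}
}
}
Expected Output (one possible ordering; the events around the 300 ms mark may be interleaved differently between runs):
From channel 1: Source 1: 0
From channel 2: Source 2: 0
From channel 1: Source 1: 1
From channel 1: Source 1: 2
From channel 2: Source 2: 1
Channel 1 closed
From channel 2: Source 2: 2
Channel 2 closed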
Actor Pattern Implementation
use std::sync::mpsc;
use std::thread;
use std::collections::HashMap;
#[derive(Debug)]
enum BankMessage {
Deposit { account: String, amount: u32 },
Withdraw { account: String, amount: u32 },
Balance { account: String, response: mpsc::Sender<u32> },
Shutdown,
}
struct BankActor {
accounts: HashMap<String, u32>,
receiver: mpsc::Receiver<BankMessage>,
}
impl BankActor {
fn new(receiver: mpsc::Receiver<BankMessage>) -> Self {
BankActor {
accounts: HashMap::new(),
receiver,
}
}
fn run(mut self) {
println!("Bank actor started");
while let Ok(message) = self.receiver.recv() {
match message {
BankMessage::Deposit { account, amount } => {
let balance = self.accounts.entry(account.clone()).or_insert(0);
*balance += amount;
println!("Deposited {} to {}. New balance: {}", amount, account, balance);
}
BankMessage::Withdraw { account, amount } => {
let balance = self.accounts.entry(account.clone()).or_insert(0);
if *balance >= amount {
*balance -= amount;
println!("Withdrew {} from {}. New balance: {}", amount, account, balance);
} else {
println!("Insufficient funds in {}. Balance: {}", account, balance);
}
}
BankMessage::Balance { account, response } => {
let balance = *self.accounts.get(&account).unwrap_or(&0);
println!("Balance inquiry for {}: {}", account, balance);
let _ = response.send(balance);
}
BankMessage::Shutdown => {
println!("Bank actor shutting down");
break;
}
}
}
}
}
// Client handle for interacting with the bank actor
struct BankClient {
sender: mpsc::Sender<BankMessage>,
}
impl BankClient {
fn new(sender: mpsc::Sender<BankMessage>) -> Self {
BankClient { sender }
}
fn deposit(&self, account: &str, amount: u32) {
self.sender.send(BankMessage::Deposit {
account: account.to_string(),
amount,
}).unwrap();
}
fn withdraw(&self, account: &str, amount: u32) {
self.sender.send(BankMessage::Withdraw {
account: account.to_string(),
amount,
}).unwrap();
}
fn balance(&self, account: &str) -> u32 {
let (response_tx, response_rx) = mpsc::channel();
self.sender.send(BankMessage::Balance {
account: account.to_string(),
response: response_tx,
}).unwrap();
response_rx.recv().unwrap()
}
fn shutdown(&self) {
self.sender.send(BankMessage::Shutdown).unwrap();
}
}
fn main() {
let (tx, rx) = mpsc::channel();
let bank_client = BankClient::new(tx);
// Start the bank actor
let bank_handle = thread::spawn(move || {
let bank = BankActor::new(rx);
bank.run();
});
// Use the bank
bank_client.deposit("Alice", 1000);
bank_client.deposit("Bob", 500);
bank_client.withdraw("Alice", 200);
bank_client.withdraw("Bob", 600); // Should fail
println!("Alice balance: {}", bank_client.balance("Alice"));
println!("Bob balance: {}", bank_client.balance("Bob"));
bank_client.shutdown();
bank_handle.join().unwrap();
}
Expected Output:
Bank actor started
Deposited 1000 to Alice. New balance: 1000
Deposited 500 to Bob. New balance: 500
Withdrew 200 from Alice. New balance: 800
Insufficient funds in Bob. Balance: 500
Balance inquiry for Alice: 800
Balance inquiry for Bob: 500
Alice balance: 800
Bob balance: 500
Bank actor shutting down
Performance Considerations
use std::sync::mpsc;
use std::thread;
use std::time::Instant;
fn benchmark_channel_performance() {
const NUM_MESSAGES: usize = 1_000_000;
// Unbounded channel benchmark
let start = Instant::now();
let (tx, rx) = mpsc::channel();
let producer = thread::spawn(move || {
for i in 0..NUM_MESSAGES {
tx.send(i).unwrap();
}
});
let consumer = thread::spawn(move || {
let mut count = 0;
while let Ok(_) = rx.recv() {
count += 1;
if count == NUM_MESSAGES {
break;
}
}
count
});
producer.join().unwrap();
let received = consumer.join().unwrap();
println!("Unbounded channel: {} messages in {:?}",
received, start.elapsed());
// Bounded channel benchmark
let start = Instant::now();
let (tx, rx) = mpsc::sync_channel(1000);
let producer = thread::spawn(move || {
for i in 0..NUM_MESSAGES {
tx.send(i).unwrap();
}
});
let consumer = thread::spawn(move || {
let mut count = 0;
while let Ok(_) = rx.recv() {
count += 1;
if count == NUM_MESSAGES {
break;
}
}
count
});
producer.join().unwrap();
let received = consumer.join().unwrap();
println!("Bounded channel: {} messages in {:?}",
received, start.elapsed());
}
fn main() {
benchmark_channel_performance();
}
Best Practices
1. Choose the Right Channel Type
- Unbounded: Use when the consumer can keep up with the producer; sends never block, but the queue grows without limit if the consumer falls behind
- Bounded: Use for backpressure and memory control
- Synchronous (bound=0): Use for direct handoff between threads
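The direct handoff mentioned in the last bullet can be sketched as follows. With a bound of 0, `send` returns only after a receiver has taken the value. The helper name `rendezvous_handoff` and the value 42 are hypothetical, chosen for illustration.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical helper: sends one value over a zero-capacity channel to a
// receiver that only starts listening after `delay`. Returns how long the
// sender waited (measured from just before the consumer was spawned)
// together with the value the receiver got.
fn rendezvous_handoff(delay: Duration) -> (Duration, u32) {
    let (tx, rx) = mpsc::sync_channel::<u32>(0);

    let start = Instant::now();
    let consumer = thread::spawn(move || {
        thread::sleep(delay);
        rx.recv().unwrap()
    });

    // With a bound of 0 there is no buffer, so `send` returns only
    // once the receiver has taken the value.
    tx.send(42).unwrap();
    let waited = start.elapsed();

    let received = consumer.join().unwrap();
    (waited, received)
}

fn main() {
    let (waited, received) = rendezvous_handoff(Duration::from_millis(100));
    println!("handed off {} after waiting {:?}", received, waited);
}
```

The measured wait is at least as long as the consumer's delay, which shows that the sender and receiver meet at the moment of transfer.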
2. Handle Disconnections Gracefully
- Always handle RecvError and SendError
- Use timeouts with recv_timeout when appropriate
- Design shutdown protocols with explicit shutdown messages
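As a rough illustration of the timeout advice, the sketch below waits a bounded amount of time for a message and distinguishes a timeout from a disconnection. The helper name `wait_for_message` and the 50 ms timeout are assumptions made for this example.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

// Hypothetical helper: waits up to `timeout` for one message and
// describes what happened.
fn wait_for_message(rx: &mpsc::Receiver<String>, timeout: Duration) -> String {
    match rx.recv_timeout(timeout) {
        Ok(msg) => format!("received: {}", msg),
        Err(RecvTimeoutError::Timeout) => "timed out".to_string(),
        Err(RecvTimeoutError::Disconnected) => "sender disconnected".to_string(),
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let timeout = Duration::from_millis(50);

    // Nothing has been sent yet, so this wait times out.
    println!("{}", wait_for_message(&rx, timeout)); // prints "timed out"

    // A message that is already queued is returned immediately.
    tx.send("ready".to_string()).unwrap();
    println!("{}", wait_for_message(&rx, timeout)); // prints "received: ready"

    // Once every sender is dropped and the queue is empty, the receiver
    // reports disconnection instead of waiting.
    drop(tx);
    println!("{}", wait_for_message(&rx, timeout)); // prints "sender disconnected"
}
```

Treating the two error cases separately lets a worker keep polling after a timeout while shutting down cleanly once its senders are gone.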
3. Consider Performance
- Batch messages when possible to reduce channel overhead
- Use try_recv for non-blocking operations
- Consider crossbeam channels for advanced patterns
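One way to batch messages, sketched under the assumption that items can be grouped into a Vec before sending, is shown below. The helper name `send_in_batches` and the batch size are hypothetical; the right batch size depends on the workload and should be measured.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: sends the numbers 0..total in Vec batches of
// `batch_size` (which must be non-zero) and returns the number of
// channel sends together with the sum of all received items.
fn send_in_batches(total: u64, batch_size: usize) -> (usize, u64) {
    let (tx, rx) = mpsc::channel::<Vec<u64>>();

    let producer = thread::spawn(move || {
        let items: Vec<u64> = (0..total).collect();
        // Each `send` carries a whole chunk, so the per-message channel
        // cost is paid once per batch rather than once per item.
        for chunk in items.chunks(batch_size) {
            tx.send(chunk.to_vec()).unwrap();
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });

    let mut sends = 0;
    let mut sum = 0;
    for batch in rx {
        sends += 1;
        sum += batch.iter().sum::<u64>();
    }

    producer.join().unwrap();
    (sends, sum)
}

fn main() {
    let (sends, sum) = send_in_batches(10, 4);
    // prints "3 sends, sum = 45"
    println!("{} sends, sum = {}", sends, sum);
}
```

Ten items in batches of four need only three sends (4 + 4 + 2), at the cost of some added latency for items waiting to fill a batch.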
Summary
Message passing provides safe concurrency in Rust:
- Channels: Safe communication primitives for sending data between threads
- mpsc: Multiple producer, single consumer channels in the standard library
- Bounded vs unbounded: Trade-offs between memory usage and blocking behavior
- Error handling: Graceful handling of disconnections and timeouts
- Actor pattern: Encapsulate state in actors that communicate via messages
- Crossbeam: Advanced channel types with selection and multiple consumers
Message passing is ideal for loose coupling between concurrent components and avoiding shared state complexity.