Rust - Async Programming
Overview
Estimated time: 50–60 minutes
Master asynchronous programming in Rust using async/await syntax, futures, and the tokio runtime. Learn how to write concurrent applications that can handle many operations efficiently without blocking threads.
Learning Objectives
- Understand async/await syntax and futures in Rust.
- Learn how to set up and use the tokio async runtime.
- Master async functions, blocks, and closures.
- Handle async error handling and cancellation.
- Write concurrent applications with async streams and channels.
- Compare async programming with traditional threading.
Prerequisites
Introduction to Async Programming
Async programming allows you to write code that can pause execution while waiting for operations like network requests or file I/O, enabling other tasks to run. This is more efficient than creating many threads.
Setting Up Tokio
First, add tokio to your Cargo.toml
:
[dependencies]
tokio = { version = "1.0", features = ["full"] }
Basic Async Functions
use tokio::time::{sleep, Duration};
// Async function that simulates work
async fn fetch_data(id: u32) -> String {
println!("Fetching data for ID: {}", id);
// Simulate network delay
sleep(Duration::from_millis(100)).await;
format!("Data for ID: {}", id)
}
// Async function that uses other async functions
async fn process_data(id: u32) -> String {
let data = fetch_data(id).await; // .await to wait for completion
format!("Processed: {}", data)
}
#[tokio::main] // This macro sets up the async runtime
async fn main() {
let result = process_data(42).await;
println!("{}", result);
}
Expected Output:
Fetching data for ID: 42
Processed: Data for ID: 42
Running Multiple Async Tasks Concurrently
use tokio::time::{sleep, Duration};
async fn download_file(name: &str, delay_ms: u64) -> String {
println!("Starting download: {}", name);
sleep(Duration::from_millis(delay_ms)).await;
println!("Finished download: {}", name);
format!("Content of {}", name)
}
#[tokio::main]
async fn main() {
println!("Sequential execution:");
let start = std::time::Instant::now();
let file1 = download_file("file1.txt", 200).await;
let file2 = download_file("file2.txt", 300).await;
let file3 = download_file("file3.txt", 100).await;
println!("Sequential took: {:?}", start.elapsed());
println!();
println!("Concurrent execution:");
let start = std::time::Instant::now();
// Run all downloads concurrently
let (file1, file2, file3) = tokio::join!(
download_file("file1.txt", 200),
download_file("file2.txt", 300),
download_file("file3.txt", 100)
);
println!("Concurrent took: {:?}", start.elapsed());
println!("Results: {}, {}, {}", file1, file2, file3);
}
Expected Output:
Sequential execution:
Starting download: file1.txt
Finished download: file1.txt
Starting download: file2.txt
Finished download: file2.txt
Starting download: file3.txt
Finished download: file3.txt
Sequential took: 600ms
Concurrent execution:
Starting download: file1.txt
Starting download: file2.txt
Starting download: file3.txt
Finished download: file3.txt
Finished download: file1.txt
Finished download: file2.txt
Concurrent took: 300ms
Results: Content of file1.txt, Content of file2.txt, Content of file3.txt
Error Handling in Async Functions
use tokio::time::{sleep, Duration};
#[derive(Debug)]
enum DownloadError {
NetworkError,
TimeoutError,
NotFound,
}
impl std::fmt::Display for DownloadError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
DownloadError::NetworkError => write!(f, "Network error"),
DownloadError::TimeoutError => write!(f, "Timeout error"),
DownloadError::NotFound => write!(f, "File not found"),
}
}
}
impl std::error::Error for DownloadError {}
async fn risky_download(id: u32) -> Result<String, DownloadError> {
sleep(Duration::from_millis(100)).await;
match id {
1 => Ok("Success data".to_string()),
2 => Err(DownloadError::NetworkError),
3 => Err(DownloadError::TimeoutError),
_ => Err(DownloadError::NotFound),
}
}
async fn download_with_retry(id: u32, max_retries: u32) -> Result<String, DownloadError> {
for attempt in 1..=max_retries {
println!("Attempt {} for ID {}", attempt, id);
match risky_download(id).await {
Ok(data) => return Ok(data),
Err(DownloadError::NetworkError) if attempt < max_retries => {
println!("Retrying due to network error...");
sleep(Duration::from_millis(50)).await;
continue;
}
Err(e) => return Err(e),
}
}
Err(DownloadError::NetworkError)
}
#[tokio::main]
async fn main() {
// Test different scenarios
let test_cases = vec![1, 2, 3, 4];
for id in test_cases {
match download_with_retry(id, 3).await {
Ok(data) => println!("Success for ID {}: {}", id, data),
Err(e) => println!("Failed for ID {}: {}", id, e),
}
println!();
}
}
Expected Output:
Attempt 1 for ID 1
Success for ID 1: Success data
Attempt 1 for ID 2
Retrying due to network error...
Attempt 2 for ID 2
Retrying due to network error...
Attempt 3 for ID 2
Failed for ID 2: Network error
Attempt 1 for ID 3
Failed for ID 3: Timeout error
Attempt 1 for ID 4
Failed for ID 4: File not found
Async Blocks and Closures
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// Async block
let future = async {
println!("Starting async block");
sleep(Duration::from_millis(100)).await;
println!("Async block completed");
42
};
let result = future.await;
println!("Async block result: {}", result);
// Async closures (using move to capture ownership)
let data = vec![1, 2, 3, 4, 5];
let futures: Vec<_> = data.into_iter().map(|x| async move {
sleep(Duration::from_millis(x * 10)).await;
x * x
}).collect();
// Wait for all futures to complete
let results = futures_util::future::join_all(futures).await;
println!("Squared results: {:?}", results);
}
Note: You'll need to add futures-util = "0.3"
to your Cargo.toml
for join_all
.
Async Channels for Communication
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
#[derive(Debug)]
struct Task {
id: u32,
data: String,
}
async fn producer(tx: mpsc::Sender<Task>) {
for i in 1..=5 {
let task = Task {
id: i,
data: format!("Task data {}", i),
};
println!("Producing: {:?}", task);
if tx.send(task).await.is_err() {
println!("Receiver dropped");
break;
}
sleep(Duration::from_millis(100)).await;
}
println!("Producer finished");
}
async fn consumer(mut rx: mpsc::Receiver<Task>) {
while let Some(task) = rx.recv().await {
println!("Processing: {:?}", task);
// Simulate processing time
sleep(Duration::from_millis(200)).await;
println!("Completed: {}", task.id);
}
println!("Consumer finished");
}
#[tokio::main]
async fn main() {
// Create a channel with buffer size of 3
let (tx, rx) = mpsc::channel::<Task>(3);
// Spawn producer and consumer concurrently
let producer_handle = tokio::spawn(producer(tx));
let consumer_handle = tokio::spawn(consumer(rx));
// Wait for both to complete
let _ = tokio::join!(producer_handle, consumer_handle);
}
Expected Output:
Producing: Task { id: 1, data: "Task data 1" }
Processing: Task { id: 1, data: "Task data 1" }
Producing: Task { id: 2, data: "Task data 2" }
Producing: Task { id: 3, data: "Task data 3" }
Completed: 1
Processing: Task { id: 2, data: "Task data 2" }
Producing: Task { id: 4, data: "Task data 4" }
Producing: Task { id: 5, data: "Task data 5" }
Producer finished
Completed: 2
Processing: Task { id: 3, data: "Task data 3" }
Completed: 3
Processing: Task { id: 4, data: "Task data 4" }
Completed: 4
Processing: Task { id: 5, data: "Task data 5" }
Completed: 5
Consumer finished
HTTP Client Example
Add these dependencies to your Cargo.toml
:
[dependencies]
tokio = { version = "1.0", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
use reqwest;
use serde::Deserialize;
use tokio::time::{timeout, Duration};
#[derive(Deserialize, Debug)]
struct Post {
id: u32,
title: String,
body: String,
}
async fn fetch_post(id: u32) -> Result<Post, Box<dyn std::error::Error>> {
let url = format!("https://jsonplaceholder.typicode.com/posts/{}", id);
// Add timeout to prevent hanging
let response = timeout(
Duration::from_secs(5),
reqwest::get(&url)
).await??;
let post: Post = response.json().await?;
Ok(post)
}
async fn fetch_multiple_posts(ids: Vec<u32>) -> Vec<Result<Post, Box<dyn std::error::Error>>> {
let futures = ids.into_iter().map(fetch_post);
futures_util::future::join_all(futures).await
}
#[tokio::main]
async fn main() {
println!("Fetching posts concurrently...");
let post_ids = vec![1, 2, 3, 4, 5];
let results = fetch_multiple_posts(post_ids).await;
for (i, result) in results.into_iter().enumerate() {
match result {
Ok(post) => {
println!("Post {}: {}", i + 1, post.title);
println!(" Body: {}...", &post.body[..50.min(post.body.len())]);
}
Err(e) => println!("Error fetching post {}: {}", i + 1, e),
}
}
}
Task Spawning and Cancellation
use tokio::time::{sleep, Duration, timeout};
use tokio::sync::broadcast;
async fn long_running_task(id: u32, mut shutdown: broadcast::Receiver<()>) {
println!("Task {} started", id);
for i in 1..=10 {
tokio::select! {
_ = sleep(Duration::from_millis(500)) => {
println!("Task {} progress: {}/10", id, i);
}
_ = shutdown.recv() => {
println!("Task {} cancelled at step {}", id, i);
return;
}
}
}
println!("Task {} completed", id);
}
#[tokio::main]
async fn main() {
let (shutdown_tx, _) = broadcast::channel(1);
// Spawn multiple tasks
let mut handles = Vec::new();
for i in 1..=3 {
let shutdown_rx = shutdown_tx.subscribe();
let handle = tokio::spawn(long_running_task(i, shutdown_rx));
handles.push(handle);
}
// Let tasks run for a while
sleep(Duration::from_secs(2)).await;
// Cancel all tasks
println!("Sending shutdown signal...");
let _ = shutdown_tx.send(());
// Wait for all tasks to finish
for handle in handles {
let _ = handle.await;
}
println!("All tasks finished");
}
Expected Output:
Task 1 started
Task 2 started
Task 3 started
Task 1 progress: 1/10
Task 2 progress: 1/10
Task 3 progress: 1/10
Task 1 progress: 2/10
Task 2 progress: 2/10
Task 3 progress: 2/10
Task 1 progress: 3/10
Task 2 progress: 3/10
Task 3 progress: 3/10
Task 1 progress: 4/10
Task 2 progress: 4/10
Task 3 progress: 4/10
Sending shutdown signal...
Task 1 cancelled at step 5
Task 2 cancelled at step 5
Task 3 cancelled at step 5
All tasks finished
Async vs Threading Comparison
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use tokio::time::{sleep, Duration, Instant};
// Async version
async fn async_worker(id: u32, counter: Arc<AtomicU32>) {
for _ in 0..1000 {
sleep(Duration::from_micros(1)).await; // Simulate async I/O
counter.fetch_add(1, Ordering::Relaxed);
}
println!("Async worker {} finished", id);
}
// Threading version
fn thread_worker(id: u32, counter: Arc<AtomicU32>) {
for _ in 0..1000 {
std::thread::sleep(Duration::from_micros(1)); // Simulate blocking I/O
counter.fetch_add(1, Ordering::Relaxed);
}
println!("Thread worker {} finished", id);
}
#[tokio::main]
async fn main() {
// Test async approach
println!("Testing async approach...");
let start = Instant::now();
let async_counter = Arc::new(AtomicU32::new(0));
let mut async_handles = Vec::new();
for i in 0..10 {
let counter = Arc::clone(&async_counter);
let handle = tokio::spawn(async_worker(i, counter));
async_handles.push(handle);
}
for handle in async_handles {
handle.await.unwrap();
}
println!("Async: {} operations in {:?}",
async_counter.load(Ordering::Relaxed),
start.elapsed());
println!();
// Test threading approach
println!("Testing threading approach...");
let start = Instant::now();
let thread_counter = Arc::new(AtomicU32::new(0));
let mut thread_handles = Vec::new();
for i in 0..10 {
let counter = Arc::clone(&thread_counter);
let handle = std::thread::spawn(move || thread_worker(i, counter));
thread_handles.push(handle);
}
for handle in thread_handles {
handle.join().unwrap();
}
println!("Threading: {} operations in {:?}",
thread_counter.load(Ordering::Relaxed),
start.elapsed());
}
Common Patterns and Best Practices
1. Limiting Concurrent Operations
use tokio::sync::Semaphore;
use std::sync::Arc;
async fn limited_concurrent_operations() {
// Allow only 3 concurrent operations
let semaphore = Arc::new(Semaphore::new(3));
let mut handles = Vec::new();
for i in 0..10 {
let permit = Arc::clone(&semaphore);
let handle = tokio::spawn(async move {
let _guard = permit.acquire().await.unwrap();
println!("Operation {} started", i);
sleep(Duration::from_millis(1000)).await;
println!("Operation {} finished", i);
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}
Summary
Async programming in Rust provides efficient concurrency:
- async/await: Write asynchronous code that looks synchronous
- Futures: Represent values that will be available in the future
- Tokio runtime: Powers async execution and provides utilities
- Concurrency: Run many operations simultaneously without thread overhead
- Error handling: Use Result types and ? operator as usual
- Channels: Communicate between async tasks safely
- Cancellation: Use select! and broadcast channels for graceful shutdown
Choose async programming for I/O-bound operations where you need to handle many concurrent requests efficiently. Use threading for CPU-bound parallel computation.