Rust - Async Programming

Overview

Estimated time: 50–60 minutes

Master asynchronous programming in Rust using async/await syntax, futures, and the tokio runtime. Learn how to write concurrent applications that can handle many operations efficiently without blocking threads.

Learning Objectives

Prerequisites

Introduction to Async Programming

Async programming allows you to write code that can pause execution while waiting for operations like network requests or file I/O, enabling other tasks to run. This is more efficient than creating many threads.

Setting Up Tokio

First, add tokio to your Cargo.toml:

[dependencies]
tokio = { version = "1.0", features = ["full"] }

Basic Async Functions

use tokio::time::{sleep, Duration};

/// Simulates fetching a record over the network.
///
/// Logs the request, waits 100 ms without blocking the thread, and returns
/// a description of the data for `id`.
async fn fetch_data(id: u32) -> String {
    println!("Fetching data for ID: {}", id);

    // Stand-in for network latency; awaiting the sleep yields to the runtime.
    let simulated_latency = Duration::from_millis(100);
    sleep(simulated_latency).await;

    format!("Data for ID: {}", id)
}

/// Builds on `fetch_data`: awaits the raw data for `id`, then prefixes it
/// with "Processed: ".
async fn process_data(id: u32) -> String {
    // `.await` suspends this function until `fetch_data` has produced its value.
    let fetched = fetch_data(id).await;
    format!("Processed: {}", fetched)
}

/// Entry point: processes ID 42 and prints the result.
#[tokio::main] // Starts the tokio runtime and runs this async `main` on it
async fn main() {
    let output = process_data(42).await;
    println!("{}", output);
}

Expected Output:

Fetching data for ID: 42
Processed: Data for ID: 42

Running Multiple Async Tasks Concurrently

use tokio::time::{sleep, Duration};

/// Simulates downloading the file `name`, taking `delay_ms` milliseconds.
///
/// Prints a line when the download starts and another when it finishes,
/// then returns placeholder content for the file.
async fn download_file(name: &str, delay_ms: u64) -> String {
    println!("Starting download: {}", name);

    let transfer_time = Duration::from_millis(delay_ms);
    sleep(transfer_time).await;

    println!("Finished download: {}", name);
    format!("Content of {}", name)
}

/// Times the same three downloads twice: awaited one after another, then
/// driven together with `tokio::join!`.
#[tokio::main]
async fn main() {
    println!("Sequential execution:");
    let sequential_timer = std::time::Instant::now();

    // Each `.await` completes before the next download begins, so the delays add up.
    let _first = download_file("file1.txt", 200).await;
    let _second = download_file("file2.txt", 300).await;
    let _third = download_file("file3.txt", 100).await;

    println!("Sequential took: {:?}", sequential_timer.elapsed());
    println!();

    println!("Concurrent execution:");
    let concurrent_timer = std::time::Instant::now();

    // `join!` drives all three futures together and returns once every one is done,
    // so the total time is roughly that of the slowest download.
    let (first, second, third) = tokio::join!(
        download_file("file1.txt", 200),
        download_file("file2.txt", 300),
        download_file("file3.txt", 100)
    );

    println!("Concurrent took: {:?}", concurrent_timer.elapsed());
    println!("Results: {}, {}, {}", first, second, third);
}

Expected Output (timings are approximate; the real values will be slightly higher, e.g. 601.8ms):

Sequential execution:
Starting download: file1.txt
Finished download: file1.txt
Starting download: file2.txt
Finished download: file2.txt
Starting download: file3.txt
Finished download: file3.txt
Sequential took: 600ms

Concurrent execution:
Starting download: file1.txt
Starting download: file2.txt
Starting download: file3.txt
Finished download: file3.txt
Finished download: file1.txt  
Finished download: file2.txt
Concurrent took: 300ms
Results: Content of file1.txt, Content of file2.txt, Content of file3.txt

Error Handling in Async Functions

use tokio::time::{sleep, Duration};

/// Ways a simulated download can fail.
#[derive(Debug)]
enum DownloadError {
    /// The network request failed.
    NetworkError,
    /// The request timed out.
    TimeoutError,
    /// The requested file does not exist.
    NotFound,
}

impl std::fmt::Display for DownloadError {
    /// Writes the short human-readable message for this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::NetworkError => "Network error",
            Self::TimeoutError => "Timeout error",
            Self::NotFound => "File not found",
        };
        f.write_str(message)
    }
}

// Marker impl so the type can be used wherever `dyn std::error::Error` is expected.
impl std::error::Error for DownloadError {}

/// Simulates an unreliable download whose outcome is fixed by `id`.
///
/// After a 100 ms delay: `1` succeeds with "Success data", `2` fails with
/// `NetworkError`, `3` fails with `TimeoutError`, and every other id fails
/// with `NotFound`.
async fn risky_download(id: u32) -> Result<String, DownloadError> {
    sleep(Duration::from_millis(100)).await;

    let failure = match id {
        1 => return Ok(String::from("Success data")),
        2 => DownloadError::NetworkError,
        3 => DownloadError::TimeoutError,
        _ => DownloadError::NotFound,
    };
    Err(failure)
}

/// Calls `risky_download(id)` up to `max_retries` times in total.
///
/// Only `NetworkError` is retried, with a 50 ms pause between attempts.
/// A success, any other error, or a `NetworkError` on the final attempt is
/// returned immediately. If `max_retries` is 0, no attempt is made and
/// `NetworkError` is returned.
async fn download_with_retry(id: u32, max_retries: u32) -> Result<String, DownloadError> {
    let mut attempt = 0;

    while attempt < max_retries {
        attempt += 1;
        println!("Attempt {} for ID {}", attempt, id);

        let outcome = risky_download(id).await;

        // Retry only a network error, and only while attempts remain.
        let should_retry =
            matches!(outcome, Err(DownloadError::NetworkError)) && attempt < max_retries;
        if !should_retry {
            return outcome;
        }

        println!("Retrying due to network error...");
        sleep(Duration::from_millis(50)).await;
    }

    // Reached only when the loop body never ran (`max_retries == 0`).
    Err(DownloadError::NetworkError)
}

/// Runs `download_with_retry` once for each simulated outcome and reports the result.
#[tokio::main]
async fn main() {
    // One id per scenario: success, network error, timeout, not found.
    for id in [1, 2, 3, 4] {
        let outcome = download_with_retry(id, 3).await;
        match outcome {
            Err(e) => println!("Failed for ID {}: {}", id, e),
            Ok(data) => println!("Success for ID {}: {}", id, data),
        }
        println!();
    }
}

Expected Output:

Attempt 1 for ID 1
Success for ID 1: Success data

Attempt 1 for ID 2
Retrying due to network error...
Attempt 2 for ID 2
Retrying due to network error...
Attempt 3 for ID 2
Failed for ID 2: Network error

Attempt 1 for ID 3
Failed for ID 3: Timeout error

Attempt 1 for ID 4
Failed for ID 4: File not found

Async Blocks and Closures

use tokio::time::{sleep, Duration};

/// Demonstrates `async` blocks: one block awaited directly, then one block
/// per element of a vector, all awaited together with `join_all`.
#[tokio::main]
async fn main() {
    // An `async` block is an anonymous future; nothing inside it runs until it is awaited.
    let answer_future = async {
        println!("Starting async block");
        sleep(Duration::from_millis(100)).await;
        println!("Async block completed");
        42
    };

    let answer = answer_future.await;
    println!("Async block result: {}", answer);

    // A closure returning an `async move` block: each future takes ownership of its own `n`.
    let numbers = vec![1, 2, 3, 4, 5];
    let pending = numbers.into_iter().map(|n| async move {
        sleep(Duration::from_millis(n * 10)).await;
        n * n
    });

    // `join_all` drives every future and yields the outputs in input order.
    let squares = futures_util::future::join_all(pending).await;
    println!("Squared results: {:?}", squares);
}

Note: You'll need to add futures-util = "0.3" to your Cargo.toml for join_all.

Async Channels for Communication

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

/// A unit of work sent from the producer to the consumer over the channel.
#[derive(Debug)]
struct Task {
    /// Sequence number assigned by the producer.
    id: u32,
    /// Text payload; the producer fills it with "Task data {id}".
    data: String,
}

/// Sends five tasks (ids 1 through 5) into `tx`, pausing 100 ms after each one.
///
/// Stops early if the receiving side of the channel has been dropped.
async fn producer(tx: mpsc::Sender<Task>) {
    for id in 1..=5 {
        let task = Task {
            id,
            data: format!("Task data {}", id),
        };
        println!("Producing: {:?}", task);

        // `send` waits while the channel buffer is full and fails only once the receiver is gone.
        let delivered = tx.send(task).await.is_ok();
        if !delivered {
            println!("Receiver dropped");
            break;
        }

        sleep(Duration::from_millis(100)).await;
    }

    println!("Producer finished");
}

/// Receives tasks from `rx` until the channel is closed, spending 200 ms on each.
async fn consumer(mut rx: mpsc::Receiver<Task>) {
    loop {
        // `recv` yields `None` once every sender is dropped and the buffer is empty.
        let task = match rx.recv().await {
            Some(task) => task,
            None => break,
        };
        println!("Processing: {:?}", task);

        // Stand-in for real processing work.
        sleep(Duration::from_millis(200)).await;

        println!("Completed: {}", task.id);
    }

    println!("Consumer finished");
}

/// Wires a producer and a consumer together through a bounded channel and
/// waits for both tasks to finish.
#[tokio::main]
async fn main() {
    // Bounded channel: at most 3 tasks may be queued before `send` has to wait.
    let (sender, receiver) = mpsc::channel::<Task>(3);

    // Each side runs as its own spawned task, so they make progress concurrently.
    let producing = tokio::spawn(producer(sender));
    let consuming = tokio::spawn(consumer(receiver));

    // Block until both tasks are done; their join results are intentionally ignored.
    let _ = tokio::join!(producing, consuming);
}

Expected Output:

Producing: Task { id: 1, data: "Task data 1" }
Processing: Task { id: 1, data: "Task data 1" }
Producing: Task { id: 2, data: "Task data 2" }
Producing: Task { id: 3, data: "Task data 3" }
Completed: 1
Processing: Task { id: 2, data: "Task data 2" }
Producing: Task { id: 4, data: "Task data 4" }
Producing: Task { id: 5, data: "Task data 5" }
Completed: 2
Processing: Task { id: 3, data: "Task data 3" }
Producer finished
Completed: 3
Processing: Task { id: 4, data: "Task data 4" }
Completed: 4
Processing: Task { id: 5, data: "Task data 5" }
Completed: 5
Consumer finished

HTTP Client Example

Add these dependencies to your Cargo.toml:

[dependencies]
tokio = { version = "1.0", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
futures-util = "0.3"

use reqwest;
use serde::Deserialize;
use tokio::time::{timeout, Duration};

/// A post deserialized from the JSON body that `fetch_post` downloads.
#[derive(Deserialize, Debug)]
struct Post {
    /// Numeric identifier of the post.
    id: u32,
    /// Title of the post; printed by `main`.
    title: String,
    /// Body text of the post; `main` prints a truncated preview of it.
    body: String,
}

/// Downloads the post with the given `id` from JSONPlaceholder and deserializes it.
///
/// # Errors
///
/// Returns an error if the request, including reading and decoding the body,
/// takes longer than 5 seconds, if the request itself fails, if the server
/// answers with a 4xx/5xx status, or if the body is not a valid `Post`.
async fn fetch_post(id: u32) -> Result<Post, Box<dyn std::error::Error>> {
    let url = format!("https://jsonplaceholder.typicode.com/posts/{}", id);

    let request = async {
        reqwest::get(&url)
            .await?
            // Turn 4xx/5xx responses into errors instead of trying to decode their bodies.
            .error_for_status()?
            .json::<Post>()
            .await
    };

    // The timeout wraps the whole exchange, so a stalled body read cannot hang either.
    // First `?` handles the timeout elapsing, second `?` handles the reqwest error.
    let post = timeout(Duration::from_secs(5), request).await??;
    Ok(post)
}

/// Fetches every post in `ids` concurrently.
///
/// Returns one result per id, in the same order as `ids`; a failure for one
/// post does not affect the others.
async fn fetch_multiple_posts(ids: Vec<u32>) -> Vec<Result<Post, Box<dyn std::error::Error>>> {
    // `join_all` accepts any iterator of futures, so the lazy `map` is passed directly.
    futures_util::future::join_all(ids.into_iter().map(fetch_post)).await
}

/// Fetches posts 1 through 5 concurrently and prints each title with a short body preview.
#[tokio::main]
async fn main() {
    println!("Fetching posts concurrently...");

    let post_ids = vec![1, 2, 3, 4, 5];
    let results = fetch_multiple_posts(post_ids).await;

    // Results come back in request order, so position `i` corresponds to post ID `i + 1`.
    for (i, result) in results.into_iter().enumerate() {
        match result {
            Ok(post) => {
                println!("Post {}: {}", i + 1, post.title);
                // Truncate by characters rather than bytes: slicing `&body[..50]` panics
                // when byte 50 falls in the middle of a multi-byte UTF-8 character.
                let preview: String = post.body.chars().take(50).collect();
                println!("  Body: {}...", preview);
            }
            Err(e) => println!("Error fetching post {}: {}", i + 1, e),
        }
    }
}

Task Spawning and Cancellation

use tokio::time::{sleep, Duration, timeout};
use tokio::sync::broadcast;

async fn long_running_task(id: u32, mut shutdown: broadcast::Receiver<()>) {
    println!("Task {} started", id);
    
    for i in 1..=10 {
        tokio::select! {
            _ = sleep(Duration::from_millis(500)) => {
                println!("Task {} progress: {}/10", id, i);
            }
            _ = shutdown.recv() => {
                println!("Task {} cancelled at step {}", id, i);
                return;
            }
        }
    }
    
    println!("Task {} completed", id);
}

/// Starts three long-running tasks, lets them work for two seconds, then
/// broadcasts a shutdown signal and waits for all of them to exit.
#[tokio::main]
async fn main() {
    // The initial receiver is discarded; each task gets its own via `subscribe`.
    let (shutdown_tx, _) = broadcast::channel(1);

    // Spawn the workers, handing each one a fresh subscription to the shutdown channel.
    let handles: Vec<_> = (1..=3)
        .map(|id| tokio::spawn(long_running_task(id, shutdown_tx.subscribe())))
        .collect();

    // Give the tasks some time to make progress.
    sleep(Duration::from_secs(2)).await;

    // One broadcast message reaches every subscriber.
    println!("Sending shutdown signal...");
    let _ = shutdown_tx.send(());

    // Wait for each task to observe the signal and return.
    for task in handles {
        let _ = task.await;
    }

    println!("All tasks finished");
}

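Expected Output (approximate: the fourth progress tick and the shutdown signal both fire at about 2 seconds, so the tasks may instead skip "progress: 4/10" and report being cancelled at step 4):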

Task 1 started
Task 2 started  
Task 3 started
Task 1 progress: 1/10
Task 2 progress: 1/10
Task 3 progress: 1/10
Task 1 progress: 2/10
Task 2 progress: 2/10
Task 3 progress: 2/10
Task 1 progress: 3/10
Task 2 progress: 3/10
Task 3 progress: 3/10
Task 1 progress: 4/10
Task 2 progress: 4/10
Task 3 progress: 4/10
Sending shutdown signal...
Task 1 cancelled at step 5
Task 2 cancelled at step 5
Task 3 cancelled at step 5
All tasks finished

Async vs Threading Comparison

use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use tokio::time::{sleep, Duration, Instant};

/// Async worker: performs 1000 simulated I/O waits, incrementing `counter`
/// after each one, then prints that worker `id` has finished.
async fn async_worker(id: u32, counter: Arc<AtomicU32>) {
    // Simulated async I/O latency; awaiting it lets other tasks run meanwhile.
    let pause = Duration::from_micros(1);

    for _step in 0..1000 {
        sleep(pause).await;
        counter.fetch_add(1, Ordering::Relaxed);
    }

    println!("Async worker {} finished", id);
}

/// Thread worker: performs 1000 simulated blocking I/O waits, incrementing
/// `counter` after each one, then prints that worker `id` has finished.
fn thread_worker(id: u32, counter: Arc<AtomicU32>) {
    // Simulated blocking I/O latency; the OS thread is parked for each wait.
    let pause = Duration::from_micros(1);

    (0..1000).for_each(|_| {
        std::thread::sleep(pause);
        counter.fetch_add(1, Ordering::Relaxed);
    });

    println!("Thread worker {} finished", id);
}

/// Runs the same workload with ten async tasks and then with ten OS threads,
/// printing the operation count and elapsed time for each approach.
#[tokio::main]
async fn main() {
    // --- Async approach: ten tasks scheduled on the tokio runtime ---
    println!("Testing async approach...");
    let async_timer = Instant::now();
    let async_counter = Arc::new(AtomicU32::new(0));

    let async_handles: Vec<_> = (0..10)
        .map(|id| tokio::spawn(async_worker(id, Arc::clone(&async_counter))))
        .collect();

    for task in async_handles {
        task.await.unwrap();
    }

    let async_total = async_counter.load(Ordering::Relaxed);
    println!("Async: {} operations in {:?}", async_total, async_timer.elapsed());
    println!();

    // --- Threading approach: ten dedicated OS threads ---
    println!("Testing threading approach...");
    let thread_timer = Instant::now();
    let thread_counter = Arc::new(AtomicU32::new(0));

    let thread_handles: Vec<_> = (0..10)
        .map(|id| {
            let counter = Arc::clone(&thread_counter);
            std::thread::spawn(move || thread_worker(id, counter))
        })
        .collect();

    for worker in thread_handles {
        worker.join().unwrap();
    }

    let thread_total = thread_counter.load(Ordering::Relaxed);
    println!("Threading: {} operations in {:?}", thread_total, thread_timer.elapsed());
}

Common Patterns and Best Practices

1. Limiting Concurrent Operations

use tokio::sync::Semaphore;
use std::sync::Arc;

/// Spawns ten simulated one-second operations while allowing at most three
/// of them to run at the same time.
///
/// # Panics
///
/// Panics if one of the spawned tasks panics.
async fn limited_concurrent_operations() {
    // The semaphore holds three permits, so only three tasks get past `acquire` at once.
    let semaphore = Arc::new(Semaphore::new(3));

    let mut handles = Vec::new();

    for i in 0..10 {
        let semaphore = Arc::clone(&semaphore);
        let handle = tokio::spawn(async move {
            // The permit is released when `_permit` is dropped at the end of the task.
            let _permit = semaphore
                .acquire()
                .await
                .expect("semaphore is never closed");
            println!("Operation {} started", i);
            // Fully qualified paths: this snippet imports only `Semaphore` and `Arc`.
            tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
            println!("Operation {} finished", i);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await.unwrap();
    }
}

Summary

Async programming in Rust provides efficient concurrency:

Choose async programming for I/O-bound operations where you need to handle many concurrent requests efficiently. Use threading for CPU-bound parallel computation.


← PreviousNext →