Kotlin - Async Programming

Overview

Async programming in Kotlin provides powerful tools for handling concurrent operations efficiently. This tutorial covers async/await patterns, concurrent execution, structured concurrency, and best practices for building responsive applications.

๐ŸŽฏ Learning Objectives:
  • Understand async/await patterns in Kotlin
  • Learn to handle concurrent operations effectively
  • Master structured concurrency principles
  • Apply async programming in real-world scenarios
  • Handle errors and cancellation in async code

Basic Async/Await

The async/await pattern allows you to write asynchronous code that looks and feels like synchronous code.

Simple Async Operations

import kotlinx.coroutines.*

// Simulate async operations
suspend fun fetchUserData(userId: Int): String {
    delay(1000) // Simulate network delay
    return "User data for ID: $userId"
}

suspend fun fetchUserPosts(userId: Int): List {
    delay(800) // Simulate network delay
    return listOf("Post 1", "Post 2", "Post 3")
}

fun main() = runBlocking {
    println("Starting async operations...")
    
    // Sequential execution
    val startTime = System.currentTimeMillis()
    
    val userData = fetchUserData(123)
    val userPosts = fetchUserPosts(123)
    
    val endTime = System.currentTimeMillis()
    
    println("User Data: $userData")
    println("User Posts: $userPosts")
    println("Total time: ${endTime - startTime}ms")
}

Concurrent Execution with Async

import kotlinx.coroutines.*

fun main() = runBlocking {
    println("Starting concurrent operations...")
    
    val startTime = System.currentTimeMillis()
    
    // Launch both operations concurrently
    val userDataDeferred = async { fetchUserData(123) }
    val userPostsDeferred = async { fetchUserPosts(123) }
    
    // Wait for both to complete
    val userData = userDataDeferred.await()
    val userPosts = userPostsDeferred.await()
    
    val endTime = System.currentTimeMillis()
    
    println("User Data: $userData")
    println("User Posts: $userPosts")
    println("Total time: ${endTime - startTime}ms") // Much faster!
}
Key Insight: Using async allows operations to run concurrently, significantly reducing total execution time when operations are independent.

Structured Concurrency

Structured concurrency ensures that async operations are properly managed and cleaned up.

Coroutine Scope Management

import kotlinx.coroutines.*

class UserService {
    // Create a scope for managing coroutines
    private val serviceScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    
    suspend fun getUserProfile(userId: Int): UserProfile = coroutineScope {
        // All async operations are children of this scope
        val basicInfoDeferred = async { fetchBasicInfo(userId) }
        val preferencesDeferred = async { fetchPreferences(userId) }
        val statisticsDeferred = async { fetchStatistics(userId) }
        
        // If any operation fails, all are cancelled
        UserProfile(
            basicInfo = basicInfoDeferred.await(),
            preferences = preferencesDeferred.await(),
            statistics = statisticsDeferred.await()
        )
    }
    
    private suspend fun fetchBasicInfo(userId: Int): BasicInfo {
        delay(500)
        return BasicInfo("User $userId", "[email protected]")
    }
    
    private suspend fun fetchPreferences(userId: Int): Preferences {
        delay(300)
        return Preferences("dark", "en")
    }
    
    private suspend fun fetchStatistics(userId: Int): Statistics {
        delay(400)
        return Statistics(loginCount = 42, postsCount = 15)
    }
    
    fun cleanup() {
        serviceScope.cancel()
    }
}

data class BasicInfo(val name: String, val email: String)
data class Preferences(val theme: String, val language: String)
data class Statistics(val loginCount: Int, val postsCount: Int)
data class UserProfile(
    val basicInfo: BasicInfo,
    val preferences: Preferences,
    val statistics: Statistics
)

fun main() = runBlocking {
    val userService = UserService()
    
    try {
        val profile = userService.getUserProfile(123)
        println("Profile loaded: $profile")
    } finally {
        userService.cleanup()
    }
}

Multiple Async Operations

Processing Collections Concurrently

import kotlinx.coroutines.*

suspend fun processUser(userId: Int): String {
    delay(100) // Simulate processing time
    return "Processed user $userId"
}

fun main() = runBlocking {
    val userIds = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    // Sequential processing
    val sequentialStart = System.currentTimeMillis()
    val sequentialResults = userIds.map { processUser(it) }
    val sequentialTime = System.currentTimeMillis() - sequentialStart
    
    println("Sequential results: $sequentialResults")
    println("Sequential time: ${sequentialTime}ms")
    
    // Concurrent processing
    val concurrentStart = System.currentTimeMillis()
    val concurrentResults = userIds.map { userId ->
        async { processUser(userId) }
    }.map { it.await() }
    val concurrentTime = System.currentTimeMillis() - concurrentStart
    
    println("Concurrent results: $concurrentResults")
    println("Concurrent time: ${concurrentTime}ms")
    
    // Concurrent processing with awaitAll
    val awaitAllStart = System.currentTimeMillis()
    val awaitAllResults = userIds.map { userId ->
        async { processUser(userId) }
    }.awaitAll()
    val awaitAllTime = System.currentTimeMillis() - awaitAllStart
    
    println("AwaitAll results: $awaitAllResults")
    println("AwaitAll time: ${awaitAllTime}ms")
}

Limiting Concurrency

import kotlinx.coroutines.*
import java.util.concurrent.Semaphore

class ConcurrencyLimitedService {
    private val semaphore = Semaphore(3) // Allow max 3 concurrent operations
    
    suspend fun processWithLimit(id: Int): String {
        semaphore.acquire()
        try {
            delay(500) // Simulate work
            return "Processed $id"
        } finally {
            semaphore.release()
        }
    }
}

fun main() = runBlocking {
    val service = ConcurrencyLimitedService()
    val items = (1..10).toList()
    
    val results = items.map { id ->
        async { service.processWithLimit(id) }
    }.awaitAll()
    
    println("Results: $results")
}

Error Handling in Async Code

Exception Propagation

import kotlinx.coroutines.*

suspend fun riskyOperation(id: Int): String {
    delay(100)
    if (id == 3) {
        throw RuntimeException("Operation failed for ID: $id")
    }
    return "Success for ID: $id"
}

fun main() = runBlocking {
    // Exception in one async operation cancels siblings
    try {
        val results = (1..5).map { id ->
            async { riskyOperation(id) }
        }.awaitAll()
        
        println("All results: $results")
    } catch (e: Exception) {
        println("Caught exception: ${e.message}")
    }
    
    println("\n--- Using supervisorScope ---")
    
    // Using supervisorScope to handle failures independently
    supervisorScope {
        val results = (1..5).map { id ->
            async {
                try {
                    riskyOperation(id)
                } catch (e: Exception) {
                    "Failed: ${e.message}"
                }
            }
        }.awaitAll()
        
        println("Results with error handling: $results")
    }
}

Try-Catch with Async

import kotlinx.coroutines.*

fun main() = runBlocking {
    // Individual error handling
    val results = (1..5).map { id ->
        async {
            try {
                riskyOperation(id)
            } catch (e: Exception) {
                "Error for $id: ${e.message}"
            }
        }
    }.awaitAll()
    
    println("Results with individual handling: $results")
    
    // Selective error handling
    val safeResults = supervisorScope {
        (1..5).map { id ->
            async {
                runCatching { riskyOperation(id) }
                    .getOrElse { "Failed: ${it.message}" }
            }
        }.awaitAll()
    }
    
    println("Safe results: $safeResults")
}

Async Patterns and Use Cases

Producer-Consumer Pattern

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

class AsyncProducerConsumer {
    private val channel = Channel(capacity = 10)
    
    fun startProducer(scope: CoroutineScope) = scope.launch {
        repeat(20) { value ->
            println("Producing: $value")
            channel.send(value)
            delay(50) // Simulate production time
        }
        channel.close()
    }
    
    fun startConsumer(scope: CoroutineScope, consumerId: Int) = scope.launch {
        for (value in channel) {
            println("Consumer $consumerId processing: $value")
            delay(100) // Simulate processing time
        }
        println("Consumer $consumerId finished")
    }
}

fun main() = runBlocking {
    val producerConsumer = AsyncProducerConsumer()
    
    // Start producer and multiple consumers
    val producer = producerConsumer.startProducer(this)
    val consumer1 = producerConsumer.startConsumer(this, 1)
    val consumer2 = producerConsumer.startConsumer(this, 2)
    
    // Wait for all to complete
    joinAll(producer, consumer1, consumer2)
}

Parallel Processing with Results Aggregation

import kotlinx.coroutines.*

data class ProcessingResult(val id: Int, val result: String, val processingTime: Long)

suspend fun heavyComputation(id: Int): ProcessingResult {
    val start = System.currentTimeMillis()
    delay((100..500).random().toLong()) // Simulate variable processing time
    val end = System.currentTimeMillis()
    return ProcessingResult(id, "Result for $id", end - start)
}

fun main() = runBlocking {
    val itemIds = (1..10).toList()
    
    // Process items in parallel and collect results
    val results = itemIds.map { id ->
        async { heavyComputation(id) }
    }.awaitAll()
    
    // Aggregate results
    val totalTime = results.sumOf { it.processingTime }
    val averageTime = results.map { it.processingTime }.average()
    val fastestResult = results.minByOrNull { it.processingTime }
    val slowestResult = results.maxByOrNull { it.processingTime }
    
    println("All results:")
    results.forEach { println("  $it") }
    
    println("\nAggregated statistics:")
    println("Total processing time: ${totalTime}ms")
    println("Average processing time: ${averageTime.toInt()}ms")
    println("Fastest: ${fastestResult?.result} (${fastestResult?.processingTime}ms)")
    println("Slowest: ${slowestResult?.result} (${slowestResult?.processingTime}ms)")
}

Real-World Examples

Web API Client

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class ApiClient {
    suspend fun fetchUser(userId: Int): User {
        delay(200) // Simulate HTTP request
        return User(userId, "User $userId", "[email protected]")
    }
    
    suspend fun fetchPosts(userId: Int): List {
        delay(300)
        return listOf(
            Post(1, "Post 1", "Content 1"),
            Post(2, "Post 2", "Content 2")
        )
    }
    
    suspend fun fetchComments(postId: Int): List {
        delay(150)
        return listOf(
            Comment(1, "Great post!"),
            Comment(2, "Thanks for sharing")
        )
    }
}

data class User(val id: Int, val name: String, val email: String)
data class Post(val id: Int, val title: String, val content: String)
data class Comment(val id: Int, val text: String)
data class UserWithPosts(val user: User, val posts: List)
data class PostWithComments(val post: Post, val comments: List)

class UserProfileService(private val apiClient: ApiClient) {
    suspend fun getCompleteUserProfile(userId: Int): UserWithPosts = coroutineScope {
        // Fetch user and posts concurrently
        val userDeferred = async { apiClient.fetchUser(userId) }
        val postsDeferred = async { apiClient.fetchPosts(userId) }
        
        val user = userDeferred.await()
        val posts = postsDeferred.await()
        
        // Fetch comments for all posts concurrently
        val postsWithComments = posts.map { post ->
            async {
                val comments = apiClient.fetchComments(post.id)
                PostWithComments(post, comments)
            }
        }.awaitAll()
        
        UserWithPosts(user, postsWithComments)
    }
}

fun main() = runBlocking {
    val apiClient = ApiClient()
    val profileService = UserProfileService(apiClient)
    
    val startTime = System.currentTimeMillis()
    val profile = profileService.getCompleteUserProfile(123)
    val endTime = System.currentTimeMillis()
    
    println("User: ${profile.user}")
    println("Posts with comments:")
    profile.posts.forEach { postWithComments ->
        println("  ${postWithComments.post.title}")
        postWithComments.comments.forEach { comment ->
            println("    - ${comment.text}")
        }
    }
    println("Total time: ${endTime - startTime}ms")
}

Batch Processing System

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class BatchProcessor(
    private val batchSize: Int = 10,
    private val maxConcurrency: Int = 3,
    private val processor: suspend (List) -> List
) {
    suspend fun processAll(items: List): List = coroutineScope {
        val semaphore = kotlinx.coroutines.sync.Semaphore(maxConcurrency)
        
        items.chunked(batchSize)
            .map { batch ->
                async {
                    semaphore.withPermit {
                        processor(batch)
                    }
                }
            }
            .awaitAll()
            .flatten()
    }
}

// Example usage
suspend fun processImageBatch(images: List): List {
    delay(500) // Simulate image processing
    return images.map { "Processed: $it" }
}

fun main() = runBlocking {
    val images = (1..50).map { "image_$it.jpg" }
    
    val processor = BatchProcessor(
        batchSize = 5,
        maxConcurrency = 3,
        processor = ::processImageBatch
    )
    
    val startTime = System.currentTimeMillis()
    val results = processor.processAll(images)
    val endTime = System.currentTimeMillis()
    
    println("Processed ${results.size} images in ${endTime - startTime}ms")
    results.take(10).forEach { println(it) }
}

Performance Optimization

Choosing the Right Dispatcher

import kotlinx.coroutines.*

fun main() = runBlocking {
    // CPU-intensive work
    val cpuIntensiveResult = withContext(Dispatchers.Default) {
        async {
            // Simulate CPU-intensive calculation
            (1..1000000).sum()
        }
    }.await()
    
    // I/O operations
    val ioResult = withContext(Dispatchers.IO) {
        async {
            // Simulate file I/O or network call
            delay(100)
            "I/O operation completed"
        }
    }.await()
    
    // Main/UI thread operations (in Android)
    val mainResult = withContext(Dispatchers.Main.immediate) {
        async {
            // UI updates would go here
            "UI update completed"
        }
    }.await()
    
    println("CPU result: $cpuIntensiveResult")
    println("I/O result: $ioResult")
    println("Main result: $mainResult")
}

Async vs Parallel Execution

import kotlinx.coroutines.*

fun cpuIntensiveTask(id: Int): Long {
    // Simulate CPU-intensive work
    var result = 0L
    repeat(10_000_000) {
        result += it
    }
    return result + id
}

fun main() = runBlocking {
    val taskCount = 4
    
    // Async execution (cooperative)
    val asyncStart = System.currentTimeMillis()
    val asyncResults = (1..taskCount).map { id ->
        async(Dispatchers.Default) {
            cpuIntensiveTask(id)
        }
    }.awaitAll()
    val asyncTime = System.currentTimeMillis() - asyncStart
    
    println("Async results: $asyncResults")
    println("Async time: ${asyncTime}ms")
    
    // For truly parallel CPU work, consider using multiple threads
    val threads = (1..taskCount).map { id ->
        Thread {
            cpuIntensiveTask(id)
        }
    }
    
    val parallelStart = System.currentTimeMillis()
    threads.forEach { it.start() }
    threads.forEach { it.join() }
    val parallelTime = System.currentTimeMillis() - parallelStart
    
    println("Parallel time: ${parallelTime}ms")
}

Best Practices

Guidelines for Async Programming

  • Use structured concurrency: Always use coroutineScope or supervisorScope
  • Handle cancellation: Make your suspending functions cancellation-aware
  • Choose appropriate dispatchers: Use Dispatchers.IO for I/O, Default for CPU work
  • Avoid blocking calls: Use suspending alternatives instead of blocking APIs
  • Limit concurrency: Use Semaphore or other mechanisms to prevent resource exhaustion

Common Anti-patterns

import kotlinx.coroutines.*

// โŒ Don't do this - blocking in coroutines
suspend fun badBlocking() {
    Thread.sleep(1000) // Blocks the thread!
}

// โœ… Do this instead
suspend fun goodSuspending() {
    delay(1000) // Suspends the coroutine
}

// โŒ Don't do this - unstructured concurrency
fun badAsync(): Deferred {
    return GlobalScope.async { // Leaked coroutine!
        delay(1000)
        "Result"
    }
}

// โœ… Do this instead
suspend fun goodAsync(): String = coroutineScope {
    async {
        delay(1000)
        "Result"
    }.await()
}

// โŒ Don't do this - ignoring cancellation
suspend fun badCancellation() {
    repeat(1000) { 
        // Work that ignores cancellation
        Thread.sleep(10)
    }
}

// โœ… Do this instead
suspend fun goodCancellation() {
    repeat(1000) { 
        ensureActive() // Check for cancellation
        delay(10) // or use suspending functions
    }
}

Key Takeaways

  • Async/await enables concurrent execution while maintaining readable code
  • Use structured concurrency to ensure proper cleanup and error handling
  • Handle exceptions appropriately in async operations
  • Choose the right dispatcher for different types of work
  • Limit concurrency to prevent resource exhaustion
  • Make suspending functions cancellation-aware

Practice Exercises

  1. Build a concurrent web scraper that processes multiple URLs simultaneously
  2. Create a batch processing system with configurable concurrency limits
  3. Implement a retry mechanism for async operations with exponential backoff
  4. Design an async caching system with TTL (time-to-live) functionality

Quiz

  1. What's the difference between async and launch in Kotlin coroutines?
  2. When should you use supervisorScope instead of coroutineScope?
  3. How do you handle exceptions in async operations?
Show Answers
  1. async returns a Deferred that can provide a result, while launch returns a Job and is used for fire-and-forget operations.
  2. Use supervisorScope when you want failures in child coroutines to not cancel sibling coroutines.
  3. Use try-catch around await() calls, or handle exceptions within each async block, or use supervisorScope for independent error handling.