Kotlin - Async Programming
Overview
Async programming in Kotlin provides powerful tools for handling concurrent operations efficiently. This tutorial covers async/await patterns, concurrent execution, structured concurrency, and best practices for building responsive applications.
🎯 Learning Objectives:
- Understand async/await patterns in Kotlin
- Learn to handle concurrent operations effectively
- Master structured concurrency principles
- Apply async programming in real-world scenarios
- Handle errors and cancellation in async code
Basic Async/Await
The async/await pattern allows you to write asynchronous code that looks and feels like synchronous code.
Simple Async Operations
import kotlinx.coroutines.*
// Simulate async operations
suspend fun fetchUserData(userId: Int): String {
delay(1000) // Simulate network delay
return "User data for ID: $userId"
}
suspend fun fetchUserPosts(userId: Int): List<String> {
delay(800) // Simulate network delay
return listOf("Post 1", "Post 2", "Post 3")
}
fun main() = runBlocking {
println("Starting async operations...")
// Sequential execution
val startTime = System.currentTimeMillis()
val userData = fetchUserData(123)
val userPosts = fetchUserPosts(123)
val endTime = System.currentTimeMillis()
println("User Data: $userData")
println("User Posts: $userPosts")
println("Total time: ${endTime - startTime}ms")
}
Concurrent Execution with Async
import kotlinx.coroutines.*
fun main() = runBlocking {
println("Starting concurrent operations...")
val startTime = System.currentTimeMillis()
// Launch both operations concurrently
val userDataDeferred = async { fetchUserData(123) }
val userPostsDeferred = async { fetchUserPosts(123) }
// Wait for both to complete
val userData = userDataDeferred.await()
val userPosts = userPostsDeferred.await()
val endTime = System.currentTimeMillis()
println("User Data: $userData")
println("User Posts: $userPosts")
println("Total time: ${endTime - startTime}ms") // Much faster!
}
Key Insight: Using async allows operations to run concurrently, significantly reducing total execution time when operations are independent.
Structured Concurrency
Structured concurrency ensures that async operations are properly managed and cleaned up.
Coroutine Scope Management
import kotlinx.coroutines.*
class UserService {
// Create a scope for managing coroutines
private val serviceScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
suspend fun getUserProfile(userId: Int): UserProfile = coroutineScope {
// All async operations are children of this scope
val basicInfoDeferred = async { fetchBasicInfo(userId) }
val preferencesDeferred = async { fetchPreferences(userId) }
val statisticsDeferred = async { fetchStatistics(userId) }
// If any operation fails, all are cancelled
UserProfile(
basicInfo = basicInfoDeferred.await(),
preferences = preferencesDeferred.await(),
statistics = statisticsDeferred.await()
)
}
private suspend fun fetchBasicInfo(userId: Int): BasicInfo {
delay(500)
return BasicInfo("User $userId", "user$userId@example.com")
}
private suspend fun fetchPreferences(userId: Int): Preferences {
delay(300)
return Preferences("dark", "en")
}
private suspend fun fetchStatistics(userId: Int): Statistics {
delay(400)
return Statistics(loginCount = 42, postsCount = 15)
}
fun cleanup() {
serviceScope.cancel()
}
}
data class BasicInfo(val name: String, val email: String)
data class Preferences(val theme: String, val language: String)
data class Statistics(val loginCount: Int, val postsCount: Int)
data class UserProfile(
val basicInfo: BasicInfo,
val preferences: Preferences,
val statistics: Statistics
)
fun main() = runBlocking {
val userService = UserService()
try {
val profile = userService.getUserProfile(123)
println("Profile loaded: $profile")
} finally {
userService.cleanup()
}
}
Multiple Async Operations
Processing Collections Concurrently
import kotlinx.coroutines.*
suspend fun processUser(userId: Int): String {
delay(100) // Simulate processing time
return "Processed user $userId"
}
fun main() = runBlocking {
val userIds = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Sequential processing
val sequentialStart = System.currentTimeMillis()
val sequentialResults = userIds.map { processUser(it) }
val sequentialTime = System.currentTimeMillis() - sequentialStart
println("Sequential results: $sequentialResults")
println("Sequential time: ${sequentialTime}ms")
// Concurrent processing
val concurrentStart = System.currentTimeMillis()
val concurrentResults = userIds.map { userId ->
async { processUser(userId) }
}.map { it.await() }
val concurrentTime = System.currentTimeMillis() - concurrentStart
println("Concurrent results: $concurrentResults")
println("Concurrent time: ${concurrentTime}ms")
// Concurrent processing with awaitAll
val awaitAllStart = System.currentTimeMillis()
val awaitAllResults = userIds.map { userId ->
async { processUser(userId) }
}.awaitAll()
val awaitAllTime = System.currentTimeMillis() - awaitAllStart
println("AwaitAll results: $awaitAllResults")
println("AwaitAll time: ${awaitAllTime}ms")
}
Limiting Concurrency
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
class ConcurrencyLimitedService {
// The kotlinx.coroutines Semaphore suspends instead of blocking the thread
private val semaphore = Semaphore(permits = 3) // Allow max 3 concurrent operations
suspend fun processWithLimit(id: Int): String = semaphore.withPermit {
delay(500) // Simulate work
"Processed $id"
}
}
fun main() = runBlocking {
val service = ConcurrencyLimitedService()
val items = (1..10).toList()
val results = items.map { id ->
async { service.processWithLimit(id) }
}.awaitAll()
println("Results: $results")
}
Error Handling in Async Code
Exception Propagation
import kotlinx.coroutines.*
suspend fun riskyOperation(id: Int): String {
delay(100)
if (id == 3) {
throw RuntimeException("Operation failed for ID: $id")
}
return "Success for ID: $id"
}
fun main() = runBlocking {
// An exception in one async operation cancels its siblings.
// Wrapping the group in coroutineScope confines the failure so that
// runBlocking itself is not cancelled and the rest of main can continue.
try {
coroutineScope {
val results = (1..5).map { id ->
async { riskyOperation(id) }
}.awaitAll()
println("All results: $results")
}
} catch (e: Exception) {
println("Caught exception: ${e.message}")
}
println("\n--- Using supervisorScope ---")
// Using supervisorScope to handle failures independently
supervisorScope {
val results = (1..5).map { id ->
async {
try {
riskyOperation(id)
} catch (e: Exception) {
"Failed: ${e.message}"
}
}
}.awaitAll()
println("Results with error handling: $results")
}
}
Try-Catch with Async
import kotlinx.coroutines.*
fun main() = runBlocking {
// Individual error handling
val results = (1..5).map { id ->
async {
try {
riskyOperation(id)
} catch (e: Exception) {
"Error for $id: ${e.message}"
}
}
}.awaitAll()
println("Results with individual handling: $results")
// Selective error handling
val safeResults = supervisorScope {
(1..5).map { id ->
async {
runCatching { riskyOperation(id) }
.getOrElse { "Failed: ${it.message}" }
}
}.awaitAll()
}
println("Safe results: $safeResults")
}
Async Patterns and Use Cases
Producer-Consumer Pattern
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
class AsyncProducerConsumer {
private val channel = Channel<Int>(capacity = 10)
fun startProducer(scope: CoroutineScope) = scope.launch {
repeat(20) { value ->
println("Producing: $value")
channel.send(value)
delay(50) // Simulate production time
}
channel.close()
}
fun startConsumer(scope: CoroutineScope, consumerId: Int) = scope.launch {
for (value in channel) {
println("Consumer $consumerId processing: $value")
delay(100) // Simulate processing time
}
println("Consumer $consumerId finished")
}
}
fun main() = runBlocking {
val producerConsumer = AsyncProducerConsumer()
// Start producer and multiple consumers
val producer = producerConsumer.startProducer(this)
val consumer1 = producerConsumer.startConsumer(this, 1)
val consumer2 = producerConsumer.startConsumer(this, 2)
// Wait for all to complete
joinAll(producer, consumer1, consumer2)
}
Parallel Processing with Results Aggregation
import kotlinx.coroutines.*
data class ProcessingResult(val id: Int, val result: String, val processingTime: Long)
suspend fun heavyComputation(id: Int): ProcessingResult {
val start = System.currentTimeMillis()
delay((100..500).random().toLong()) // Simulate variable processing time
val end = System.currentTimeMillis()
return ProcessingResult(id, "Result for $id", end - start)
}
fun main() = runBlocking {
val itemIds = (1..10).toList()
// Process items in parallel and collect results
val results = itemIds.map { id ->
async { heavyComputation(id) }
}.awaitAll()
// Aggregate results
val totalTime = results.sumOf { it.processingTime }
val averageTime = results.map { it.processingTime }.average()
val fastestResult = results.minByOrNull { it.processingTime }
val slowestResult = results.maxByOrNull { it.processingTime }
println("All results:")
results.forEach { println(" $it") }
println("\nAggregated statistics:")
println("Total processing time: ${totalTime}ms")
println("Average processing time: ${averageTime.toInt()}ms")
println("Fastest: ${fastestResult?.result} (${fastestResult?.processingTime}ms)")
println("Slowest: ${slowestResult?.result} (${slowestResult?.processingTime}ms)")
}
Real-World Examples
Web API Client
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
class ApiClient {
suspend fun fetchUser(userId: Int): User {
delay(200) // Simulate HTTP request
return User(userId, "User $userId", "user$userId@example.com")
}
suspend fun fetchPosts(userId: Int): List<Post> {
delay(300)
return listOf(
Post(1, "Post 1", "Content 1"),
Post(2, "Post 2", "Content 2")
)
}
suspend fun fetchComments(postId: Int): List<Comment> {
delay(150)
return listOf(
Comment(1, "Great post!"),
Comment(2, "Thanks for sharing")
)
}
}
data class User(val id: Int, val name: String, val email: String)
data class Post(val id: Int, val title: String, val content: String)
data class Comment(val id: Int, val text: String)
data class UserWithPosts(val user: User, val posts: List<PostWithComments>)
data class PostWithComments(val post: Post, val comments: List<Comment>)
class UserProfileService(private val apiClient: ApiClient) {
suspend fun getCompleteUserProfile(userId: Int): UserWithPosts = coroutineScope {
// Fetch user and posts concurrently
val userDeferred = async { apiClient.fetchUser(userId) }
val postsDeferred = async { apiClient.fetchPosts(userId) }
val user = userDeferred.await()
val posts = postsDeferred.await()
// Fetch comments for all posts concurrently
val postsWithComments = posts.map { post ->
async {
val comments = apiClient.fetchComments(post.id)
PostWithComments(post, comments)
}
}.awaitAll()
UserWithPosts(user, postsWithComments)
}
}
fun main() = runBlocking {
val apiClient = ApiClient()
val profileService = UserProfileService(apiClient)
val startTime = System.currentTimeMillis()
val profile = profileService.getCompleteUserProfile(123)
val endTime = System.currentTimeMillis()
println("User: ${profile.user}")
println("Posts with comments:")
profile.posts.forEach { postWithComments ->
println(" ${postWithComments.post.title}")
postWithComments.comments.forEach { comment ->
println(" - ${comment.text}")
}
}
println("Total time: ${endTime - startTime}ms")
}
Batch Processing System
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
class BatchProcessor<T, R>(
private val batchSize: Int = 10,
private val maxConcurrency: Int = 3,
private val processor: suspend (List<T>) -> List<R>
) {
suspend fun processAll(items: List<T>): List<R> = coroutineScope {
val semaphore = Semaphore(maxConcurrency)
items.chunked(batchSize)
.map { batch ->
async {
semaphore.withPermit {
processor(batch)
}
}
}
.awaitAll()
.flatten()
}
}
// Example usage
suspend fun processImageBatch(images: List<String>): List<String> {
delay(500) // Simulate image processing
return images.map { "Processed: $it" }
}
fun main() = runBlocking {
val images = (1..50).map { "image_$it.jpg" }
val processor = BatchProcessor<String, String>(
batchSize = 5,
maxConcurrency = 3,
processor = ::processImageBatch
)
val startTime = System.currentTimeMillis()
val results = processor.processAll(images)
val endTime = System.currentTimeMillis()
println("Processed ${results.size} images in ${endTime - startTime}ms")
results.take(10).forEach { println(it) }
}
Performance Optimization
Choosing the Right Dispatcher
import kotlinx.coroutines.*
fun main() = runBlocking {
// CPU-intensive work belongs on Dispatchers.Default
val cpuIntensiveResult = withContext(Dispatchers.Default) {
// Simulate a CPU-intensive calculation
(1L..1_000_000L).sum()
}
// Blocking I/O and network calls belong on Dispatchers.IO
val ioResult = withContext(Dispatchers.IO) {
// Simulate file I/O or a network call
delay(100)
"I/O operation completed"
}
println("CPU result: $cpuIntensiveResult")
println("I/O result: $ioResult")
// UI work belongs on Dispatchers.Main (Android and other UI frameworks).
// Calling it here would fail, because no Main dispatcher is installed on the plain JVM:
// val mainResult = withContext(Dispatchers.Main.immediate) { "UI update completed" }
}
Async vs Parallel Execution
import kotlinx.coroutines.*
fun cpuIntensiveTask(id: Int): Long {
// Simulate CPU-intensive work
var result = 0L
repeat(10_000_000) {
result += it
}
return result + id
}
fun main() = runBlocking {
val taskCount = 4
// Coroutines on Dispatchers.Default already run in parallel on a shared thread pool
val asyncStart = System.currentTimeMillis()
val asyncResults = (1..taskCount).map { id ->
async(Dispatchers.Default) {
cpuIntensiveTask(id)
}
}.awaitAll()
val asyncTime = System.currentTimeMillis() - asyncStart
println("Async results: $asyncResults")
println("Async time: ${asyncTime}ms")
// The same work using raw threads, for comparison
val threads = (1..taskCount).map { id ->
Thread {
cpuIntensiveTask(id)
}
}
val parallelStart = System.currentTimeMillis()
threads.forEach { it.start() }
threads.forEach { it.join() }
val parallelTime = System.currentTimeMillis() - parallelStart
println("Parallel time: ${parallelTime}ms")
}
Best Practices
Guidelines for Async Programming
- Use structured concurrency: Always use coroutineScope or supervisorScope
- Handle cancellation: Make your suspending functions cancellation-aware
- Choose appropriate dispatchers: Use Dispatchers.IO for I/O, Default for CPU work
- Avoid blocking calls: Use suspending alternatives instead of blocking APIs
- Limit concurrency: Use Semaphore or other mechanisms to prevent resource exhaustion
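The sketch below ties the first three guidelines together in one place: structured concurrency via coroutineScope, cancellation awareness via withTimeout, and Dispatchers.IO for the simulated network call. It is a minimal illustration only; fetchPage and the example URLs are made-up names, not a real API.
import kotlinx.coroutines.*

// Hypothetical I/O-bound call, used only for illustration
suspend fun fetchPage(url: String): String = withContext(Dispatchers.IO) {
    delay(200) // stands in for a real network request
    "contents of $url"
}

// withTimeout cancels all child coroutines if the deadline is exceeded;
// coroutineScope ties the async children to this function's lifetime
suspend fun fetchAllPages(urls: List<String>): List<String> =
    withTimeout(1_000) {
        coroutineScope {
            urls.map { url -> async { fetchPage(url) } }.awaitAll()
        }
    }

fun main() = runBlocking {
    val pages = fetchAllPages(listOf("https://example.com/a", "https://example.com/b"))
    pages.forEach { println(it) }
}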
Common Anti-patterns
import kotlinx.coroutines.*
// ❌ Don't do this - blocking in coroutines
suspend fun badBlocking() {
Thread.sleep(1000) // Blocks the thread!
}
// ✅ Do this instead
suspend fun goodSuspending() {
delay(1000) // Suspends the coroutine
}
// ❌ Don't do this - unstructured concurrency
fun badAsync(): Deferred<String> {
return GlobalScope.async { // Leaked coroutine!
delay(1000)
"Result"
}
}
// ✅ Do this instead
suspend fun goodAsync(): String = coroutineScope {
async {
delay(1000)
"Result"
}.await()
}
// ❌ Don't do this - ignoring cancellation
suspend fun badCancellation() {
repeat(1000) {
// Work that ignores cancellation
Thread.sleep(10)
}
}
// ✅ Do this instead
suspend fun goodCancellation() {
repeat(1000) {
currentCoroutineContext().ensureActive() // Check for cancellation
delay(10) // or rely on suspending functions like delay, which are cancellation-aware
}
}
Key Takeaways
- Async/await enables concurrent execution while maintaining readable code
- Use structured concurrency to ensure proper cleanup and error handling
- Handle exceptions appropriately in async operations
- Choose the right dispatcher for different types of work
- Limit concurrency to prevent resource exhaustion
- Make suspending functions cancellation-aware
Practice Exercises
- Build a concurrent web scraper that processes multiple URLs simultaneously
- Create a batch processing system with configurable concurrency limits
- Implement a retry mechanism for async operations with exponential backoff
- Design an async caching system with TTL (time-to-live) functionality
Quiz
- What's the difference between async and launch in Kotlin coroutines?
- When should you use supervisorScope instead of coroutineScope?
- How do you handle exceptions in async operations?
Answers
- async returns a Deferred that can provide a result, while launch returns a Job and is used for fire-and-forget operations.
- Use supervisorScope when you want failures in child coroutines to not cancel sibling coroutines.
- Use try-catch around await() calls, or handle exceptions within each async block, or use supervisorScope for independent error handling.
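To make the first answer concrete, here is a minimal sketch contrasting launch and async; the printed values are illustrative only.
import kotlinx.coroutines.*

fun main() = runBlocking {
    // launch: fire-and-forget; returns a Job with no result value
    val job: Job = launch {
        delay(100)
        println("side effect finished") // e.g. logging or writing to a cache
    }

    // async: returns a Deferred<T> whose result is retrieved with await()
    val deferred: Deferred<Int> = async {
        delay(100)
        21 * 2
    }

    job.join()                               // wait for completion, no value returned
    println("answer = ${deferred.await()}")  // wait for completion and read the value
}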