Kotlin - Coroutines Advanced

Try it: Build sophisticated asynchronous applications with advanced coroutine patterns and reactive streams.

Advanced Coroutines Concepts

Beyond basic async/await, Kotlin coroutines offer powerful abstractions for complex concurrent programming scenarios including channels, flows, and structured concurrency.

Channels for Communication

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Producer-Consumer Pattern
/**
 * Starts a producer coroutine in this scope that sends 1, 2, 3, … without end,
 * pausing 100 ms after each value.
 *
 * The loop has no exit condition, so the producer only stops when the returned
 * channel (or the scope it was started in) is cancelled.
 */
fun CoroutineScope.produceNumbers() = produce<Int> {
    var next = 1
    while (true) {
        send(next) // Suspends until the value has been handed over
        next += 1
        delay(100) // Simulate work between values
    }
}

/**
 * Starts a coroutine in this scope that reads every value from [numbers] and
 * forwards only those for which [isPrime] returns `true`.
 *
 * The returned channel completes once [numbers] has been closed and drained.
 */
fun CoroutineScope.filterPrimes(numbers: ReceiveChannel<Int>) = produce<Int> {
    for (candidate in numbers) {
        if (!isPrime(candidate)) continue
        send(candidate)
    }
}

/**
 * Trial-division primality test.
 *
 * @param n the number to test
 * @return `true` when [n] is at least 2 and has no divisor in `2..floor(sqrt(n))`
 */
fun isPrime(n: Int): Boolean {
    if (n < 2) return false
    // A composite number always has a divisor no larger than its square root.
    val limit = kotlin.math.sqrt(n.toDouble()).toInt()
    return (2..limit).none { divisor -> n % divisor == 0 }
}

/**
 * Wires [produceNumbers] into [filterPrimes] and prints the first ten primes
 * that come out of the pipeline.
 *
 * Both producers are started as children of a local [coroutineScope] instead of
 * `GlobalScope`, so they are cancelled together with the caller and cannot
 * outlive this function.
 */
suspend fun channelExample() {
    coroutineScope {
        val numbers = produceNumbers()
        val primes = filterPrimes(numbers)

        try {
            repeat(10) {
                println("Prime: ${primes.receive()}")
            }
        } finally {
            // The number producer loops forever; without cancelling both stages
            // the enclosing scope would never complete.
            primes.cancel()
            numbers.cancel()
        }
    }
}

// Channel Types and Buffering
/**
 * Shows the four channel capacity flavours and runs a small producer/consumer
 * exchange over the buffered one.
 *
 * The two coroutines are launched inside [coroutineScope]: `launch` needs a
 * `CoroutineScope` receiver, which a plain `suspend` function does not provide,
 * and the scope makes this function return only after both coroutines finish.
 */
suspend fun channelTypes() {
    // The declarations below only illustrate the available capacities;
    // `buffered` is the one actually exercised.

    // Rendezvous channel (capacity = 0)
    val rendezvous = Channel<Int>()

    // Buffered channel
    val buffered = Channel<Int>(capacity = 10)

    // Unlimited channel
    val unlimited = Channel<Int>(capacity = Channel.UNLIMITED)

    // Conflated channel (only latest value)
    val conflated = Channel<Int>(capacity = Channel.CONFLATED)

    coroutineScope {
        // Producer: sends five values, then closes so the consumer loop ends.
        launch {
            repeat(5) { i ->
                buffered.send(i)
                println("Sent: $i")
            }
            buffered.close()
        }

        // Consumer: iterates until the channel is closed and drained.
        launch {
            for (value in buffered) {
                println("Received: $value")
                delay(200) // Simulate processing
            }
        }
    }
}

Flows for Reactive Streams

import kotlinx.coroutines.flow.*

// Cold Flow - executed on collection
/**
 * Builds a cold flow that prints a start message and then emits 1, 2 and 3,
 * waiting 100 ms before each emission.
 *
 * Nothing in the body runs until the flow is collected.
 */
fun simpleFlow(): Flow<Int> = flow {
    println("Flow started")
    (1..3).forEach { value ->
        delay(100)
        emit(value)
        println("Emitted: $value")
    }
}

// Hot Flow - SharedFlow/StateFlow
/**
 * Holds a counter as a [StateFlow] and publishes a text event on a
 * [SharedFlow] every time the counter changes.
 */
class CounterViewModel {
    private val _counter = MutableStateFlow(0)

    /** Read-only view of the current counter value. */
    val counter: StateFlow<Int> = _counter.asStateFlow()

    // With the default (no replay, no extra buffer) configuration `tryEmit`
    // cannot hand a value to an existing subscriber without suspending, so it
    // returns false and the event is lost. A small buffer lets it succeed.
    private val _events = MutableSharedFlow<String>(extraBufferCapacity = 64)

    /** Read-only stream of change notifications. */
    val events: SharedFlow<String> = _events.asSharedFlow()

    /** Atomically adds one to the counter and publishes the new value. */
    fun increment() {
        // updateAndGet performs the read-modify-write atomically and returns the
        // value this call produced, so the message cannot report another
        // caller's update.
        val updated = _counter.updateAndGet { it + 1 }
        _events.tryEmit("Counter incremented to $updated")
    }

    /** Atomically subtracts one from the counter and publishes the new value. */
    fun decrement() {
        val updated = _counter.updateAndGet { it - 1 }
        _events.tryEmit("Counter decremented to $updated")
    }
}

// Flow Transformations
/**
 * Walks through common flow operators on the numbers 1..10: `map`/`filter`,
 * `flatMapMerge` over a small list of ids, and the terminal `reduce`/`fold`.
 * Every result is printed to standard output.
 */
suspend fun flowTransformations() {
    val numbers = (1..10).asFlow()

    // Square every number and keep only the squares above 20.
    val largeSquares = numbers
        .map { n -> n * n }
        .filter { square -> square > 20 }
    largeSquares.collect { square -> println("Square > 20: $square") }

    // Each id is expanded into its own inner flow; the inner flows are merged.
    val responses = flowOf("user1", "user2", "user3").flatMapMerge { userId ->
        flow {
            delay(100) // Simulate API call
            emit("Data for $userId")
        }
    }
    responses.collect { response -> println(response) }

    // Terminal aggregations over the same source flow.
    val sum = numbers.reduce { total, n -> total + n }
    println("Sum: $sum")

    val product = numbers.fold(1) { total, n -> total * n }
    println("Product: $product")
}

// Flow Exception Handling
/**
 * Collects a flow that fails after two emissions and shows how `catch`
 * intercepts the upstream exception and emits a fallback value instead.
 */
suspend fun flowErrorHandling() {
    val failing = flow<Int> {
        emit(1)
        emit(2)
        throw RuntimeException("Something went wrong")
        emit(3) // Never reached
    }

    failing
        .catch { cause ->
            println("Caught exception: ${cause.message}")
            emit(-1) // Emit recovery value
        }
        .collect { received -> println("Received: $received") }
}

// Flow Backpressure and Buffering
/**
 * Pairs a fast emitter (100 values, no delay) with a collector that takes
 * 100 ms per value, placing a `buffer(10)` between the two.
 */
suspend fun flowBackpressure() {
    val fastProducer = flow<Int> {
        for (i in 0 until 100) {
            emit(i)
            println("Emitted: $i")
        }
    }

    fastProducer
        .buffer(10) // Buffer up to 10 items
        .collect { item ->
            delay(100) // Slow consumer
            println("Processed: $item")
        }
}

Structured Concurrency

import kotlinx.coroutines.*

/**
 * Demonstrates three ways of fanning a list of items out to concurrent
 * `async` children: bounded by a timeout, inside a plain [coroutineScope],
 * and inside a [supervisorScope] with per-item error handling.
 */
class DataProcessor {

    // Custom scope with SupervisorJob. Nothing in this class launches into it;
    // it is only cancelled by shutdown().
    private val processingScope = CoroutineScope(
        SupervisorJob() + Dispatchers.Default + CoroutineName("DataProcessor")
    )

    /**
     * Processes every item concurrently and fails if the whole batch takes
     * longer than [timeoutMillis].
     *
     * @param data items to process
     * @param timeoutMillis overall time budget in milliseconds (default 5 seconds)
     * @return the processed items, in the order of [data]
     * @throws TimeoutCancellationException when the budget is exceeded
     * @throws IllegalArgumentException when an item is rejected by the processor
     */
    suspend fun processDataWithTimeout(
        data: List<String>,
        timeoutMillis: Long = 5_000L
    ): List<ProcessedData> {
        return withTimeout(timeoutMillis) {
            data.map { item ->
                async {
                    processItem(item)
                }
            }.awaitAll()
        }
    }

    /**
     * Processes every item concurrently inside a child scope. The first failing
     * item cancels the remaining ones and its exception is rethrown.
     *
     * @param data items to process
     * @return the processed items, in the order of [data]
     */
    suspend fun processDataWithStructuredConcurrency(data: List<String>): List<ProcessedData> {
        return coroutineScope { // Creates child scope
            data.map { item ->
                async {
                    processItem(item)
                }
            }.awaitAll()
        }
    }

    /**
     * Processes every item concurrently and tolerates individual failures:
     * an item that fails is logged and represented by `null` in the result.
     *
     * @param data items to process
     * @return one entry per input item, `null` where processing failed
     */
    suspend fun processDataWithSupervision(data: List<String>): List<ProcessedData?> {
        // Inside supervisorScope a failed child does not cancel its siblings.
        return supervisorScope {
            data.map { item ->
                async {
                    try {
                        processItem(item)
                    } catch (e: CancellationException) {
                        // Cancellation is not an item failure; it must propagate
                        // so the coroutine actually stops.
                        throw e
                    } catch (e: Exception) {
                        println("Failed to process $item: ${e.message}")
                        null
                    }
                }
            }.awaitAll()
        }
    }

    /**
     * Simulated unit of work: waits 100 ms, rejects items containing "error",
     * otherwise returns the upper-cased item with the current wall-clock time.
     */
    private suspend fun processItem(item: String): ProcessedData {
        delay(100) // Simulate processing
        if (item.contains("error")) {
            throw IllegalArgumentException("Invalid item: $item")
        }
        return ProcessedData(item.uppercase(), System.currentTimeMillis())
    }

    /** Cancels the processor's own scope. */
    fun shutdown() {
        processingScope.cancel("Service shutdown")
    }
}

/**
 * Result of processing a single item.
 *
 * @property value the processed payload
 * @property timestamp a time value in milliseconds supplied by the creator of the instance
 */
data class ProcessedData(val value: String, val timestamp: Long)

// Exception Handling in Structured Concurrency
/**
 * Shows how a failure in one `async` child tears down its whole
 * [coroutineScope]: the second job throws after 500 ms, which cancels the
 * other two, and the scope rethrows that exception to the surrounding `try`.
 *
 * Cancellation of the caller is deliberately not treated as a scope failure
 * and is rethrown unchanged.
 */
suspend fun exceptionHandlingExample() {
    try {
        coroutineScope {
            val job1 = async {
                delay(1000)
                "Result 1"
            }

            val job2 = async {
                delay(500)
                throw RuntimeException("Job 2 failed")
            }

            val job3 = async {
                delay(1500)
                "Result 3"
            }

            // If job2 fails, all jobs are cancelled
            listOf(job1.await(), job2.await(), job3.await())
        }
    } catch (e: CancellationException) {
        // Swallowing this would keep a cancelled coroutine running.
        throw e
    } catch (e: Exception) {
        println("Scope failed: ${e.message}")
    }
}

Actors for Stateful Concurrency

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Actor pattern for thread-safe state management
/** Closed set of messages understood by the counter actor. */
sealed class CounterMessage
/** Asks the actor to add one to its counter. */
object Increment : CounterMessage()
/** Asks the actor to subtract one from its counter. */
object Decrement : CounterMessage()
/** Asks the actor for the current counter; the actor completes [response] with it. */
data class GetValue(val response: CompletableDeferred<Int>) : CounterMessage()

/**
 * Starts an actor in this scope that owns a single integer counter.
 *
 * Messages are taken from the actor's mailbox one at a time, so the counter is
 * only ever touched by the actor coroutine itself. The loop ends when the
 * mailbox is closed.
 */
fun CoroutineScope.counterActor() = actor<CounterMessage> {
    var count = 0

    for (msg in channel) {
        when (msg) {
            is Increment -> count += 1
            is Decrement -> count -= 1
            is GetValue -> msg.response.complete(count)
        }
    }
}

// Bank Account Actor
/** Closed set of messages understood by the bank-account actor. */
sealed class AccountMessage
/** Adds [amount] to the balance; no reply is sent. */
data class Deposit(val amount: Double) : AccountMessage()
/** Requests a withdrawal of [amount]; [response] is completed with whether it was carried out. */
data class Withdraw(val amount: Double, val response: CompletableDeferred<Boolean>) : AccountMessage()
/** Requests the current balance; [response] is completed with it. */
data class GetBalance(val response: CompletableDeferred<Double>) : AccountMessage()

/**
 * Bank account whose balance lives inside an actor coroutine started in [scope].
 *
 * All reads and writes of the balance happen inside the actor's message loop;
 * the public methods only post messages and, where a reply is expected, wait
 * for the actor to complete the attached deferred.
 */
class BankAccount(private val scope: CoroutineScope) {
    private val actor = scope.actor<AccountMessage> {
        var funds = 0.0

        for (request in channel) {
            when (request) {
                is Deposit -> {
                    funds += request.amount
                    println("Deposited ${request.amount}, new balance: $funds")
                }
                is Withdraw -> {
                    // Decide first, then apply, then answer the caller.
                    val approved = funds >= request.amount
                    if (approved) {
                        funds -= request.amount
                        println("Withdrew ${request.amount}, new balance: $funds")
                    } else {
                        println("Insufficient funds for withdrawal of ${request.amount}")
                    }
                    request.response.complete(approved)
                }
                is GetBalance -> {
                    request.response.complete(funds)
                }
            }
        }
    }

    /** Posts a deposit of [amount]; returns once the message has been sent. */
    suspend fun deposit(amount: Double) {
        val message = Deposit(amount)
        actor.send(message)
    }

    /**
     * Posts a withdrawal of [amount] and waits for the actor's decision.
     *
     * @return `true` if the balance covered the amount and it was deducted
     */
    suspend fun withdraw(amount: Double): Boolean =
        CompletableDeferred<Boolean>()
            .also { reply -> actor.send(Withdraw(amount, reply)) }
            .await()

    /** Asks the actor for the current balance and waits for the answer. */
    suspend fun getBalance(): Double =
        CompletableDeferred<Double>()
            .also { reply -> actor.send(GetBalance(reply)) }
            .await()

    /** Closes the actor's mailbox so that no further messages are accepted. */
    fun close() {
        actor.close()
    }
}

/**
 * Runs deposits, withdrawals and a balance query against one [BankAccount]
 * from three concurrent coroutines, then closes the account after one second.
 *
 * Everything is started inside a local [coroutineScope]: `launch` needs a
 * scope receiver, and tying the actor to the same scope guarantees it is
 * finished (or cancelled) before this function returns.
 */
suspend fun actorExample() {
    coroutineScope {
        // Same Job as this scope, but the actor itself runs on Dispatchers.Default.
        val account = BankAccount(this + Dispatchers.Default)

        // Concurrent operations
        launch {
            account.deposit(1000.0)
            account.deposit(500.0)
        }

        launch {
            val success1 = account.withdraw(200.0)
            println("Withdrawal 1 success: $success1")

            val success2 = account.withdraw(2000.0)
            println("Withdrawal 2 success: $success2")
        }

        launch {
            delay(100)
            val balance = account.getBalance()
            println("Final balance: $balance")
        }

        delay(1000)
        // Closing the mailbox lets the actor loop end, which in turn lets the
        // enclosing scope complete.
        account.close()
    }
}

Select Expression for Multiple Channels

/**
 * Races two channels with `select` and prints whichever message arrives first.
 *
 * The producers are children of a local [coroutineScope]. Once a winner has
 * been received the remaining producer is cancelled *before* the channels are
 * closed; otherwise it would later send to a closed channel and fail with
 * `ClosedSendChannelException`.
 */
suspend fun selectExample() {
    val channel1 = Channel<String>()
    val channel2 = Channel<String>()

    coroutineScope {
        val producers = listOf(
            launch {
                delay(100)
                channel1.send("Message from channel 1")
            },
            launch {
                delay(200)
                channel2.send("Message from channel 2")
            }
        )

        // Select first available message
        select<Unit> {
            channel1.onReceive { message ->
                println("Received from channel1: $message")
            }
            channel2.onReceive { message ->
                println("Received from channel2: $message")
            }
        }

        // Only one message is consumed; stop whoever has not sent yet.
        producers.forEach { it.cancel() }
    }

    channel1.close()
    channel2.close()
}

// Select with timeout
/**
 * Waits up to one second for a message on a channel whose producer needs two
 * seconds, so the `onTimeout` clause wins and its fallback text is printed.
 *
 * The producer is cancelled after the selection: nobody will ever receive its
 * message, and a pending send on a rendezvous channel would otherwise keep
 * the enclosing scope from completing.
 */
suspend fun selectWithTimeout() {
    val channel = Channel<String>()

    coroutineScope {
        val producer = launch {
            delay(2000) // Longer than timeout
            channel.send("Late message")
        }

        val result = select<String?> {
            channel.onReceive { message ->
                message
            }
            onTimeout(1000) {
                "Timeout occurred"
            }
        }

        producer.cancel()
        println("Result: $result")
    }
}

Performance Optimization

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

/**
 * Collection of small examples on dispatcher choice, batching and flow
 * pipelines.
 */
class PerformanceOptimization {

    // Dispatcher Selection

    /**
     * Squares the numbers 1..1,000,000 on [Dispatchers.Default].
     *
     * NOTE: the squares are computed in `Int` arithmetic, so every input above
     * 46,340 overflows and wraps around. The `List<Int>` return type is kept
     * for compatibility; treat the values as a workload, not as exact squares.
     */
    suspend fun cpuIntensiveWork(): List<Int> {
        return withContext(Dispatchers.Default) {
            (1..1000000).map { it * it }
        }
    }

    /** Simulates a one-second I/O operation on [Dispatchers.IO]. */
    suspend fun ioWork(): String {
        return withContext(Dispatchers.IO) {
            // Simulate file I/O or network call
            delay(1000)
            "IO Result"
        }
    }

    // Coroutine Pooling and Reuse
    private val processingScope = CoroutineScope(
        Dispatchers.Default + SupervisorJob()
    )

    /**
     * Processes [items] in consecutive batches of [batchSize]. Items within a
     * batch run concurrently; the next batch starts only after the previous
     * one has completed.
     *
     * @param items values to process
     * @param batchSize number of items processed concurrently (must be positive)
     * @return the processed values, in the order of [items]
     */
    suspend fun processInBatches(items: List<String>, batchSize: Int = 10): List<String> {
        return items.chunked(batchSize).flatMap { batch ->
            batch.map { item ->
                processingScope.async {
                    processItem(item)
                }
            }.awaitAll()
        }
    }

    /** Simulated unit of work: waits 10 ms and upper-cases the item. */
    private suspend fun processItem(item: String): String {
        delay(10) // Simulate processing
        return item.uppercase()
    }

    // Flow Performance Optimization

    /**
     * Maps 1000 generated item names to [ProcessedData] on
     * [Dispatchers.Default] and returns at most the first 100 results whose
     * value is longer than five characters.
     */
    suspend fun optimizedDataProcessing(): List<ProcessedData> {
        return flowOf(*Array(1000) { "item$it" })
            .buffer(100) // Buffer to handle backpressure
            .map { item ->
                // CPU intensive transformation
                ProcessedData(item.hashCode().toString(), System.currentTimeMillis())
            }
            .flowOn(Dispatchers.Default) // Switch context for upstream
            .filter { it.value.length > 5 }
            .take(100)
            .toList()
    }

    // Memory-efficient streaming

    /**
     * Streams the numbers 0 until 1,000,000, keeps the even ones, squares them
     * and returns the sum of the first 1000 squares.
     */
    suspend fun streamLargeDataset(): Int {
        return flow {
            repeat(1000000) { i ->
                emit(i)
            }
        }
        .filter { it % 2 == 0 }
        .map { it * it }
        .take(1000)
        // Flow has no sum() terminal operator; fold is the equivalent.
        .fold(0) { total, square -> total + square }
    }
}

// Benchmark Coroutine Performance
/**
 * Times the processing of 100 items one at a time versus all at once and
 * prints both durations together with the resulting speed-up.
 *
 * Both runs go through the public `processInBatches`: a batch size of 1 awaits
 * each item before starting the next, while a batch size equal to the item
 * count runs every item concurrently. The per-item worker of
 * [PerformanceOptimization] is private and cannot be called from here.
 */
suspend fun benchmarkCoroutines() {
    val optimizer = PerformanceOptimization()
    val items = (1..100).map { "item$it" }

    // Measure sequential processing
    val sequentialTime = measureTimeMillis {
        optimizer.processInBatches(items, batchSize = 1)
    }

    // Measure concurrent processing
    val concurrentTime = measureTimeMillis {
        optimizer.processInBatches(items, batchSize = items.size)
    }

    println("Sequential: ${sequentialTime}ms")
    println("Concurrent: ${concurrentTime}ms")
    // Guard against a 0 ms measurement so the ratio stays finite.
    println("Speedup: ${sequentialTime.toDouble() / concurrentTime.coerceAtLeast(1)}x")
}

Error Recovery and Circuit Breaker

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

/**
 * Minimal circuit breaker for suspending operations.
 *
 * The breaker starts CLOSED. After [failureThreshold] consecutive failures it
 * switches to OPEN and rejects calls with [CircuitBreakerOpenException] until
 * [recoveryTimeout] has elapsed since the last failure; the next call is then
 * let through as a trial (HALF_OPEN). Any success closes the breaker and
 * resets the failure count.
 *
 * NOTE(review): the state fields are plain `var`s without synchronisation;
 * confirm whether instances are shared between threads before relying on it.
 *
 * @param failureThreshold number of consecutive failures that opens the breaker
 * @param recoveryTimeout how long the breaker stays open before allowing a trial call
 */
class CircuitBreaker(
    private val failureThreshold: Int = 5,
    private val recoveryTimeout: Duration = 10.seconds
) {
    private var state: State = State.CLOSED
    private var failureCount = 0
    private var lastFailureTime = 0L

    /** The three positions of the breaker. */
    sealed class State {
        object CLOSED : State()
        object OPEN : State()
        object HALF_OPEN : State()
    }

    /**
     * Runs [operation] under the protection of this breaker.
     *
     * @return whatever [operation] returns
     * @throws CircuitBreakerOpenException if the breaker is open and the
     *   recovery timeout has not elapsed yet
     * @throws Exception any exception thrown by [operation], rethrown unchanged
     */
    suspend fun <T> execute(operation: suspend () -> T): T {
        if (state == State.OPEN) {
            val sinceLastFailure = System.currentTimeMillis() - lastFailureTime
            if (sinceLastFailure <= recoveryTimeout.inWholeMilliseconds) {
                throw CircuitBreakerOpenException("Circuit breaker is OPEN")
            }
            // Timeout elapsed: allow this call through as a trial.
            state = State.HALF_OPEN
        }

        return try {
            val result = operation()
            onSuccess()
            result
        } catch (e: CancellationException) {
            // Cancellation says nothing about the health of the protected
            // operation, so it must not count towards opening the breaker.
            throw e
        } catch (e: Exception) {
            onFailure()
            throw e
        }
    }

    /** Resets the failure count and closes the breaker. */
    private fun onSuccess() {
        failureCount = 0
        state = State.CLOSED
    }

    /** Records a failure and opens the breaker once the threshold is reached. */
    private fun onFailure() {
        failureCount++
        lastFailureTime = System.currentTimeMillis()

        if (failureCount >= failureThreshold) {
            state = State.OPEN
        }
    }
}

/** Thrown by [CircuitBreaker.execute] when a call is rejected because the breaker is open. */
class CircuitBreakerOpenException(message: String) : Exception(message)

// Retry with Exponential Backoff
/**
 * Runs [operation], retrying it after a failure with an exponentially growing
 * pause between attempts.
 *
 * The operation is attempted at most `maxRetries + 1` times. The pause starts
 * at [initialDelay], is multiplied by [factor] after every failed attempt and
 * never grows beyond [maxDelay]. Cancellation is never retried.
 *
 * @param maxRetries number of retries after the first attempt; must not be negative
 * @param initialDelay pause before the first retry
 * @param maxDelay upper bound for the pause between retries
 * @param factor multiplier applied to the pause after each failed attempt
 * @param operation the suspending action to run
 * @return the result of the first successful attempt
 * @throws IllegalArgumentException if [maxRetries] is negative
 * @throws Exception the exception of the last attempt when all attempts fail
 */
suspend fun <T> retryWithBackoff(
    maxRetries: Int = 3,
    initialDelay: Duration = 1.seconds,
    maxDelay: Duration = 16.seconds,
    factor: Double = 2.0,
    operation: suspend () -> T
): T {
    require(maxRetries >= 0) { "maxRetries must not be negative, was $maxRetries" }

    var currentDelay = initialDelay
    for (attempt in 1..maxRetries) {
        try {
            return operation()
        } catch (e: CancellationException) {
            // A cancelled coroutine must stop, not retry.
            throw e
        } catch (e: Exception) {
            println("Attempt $attempt failed, retrying in $currentDelay")
        }
        delay(currentDelay)
        currentDelay = (currentDelay * factor).coerceAtMost(maxDelay)
    }

    // Last attempt: no retries left, so any failure goes straight to the caller.
    return operation()
}

// Resilient Service
/**
 * Example service that guards a flaky (simulated) API call with a retry
 * policy nested inside a circuit breaker.
 */
class ResilientApiService {
    private val circuitBreaker = CircuitBreaker()

    /**
     * Fetches the data for [id]. The call is retried with back-off, and the
     * whole retry sequence counts as a single call for the circuit breaker.
     */
    suspend fun fetchData(id: String): ApiResponse {
        return circuitBreaker.execute {
            retryWithBackoff { simulateApiCall(id) }
        }
    }

    /** Simulated remote call: 100 ms latency and roughly a 30 % failure rate. */
    private suspend fun simulateApiCall(id: String): ApiResponse {
        delay(100) // Simulate network delay

        // Simulate random failures
        val failed = Math.random() < 0.3
        if (failed) throw RuntimeException("API call failed for $id")

        return ApiResponse(id = id, data = "Data for $id")
    }
}

/**
 * Response of the simulated API.
 *
 * @property id identifier the response belongs to
 * @property data payload text of the response
 */
data class ApiResponse(val id: String, val data: String)

/**
 * Calls [ResilientApiService.fetchData] twenty times, half a second apart,
 * and prints the outcome of every call.
 *
 * Failures of the service are reported and the loop continues; cancellation
 * of the caller is rethrown instead of being reported as a failed request.
 */
suspend fun resilienceExample() {
    val service = ResilientApiService()

    repeat(20) { i ->
        try {
            val response = service.fetchData("item$i")
            println("Success: ${response.data}")
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            println("Failed: ${e.message}")
        }
        delay(500)
    }
}

Advanced Patterns Summary

  • Channels: Use for communication between coroutines, choose appropriate capacity
  • Flows: Prefer for reactive streams, leverage operators for transformations
  • Actors: Encapsulate mutable state behind a message channel so it is only touched by one coroutine (note: the actor builder is marked @ObsoleteCoroutinesApi)
  • Structured Concurrency: Ensure proper cleanup and exception propagation
  • Performance: Select appropriate dispatchers, use buffering and batching

Common Anti-patterns

  • Using GlobalScope instead of structured concurrency
  • Blocking the main thread with runBlocking in production
  • Not handling cancellation properly in long-running operations
  • Using Dispatchers.Main for CPU-intensive work
  • Creating too many concurrent coroutines without limits

Practice Exercises

  1. Implement a rate-limited API client using channels and flow operators
  2. Build a reactive data pipeline that processes real-time events
  3. Create a distributed task queue using actors and structured concurrency
  4. Design a resilient microservice with circuit breakers and retry policies

Architecture Notes

  • Scalability: Coroutines scale to millions of concurrent operations
  • Resource Management: Proper scoping prevents resource leaks
  • Error Handling: Structured concurrency provides predictable error behavior
  • Testability: Coroutines are easily testable with runTest and virtual time (TestCoroutineScheduler) from kotlinx-coroutines-test