Kotlin - Coroutines Advanced
Try it: Build sophisticated asynchronous applications with advanced coroutine patterns and reactive streams.
Advanced Coroutines Concepts
Beyond basic async/await, Kotlin coroutines offer powerful abstractions for complex concurrent programming scenarios including channels, flows, and structured concurrency.
Channels for Communication
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
// Producer-Consumer Pattern
/**
 * Starts a producer coroutine in this scope that sends 1, 2, 3, ... without end.
 *
 * Each value is followed by a 100 ms delay. The loop never terminates on its own,
 * so the returned channel (or the enclosing scope) must be cancelled by the caller.
 */
fun CoroutineScope.produceNumbers() = produce<Int> {
    var next = 1
    while (true) {
        send(next) // Suspending send
        next += 1
        delay(100) // Simulate work between values
    }
}
/**
 * Starts a producer coroutine that forwards only the prime values read from [numbers].
 *
 * The returned channel completes once [numbers] is closed and fully drained.
 */
fun CoroutineScope.filterPrimes(numbers: ReceiveChannel<Int>) = produce<Int> {
    for (candidate in numbers) {
        if (!isPrime(candidate)) continue
        send(candidate)
    }
}
/**
 * Returns `true` when [n] is a prime number.
 *
 * Values below 2 are never prime. Otherwise [n] is prime when no integer in
 * `2..floor(sqrt(n))` divides it evenly.
 */
fun isPrime(n: Int): Boolean {
    if (n < 2) return false
    val limit = kotlin.math.sqrt(n.toDouble()).toInt()
    return (2..limit).none { divisor -> n % divisor == 0 }
}
/**
 * Prints the first ten primes produced by the [produceNumbers] -> [filterPrimes] pipeline.
 *
 * The pipeline runs inside a [coroutineScope] rather than `GlobalScope`, so both
 * producers are children of the caller: they are cancelled automatically if
 * receiving fails or the caller is cancelled, instead of leaking.
 */
suspend fun channelExample() {
    coroutineScope {
        val numbers = produceNumbers()
        val primes = filterPrimes(numbers)
        repeat(10) {
            println("Prime: ${primes.receive()}")
        }
        // Both producers loop forever; cancel them explicitly so the scope can complete.
        numbers.cancel()
        primes.cancel()
    }
}
// Channel Types and Buffering
/**
 * Shows the four channel capacity modes and runs a small producer/consumer pair
 * over the buffered channel.
 *
 * `launch` needs a [CoroutineScope] receiver, which a plain suspend function does
 * not have, so the demo runs inside [coroutineScope]. The function returns only
 * after both the sender and the receiver have finished.
 */
suspend fun channelTypes() {
    // Rendezvous channel (capacity = 0)
    val rendezvous = Channel<Int>()
    // Buffered channel
    val buffered = Channel<Int>(capacity = 10)
    // Unlimited channel
    val unlimited = Channel<Int>(capacity = Channel.UNLIMITED)
    // Conflated channel (only latest value)
    val conflated = Channel<Int>(capacity = Channel.CONFLATED)

    coroutineScope {
        // Producer: sends 0..4, then closes so the consumer's loop terminates.
        launch {
            repeat(5) { i ->
                buffered.send(i)
                println("Sent: $i")
            }
            buffered.close()
        }
        // Consumer: iterates until the channel is closed and drained.
        launch {
            for (value in buffered) {
                println("Received: $value")
                delay(200) // Simulate processing
            }
        }
    }
}
Flows for Reactive Streams
import kotlinx.coroutines.flow.*
// Cold Flow - executed on collection
/**
 * Builds a flow that prints "Flow started" and then emits 1, 2 and 3,
 * waiting 100 ms before each emission and logging after it.
 */
fun simpleFlow(): Flow<Int> = flow {
    println("Flow started")
    (1..3).forEach { value ->
        delay(100)
        emit(value)
        println("Emitted: $value")
    }
}
// Hot Flow - SharedFlow/StateFlow
/**
 * Holds an integer counter as a [StateFlow] and publishes a text event on every change.
 *
 * [counter] always exposes the current value; [events] is a hot stream of
 * human-readable change notifications.
 */
class CounterViewModel {
    private val _counter = MutableStateFlow(0)
    val counter: StateFlow<Int> = _counter.asStateFlow()

    // A SharedFlow created with no replay and no buffer can never accept a tryEmit
    // while it has subscribers, so events would be silently dropped. Extra buffer
    // capacity lets the non-suspending tryEmit below actually deliver them.
    private val _events = MutableSharedFlow<String>(extraBufferCapacity = EVENT_BUFFER_CAPACITY)
    val events: SharedFlow<String> = _events.asSharedFlow()

    /** Adds one to the counter and publishes the new value as an event. */
    fun increment() {
        // updateAndGet is atomic; `value++` is a separate read and write that can lose updates.
        val updated = _counter.updateAndGet { it + 1 }
        _events.tryEmit("Counter incremented to $updated")
    }

    /** Subtracts one from the counter and publishes the new value as an event. */
    fun decrement() {
        val updated = _counter.updateAndGet { it - 1 }
        _events.tryEmit("Counter decremented to $updated")
    }

    private companion object {
        /** Number of events buffered for slow subscribers before tryEmit starts returning false. */
        const val EVENT_BUFFER_CAPACITY = 64
    }
}
// Flow Transformations
/**
 * Walks through common flow operators on the numbers 1..10:
 * map/filter, flatMapMerge over a flow of user ids, and the terminal
 * reduce and fold operators. Each result is printed.
 */
suspend fun flowTransformations() {
    val numbers = (1..10).asFlow()

    // Map transformation: squares larger than 20
    val largeSquares = numbers
        .map { n -> n * n }
        .filter { square -> square > 20 }
    largeSquares.collect { square -> println("Square > 20: $square") }

    // FlatMap for nested flows: one inner flow per user id
    val userData = flowOf("user1", "user2", "user3").flatMapMerge { userId ->
        flow {
            delay(100) // Simulate API call
            emit("Data for $userId")
        }
    }
    userData.collect { line -> println(line) }

    // Reduce and fold operations
    val sum = numbers.reduce { total, n -> total + n }
    println("Sum: $sum")
    val product = numbers.fold(1) { running, n -> running * n }
    println("Product: $product")
}
// Flow Exception Handling
/**
 * Demonstrates the `catch` operator: the upstream flow emits 1 and 2 and then
 * throws; the handler logs the error and emits -1 as a recovery value, so the
 * collector prints 1, 2 and -1.
 */
suspend fun flowErrorHandling() {
    val failing = flow<Int> {
        emit(1)
        emit(2)
        throw RuntimeException("Something went wrong")
        emit(3) // Never reached
    }
    val recovered = failing.catch { cause ->
        println("Caught exception: ${cause.message}")
        emit(-1) // Emit recovery value
    }
    recovered.collect { received -> println("Received: $received") }
}
// Flow Backpressure and Buffering
/**
 * Pairs a fast producer (0..99, no delay) with a slow collector (100 ms per item),
 * placing a `buffer(10)` between them and logging on both sides.
 */
suspend fun flowBackpressure() {
    val fastProducer = flow {
        for (i in 0 until 100) {
            emit(i)
            println("Emitted: $i")
        }
    }
    fastProducer
        .buffer(10) // Buffer up to 10 items
        .collect { item ->
            delay(100) // Slow consumer
            println("Processed: $item")
        }
}
Structured Concurrency
import kotlinx.coroutines.*
/**
 * Processes lists of strings concurrently, showing three structured-concurrency
 * strategies: a timeout, a plain child scope, and a supervisor scope.
 */
class DataProcessor {
    // Custom scope with SupervisorJob; cancelled by shutdown().
    private val processingScope = CoroutineScope(
        SupervisorJob() + Dispatchers.Default + CoroutineName("DataProcessor")
    )

    /**
     * Processes every item concurrently and returns the results in input order.
     *
     * @throws TimeoutCancellationException if the whole batch takes longer than 5 seconds.
     * @throws IllegalArgumentException if any item is rejected by processing.
     */
    suspend fun processDataWithTimeout(data: List<String>): List<ProcessedData> {
        return withTimeout(5000) { // 5 second timeout
            data.map { item ->
                async {
                    processItem(item)
                }
            }.awaitAll()
        }
    }

    /**
     * Processes every item concurrently in a child scope. The first failing item
     * cancels the remaining ones and its exception is rethrown to the caller.
     */
    suspend fun processDataWithStructuredConcurrency(data: List<String>): List<ProcessedData> {
        return coroutineScope { // Creates child scope
            data.map { item ->
                async {
                    processItem(item)
                }
            }.awaitAll()
        }
    }

    /**
     * Processes every item concurrently; an item that fails yields `null` in the
     * result instead of failing the batch.
     *
     * Inside supervisorScope a failed child does not cancel its siblings.
     */
    suspend fun processDataWithSupervision(data: List<String>): List<ProcessedData?> {
        return supervisorScope {
            data.map { item ->
                async {
                    try {
                        processItem(item)
                    } catch (e: Exception) {
                        // If this coroutine was cancelled, rethrow instead of
                        // reporting the cancellation as a processing failure.
                        ensureActive()
                        println("Failed to process $item: ${e.message}")
                        null
                    }
                }
            }.awaitAll()
        }
    }

    /**
     * Simulates 100 ms of work and returns the upper-cased item with the current time.
     *
     * @throws IllegalArgumentException if [item] contains the text "error".
     */
    private suspend fun processItem(item: String): ProcessedData {
        delay(100) // Simulate processing
        if (item.contains("error")) {
            throw IllegalArgumentException("Invalid item: $item")
        }
        return ProcessedData(item.uppercase(), System.currentTimeMillis())
    }

    /** Cancels the processor's own scope and everything running in it. */
    fun shutdown() {
        processingScope.cancel("Service shutdown")
    }
}
/**
 * Result of processing a single item.
 *
 * @property value the processed representation of the item.
 * @property timestamp when the result was produced; callers in this file pass
 *   `System.currentTimeMillis()`.
 */
data class ProcessedData(
    val value: String,
    val timestamp: Long,
)
// Exception Handling in Structured Concurrency
/**
 * Demonstrates failure propagation in [coroutineScope]: three concurrent jobs are
 * started, the second one fails after 500 ms, the scope cancels the other two and
 * rethrows the failure, which is then logged here.
 *
 * Cancellation of the calling coroutine is not treated as a scope failure; it is
 * rethrown so the caller stops promptly.
 */
suspend fun exceptionHandlingExample() {
    try {
        coroutineScope {
            val job1 = async {
                delay(1000)
                "Result 1"
            }
            // Explicit type keeps this a Deferred<String> even though the body always throws.
            val job2 = async<String> {
                delay(500)
                throw RuntimeException("Job 2 failed")
            }
            val job3 = async {
                delay(1500)
                "Result 3"
            }
            // If job2 fails, all jobs are cancelled
            listOf(job1.await(), job2.await(), job3.await())
        }
    } catch (e: Exception) {
        // A broad catch also sees CancellationException; rethrow if the caller was cancelled.
        currentCoroutineContext().ensureActive()
        println("Scope failed: ${e.message}")
    }
}
Actors for Stateful Concurrency
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
// Actor pattern for thread-safe state management
/** Closed set of messages accepted by the counter actor. */
sealed class CounterMessage

/** Request to add one to the counter. */
object Increment : CounterMessage()

/** Request to subtract one from the counter. */
object Decrement : CounterMessage()

/**
 * Request for the current counter value.
 *
 * @property response completed by the actor with the value at the time the message is handled.
 */
data class GetValue(
    val response: CompletableDeferred<Int>,
) : CounterMessage()
/**
 * Starts an actor that owns an integer counter (initially 0) and handles
 * [CounterMessage]s one at a time, so the counter is only ever touched by the
 * actor's own coroutine.
 */
fun CoroutineScope.counterActor() = actor<CounterMessage> {
    var count = 0
    for (msg in channel) {
        when (msg) {
            is Increment -> count += 1
            is Decrement -> count -= 1
            is GetValue -> msg.response.complete(count)
        }
    }
}
// Bank Account Actor
/** Closed set of messages accepted by the bank account actor. */
sealed class AccountMessage

/** Request to add [amount] to the balance. */
data class Deposit(
    val amount: Double,
) : AccountMessage()

/**
 * Request to take [amount] from the balance.
 *
 * @property response completed with `true` if the withdrawal was applied, `false` otherwise.
 */
data class Withdraw(
    val amount: Double,
    val response: CompletableDeferred<Boolean>,
) : AccountMessage()

/**
 * Request for the current balance.
 *
 * @property response completed with the balance at the time the message is handled.
 */
data class GetBalance(
    val response: CompletableDeferred<Double>,
) : AccountMessage()
/**
 * Bank account whose balance is owned by a single actor coroutine launched in [scope].
 *
 * All operations are sent to the actor as messages and handled one at a time.
 * The balance starts at 0.0.
 */
class BankAccount(private val scope: CoroutineScope) {
    private val actor = scope.actor<AccountMessage> {
        var balance = 0.0
        for (message in channel) {
            when (message) {
                is Deposit -> {
                    balance += message.amount
                    println("Deposited ${message.amount}, new balance: $balance")
                }
                is Withdraw -> {
                    val success = if (balance >= message.amount) {
                        balance -= message.amount
                        println("Withdrew ${message.amount}, new balance: $balance")
                        true
                    } else {
                        println("Insufficient funds for withdrawal of ${message.amount}")
                        false
                    }
                    message.response.complete(success)
                }
                is GetBalance -> {
                    message.response.complete(balance)
                }
            }
        }
    }

    /**
     * Adds [amount] to the balance.
     *
     * @throws IllegalArgumentException if [amount] is not a positive, finite number.
     */
    suspend fun deposit(amount: Double) {
        requireValidAmount(amount)
        actor.send(Deposit(amount))
    }

    /**
     * Attempts to take [amount] from the balance.
     *
     * @return `true` if the balance covered the amount and it was deducted, `false` otherwise.
     * @throws IllegalArgumentException if [amount] is not a positive, finite number.
     */
    suspend fun withdraw(amount: Double): Boolean {
        // Without this check a negative withdrawal would "succeed" and increase the balance.
        requireValidAmount(amount)
        val response = CompletableDeferred<Boolean>()
        actor.send(Withdraw(amount, response))
        return response.await()
    }

    /** Returns the balance as seen by the actor when it handles this request. */
    suspend fun getBalance(): Double {
        val response = CompletableDeferred<Double>()
        actor.send(GetBalance(response))
        return response.await()
    }

    /** Closes the actor's mailbox; later operations fail when they try to send. */
    fun close() {
        actor.close()
    }

    /** Rejects zero, negative, NaN and infinite amounts, which would corrupt the balance. */
    private fun requireValidAmount(amount: Double) {
        require(amount.isFinite() && amount > 0.0) {
            "Amount must be a positive, finite number, was $amount"
        }
    }
}
/**
 * Runs deposits, withdrawals and a balance query against one [BankAccount]
 * from three concurrent coroutines, then closes the account.
 *
 * The three coroutines are children of a [coroutineScope], which supplies the
 * scope that `launch` requires and returns only once all of them have finished.
 * No fixed sleep is needed before closing the account.
 */
suspend fun actorExample() {
    val scope = CoroutineScope(Dispatchers.Default)
    val account = BankAccount(scope)
    try {
        // Concurrent operations
        coroutineScope {
            launch {
                account.deposit(1000.0)
                account.deposit(500.0)
            }
            launch {
                val success1 = account.withdraw(200.0)
                println("Withdrawal 1 success: $success1")
                val success2 = account.withdraw(2000.0)
                println("Withdrawal 2 success: $success2")
            }
            launch {
                delay(100)
                val balance = account.getBalance()
                println("Final balance: $balance")
            }
        }
    } finally {
        // Close even if one of the operations failed.
        account.close()
    }
}
Select Expression for Multiple Channels
/**
 * Races two channels with `select` and prints whichever message arrives first.
 *
 * The senders are launched inside a [coroutineScope], which `launch` requires.
 * After the race both senders are cancelled before the channels are closed;
 * otherwise the losing sender would later send to a closed channel and fail
 * with ClosedSendChannelException.
 */
suspend fun selectExample() {
    coroutineScope {
        val channel1 = Channel<String>()
        val channel2 = Channel<String>()
        val sender1 = launch {
            delay(100)
            channel1.send("Message from channel 1")
        }
        val sender2 = launch {
            delay(200)
            channel2.send("Message from channel 2")
        }
        // Select first available message
        select<Unit> {
            channel1.onReceive { message ->
                println("Received from channel1: $message")
            }
            channel2.onReceive { message ->
                println("Received from channel2: $message")
            }
        }
        // Stop whichever sender lost the race (cancelling the finished one is a no-op).
        sender1.cancel()
        sender2.cancel()
        channel1.close()
        channel2.close()
    }
}
// Select with timeout
/**
 * Waits up to one second for a message using `select` with an `onTimeout` clause
 * and prints either the message or "Timeout occurred".
 *
 * The sender is launched inside a [coroutineScope] and cancelled once the select
 * has finished. Without that, it would suspend forever trying to send to a
 * rendezvous channel nobody reads, and the scope would never complete.
 */
suspend fun selectWithTimeout() {
    coroutineScope {
        val channel = Channel<String>()
        val sender = launch {
            delay(2000) // Longer than timeout
            channel.send("Late message")
        }
        val result = select<String?> {
            channel.onReceive { message ->
                message
            }
            onTimeout(1000) {
                "Timeout occurred"
            }
        }
        println("Result: $result")
        sender.cancel()
    }
}
Performance Optimization
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis
/**
 * Collection of small examples showing dispatcher selection, batched concurrent
 * processing and flow-based pipelines.
 */
class PerformanceOptimization {
    // Dispatcher Selection

    /**
     * Squares the numbers 1..1,000,000 on [Dispatchers.Default].
     *
     * NOTE(review): the squares are computed as Int, so inputs above 46340 overflow
     * and wrap; the return type would have to change to fix that.
     */
    suspend fun cpuIntensiveWork(): List<Int> {
        return withContext(Dispatchers.Default) {
            (1..1000000).map { it * it }
        }
    }

    /** Simulates a one-second I/O operation on [Dispatchers.IO]. */
    suspend fun ioWork(): String {
        return withContext(Dispatchers.IO) {
            // Simulate file I/O or network call
            delay(1000)
            "IO Result"
        }
    }

    // Batched concurrent processing

    /**
     * Processes [items] in consecutive batches of [batchSize]; items within a batch
     * run concurrently on [Dispatchers.Default] and each batch is awaited before
     * the next one starts. Results keep the input order.
     *
     * The work runs as children of the caller (via [withContext]) rather than in a
     * detached scope, so cancelling the caller or a failing item cancels the batch.
     *
     * @throws IllegalArgumentException if [batchSize] is not positive.
     */
    suspend fun processInBatches(items: List<String>, batchSize: Int = 10): List<String> {
        return withContext(Dispatchers.Default) {
            items.chunked(batchSize).flatMap { batch ->
                batch.map { item ->
                    async {
                        processItem(item)
                    }
                }.awaitAll()
            }
        }
    }

    /** Simulates 10 ms of work and returns [item] upper-cased. */
    private suspend fun processItem(item: String): String {
        delay(10) // Simulate processing
        return item.uppercase()
    }

    // Flow Performance Optimization

    /**
     * Maps 1000 generated item names to [ProcessedData] on [Dispatchers.Default]
     * and returns the first 100 whose value is longer than five characters.
     */
    suspend fun optimizedDataProcessing(): List<ProcessedData> {
        return flowOf(*Array(1000) { "item$it" })
            .buffer(100) // Buffer to handle backpressure
            .map { item ->
                // CPU intensive transformation
                ProcessedData(item.hashCode().toString(), System.currentTimeMillis())
            }
            .flowOn(Dispatchers.Default) // Switch context for upstream
            .filter { it.value.length > 5 }
            .take(100)
            .toList()
    }

    // Memory-efficient streaming

    /**
     * Streams the numbers 0..999,999 and returns the sum of the squares of the
     * first 1000 even ones, without materialising the sequence.
     */
    suspend fun streamLargeDataset(): Int {
        return flow {
            repeat(1000000) { i ->
                emit(i)
            }
        }
            .filter { it % 2 == 0 }
            .map { it * it }
            .take(1000)
            // Flow has no sum() terminal operator; fold is the equivalent.
            .fold(0) { total, square -> total + square }
    }
}
// Benchmark Coroutine Performance
/**
 * Times the processing of 100 items one at a time versus all at once and prints
 * both durations plus the resulting speedup.
 *
 * Both measurements go through the public `processInBatches` API: a batch of one
 * item per call gives sequential behavior, and a single batch containing every
 * item runs them all concurrently.
 */
suspend fun benchmarkCoroutines() {
    val optimizer = PerformanceOptimization()
    // Built once, outside the timed sections, so only processing is measured.
    val items = (1..100).map { "item$it" }

    // Measure sequential processing
    val sequentialTime = measureTimeMillis {
        items.forEach { item -> optimizer.processInBatches(listOf(item)) }
    }
    // Measure concurrent processing
    val concurrentTime = measureTimeMillis {
        optimizer.processInBatches(items, batchSize = items.size)
    }
    println("Sequential: ${sequentialTime}ms")
    println("Concurrent: ${concurrentTime}ms")
    println("Speedup: ${sequentialTime.toDouble() / concurrentTime}x")
}
Error Recovery and Circuit Breaker
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
/**
 * Circuit breaker that stops calling a failing operation for a while.
 *
 * After [failureThreshold] consecutive failures the breaker opens and rejects
 * calls with [CircuitBreakerOpenException]. Once [recoveryTimeout] has passed
 * since the last failure, the next call is let through as a trial: success
 * closes the breaker, failure opens it again.
 *
 * NOTE(review): state is held in plain fields with no synchronization, so
 * concurrent callers can race on it.
 */
class CircuitBreaker(
    private val failureThreshold: Int = 5,
    private val recoveryTimeout: Duration = 10.seconds
) {
    private var state: State = State.CLOSED
    private var failureCount = 0
    private var lastFailureTime = 0L

    /** The three breaker states. */
    sealed class State {
        object CLOSED : State()
        object OPEN : State()
        object HALF_OPEN : State()
    }

    /**
     * Runs [operation] through the breaker and returns its result.
     *
     * @throws CircuitBreakerOpenException if the breaker is open and the recovery
     *   timeout has not elapsed yet; [operation] is not invoked in that case.
     * @throws Exception whatever [operation] throws, after recording the failure.
     */
    suspend fun <T> execute(operation: suspend () -> T): T {
        if (state == State.OPEN) {
            val sinceLastFailure = System.currentTimeMillis() - lastFailureTime
            if (sinceLastFailure > recoveryTimeout.inWholeMilliseconds) {
                state = State.HALF_OPEN
            } else {
                throw CircuitBreakerOpenException("Circuit breaker is OPEN")
            }
        }
        return try {
            val result = operation()
            onSuccess()
            result
        } catch (e: Exception) {
            // Cancellation of the caller says nothing about the health of the
            // operation, so it must not count towards opening the breaker.
            if (e is CancellationException && !currentCoroutineContext().isActive) {
                throw e
            }
            onFailure()
            throw e
        }
    }

    /** Resets the failure count and closes the breaker. */
    private fun onSuccess() {
        failureCount = 0
        state = State.CLOSED
    }

    /** Records a failure and opens the breaker once the threshold is reached. */
    private fun onFailure() {
        failureCount++
        lastFailureTime = System.currentTimeMillis()
        if (failureCount >= failureThreshold) {
            state = State.OPEN
        }
    }
}
/** Thrown by [CircuitBreaker.execute] when a call is rejected because the breaker is open. */
class CircuitBreakerOpenException(message: String) : Exception(message)
// Retry with Exponential Backoff
/**
 * Runs [operation], retrying with exponential backoff when it throws.
 *
 * The operation is attempted at most `maxRetries + 1` times. After each failed
 * attempt except the last, the function waits for the current delay and then
 * multiplies it by [factor], capped at [maxDelay].
 *
 * @param maxRetries number of retries after the first attempt; must not be negative.
 * @param initialDelay wait before the first retry.
 * @param maxDelay upper bound for the wait between attempts.
 * @param factor multiplier applied to the delay after every retry.
 * @return the result of the first successful attempt.
 * @throws IllegalArgumentException if [maxRetries] is negative.
 * @throws Exception the failure of the final attempt when every attempt fails.
 */
suspend fun <T> retryWithBackoff(
    maxRetries: Int = 3,
    initialDelay: Duration = 1.seconds,
    maxDelay: Duration = 16.seconds,
    factor: Double = 2.0,
    operation: suspend () -> T
): T {
    require(maxRetries >= 0) { "maxRetries must not be negative, was $maxRetries" }
    var currentDelay = initialDelay
    repeat(maxRetries) { attempt ->
        try {
            return operation()
        } catch (e: Exception) {
            // If the caller was cancelled, stop here instead of logging a retry.
            currentCoroutineContext().ensureActive()
            println("Attempt ${attempt + 1} failed, retrying in $currentDelay")
        }
        delay(currentDelay)
        currentDelay = (currentDelay * factor).coerceAtMost(maxDelay)
    }
    // Final attempt: nothing left to retry, so any failure propagates to the caller.
    return operation()
}
// Resilient Service
/**
 * Example service that wraps a flaky simulated API call in retries and a
 * circuit breaker.
 */
class ResilientApiService {
    private val circuitBreaker = CircuitBreaker()

    /**
     * Fetches the response for [id]. Each call goes through the circuit breaker,
     * and inside it the simulated API call is retried with backoff.
     */
    suspend fun fetchData(id: String): ApiResponse {
        return circuitBreaker.execute {
            retryWithBackoff {
                simulateApiCall(id)
            }
        }
    }

    /** Waits 100 ms, then fails about 30% of the time or returns a canned response. */
    private suspend fun simulateApiCall(id: String): ApiResponse {
        delay(100) // Simulate network delay
        // Simulate random failures
        val failed = Math.random() < 0.3
        if (failed) {
            throw RuntimeException("API call failed for $id")
        }
        return ApiResponse(id = id, data = "Data for $id")
    }
}
/**
 * Response returned by the simulated API.
 *
 * @property id identifier the data was requested for.
 * @property data payload returned for that identifier.
 */
data class ApiResponse(
    val id: String,
    val data: String,
)
/**
 * Calls the resilient service twenty times, half a second apart, and prints
 * whether each call succeeded or failed.
 */
suspend fun resilienceExample() {
    val service = ResilientApiService()
    for (i in 0 until 20) {
        try {
            val payload = service.fetchData("item$i").data
            println("Success: $payload")
        } catch (e: Exception) {
            println("Failed: ${e.message}")
        }
        delay(500)
    }
}
Advanced Patterns Summary
- Channels: Use for communication between coroutines, choose appropriate capacity
- Flows: Prefer for reactive streams, leverage operators for transformations
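- Actors: Encapsulate mutable state in a single coroutine and expose it through messages for thread-safe operations (note: the `actor` builder is marked `@ObsoleteCoroutinesApi`)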
- Structured Concurrency: Ensure proper cleanup and exception propagation
- Performance: Select appropriate dispatchers, use buffering and batching
Common Anti-patterns
- Using GlobalScope instead of structured concurrency
- Blocking the main thread with runBlocking in production
- Not handling cancellation properly in long-running operations
- Using Dispatchers.Main for CPU-intensive work
- Creating too many concurrent coroutines without limits
Practice Exercises
- Implement a rate-limited API client using channels and flow operators
- Build a reactive data pipeline that processes real-time events
- Create a distributed task queue using actors and structured concurrency
- Design a resilient microservice with circuit breakers and retry policies
Architecture Notes
- Scalability: Coroutines scale to millions of concurrent operations
- Resource Management: Proper scoping prevents resource leaks
- Error Handling: Structured concurrency provides predictable error behavior
- Testability: Coroutines are easily testable with `runTest` from kotlinx-coroutines-test, which uses a TestCoroutineScheduler to run delays in virtual time