Kotlin Coroutinesの基本——async/awaitとFlowを理解する

Kotlinのコルーチンを使った非同期処理の基本を、suspend関数・async/await・Flowの使い方を中心に解説します。

コルーチンとは

Kotlinのコルーチンはスレッドをブロックせずに非同期処理を書くための仕組みです。suspend キーワードをつけた関数をコルーチン内で呼び出すだけで、通常の逐次的なコードと同じように非同期処理を記述できます。


suspend 関数の基本

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import kotlinx.coroutines.*

suspend fun fetchUser(id: Int): String {
    delay(1000)  // スレッドをブロックしないsleep
    return "User #$id"
}

fun main() = runBlocking {
    val user = fetchUser(1)
    println(user)  // 1秒後に "User #1"
}

delay() はスレッドをブロックしないため、その間に他のコルーチンが動けます。Thread.sleep() との大きな違いです。


並列実行(async / await)

複数の非同期処理を並列実行したい場合は async を使います。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
suspend fun fetchProfile(id: Int): String {
    delay(500)
    return "Profile #$id"
}

suspend fun fetchPosts(userId: Int): List<String> {
    delay(700)
    return listOf("Post 1", "Post 2")
}

fun main() = runBlocking {
    val start = System.currentTimeMillis()

    // 順次実行(合計1200ms かかる)
    val profile1 = fetchProfile(1)
    val posts1   = fetchPosts(1)

    // 並列実行(700ms で終わる)
    val profileDeferred = async { fetchProfile(1) }
    val postsDeferred   = async { fetchPosts(1) }
    val profile2 = profileDeferred.await()
    val posts2   = postsDeferred.await()

    println("並列: ${System.currentTimeMillis() - start}ms")
}

async { } はすぐに実行を開始し、Deferred を返します。.await() で結果を取り出します。


構造化された並行性

コルーチンは スコープ(CoroutineScope) 内で起動するのが基本です。スコープが終了すると、子コルーチンは自動的にキャンセルされます。

1
2
3
4
5
suspend fun loadUserData(id: Int): UserData = coroutineScope {
    val profile = async { fetchProfile(id) }
    val posts   = async { fetchPosts(id) }
    UserData(profile.await(), posts.await())
}

coroutineScope は内部のすべての子コルーチンが完了するまで待機します。いずれかが失敗すると、残りの子もキャンセルされます。


エラーハンドリング

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
fun main() = runBlocking {
    // try/catch で通常どおり扱える
    try {
        val result = withTimeout(500) {
            fetchUser(1)  // 1秒かかるので TimeoutCancellationException が発生
        }
    } catch (e: TimeoutCancellationException) {
        println("タイムアウト")
    }
}

async で起動したコルーチンの例外は .await() を呼んだときに伝播します。

1
2
3
4
5
6
val deferred = async { throw RuntimeException("failed") }
try {
    deferred.await()
} catch (e: RuntimeException) {
    println("エラー: ${e.message}")
}

Flow——非同期ストリーム

複数の値を非同期に流すには Flow を使います。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import kotlinx.coroutines.flow.*

fun numberFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(300)
        emit(i)
    }
}

fun main() = runBlocking {
    numberFlow()
        .filter { it % 2 == 0 }
        .map    { it * 10 }
        .collect { println(it) }
    // 20, 40 が300msおきに表示される
}

flow { } ブロック内で emit() を呼ぶたびに値が下流に流れます。collect は末端の terminal operatorで、Flowを実行開始させます。


StateFlow と SharedFlow

AndroidのViewModelではFlowをUIに公開する際に StateFlowSharedFlow を使います。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class SearchViewModel : ViewModel() {
    private val _query = MutableStateFlow("")
    val query: StateFlow<String> = _query.asStateFlow()

    val results: StateFlow<List<String>> = _query
        .debounce(300)
        .filter  { it.isNotBlank() }
        .flatMapLatest { query -> searchRepository.search(query) }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )

    fun onQueryChange(newQuery: String) {
        _query.value = newQuery
    }
}

debounce(300) でキー入力のたびに検索しないように間引き、flatMapLatest で最新のクエリのみを処理します(古いクエリのFlowは自動的にキャンセル)。


DispatchersとContext

コルーチンがどのスレッドで動くかは Dispatcher で決まります。

Dispatcher用途
Dispatchers.MainUIスレッド(Android)
Dispatchers.IOネットワーク・ファイルI/O
Dispatchers.DefaultCPU集約処理
1
2
3
4
suspend fun loadData() = withContext(Dispatchers.IO) {
    // IOスレッドプールで実行
    api.fetchData()
}

まとめ

概念用途
suspend fun非同期処理の基本単位
async / await並列実行
coroutineScope構造化された並行性
Flow非同期ストリーム
StateFlowUIへの状態公開

コルーチンのポイントは「suspend をつけるだけで非同期になり、書き方は同期コードと変わらない」点です。async/await で並列化し、エラーは通常の try/catch で扱える——この直感性がKotlinの非同期処理を扱いやすくしています。