Understanding Kotlin Coroutines Flow Internals: How map and filter Work
Japanese version: This article is also available in Japanese on Zenn.
In the previous article, I traced how the Flow Builder (flow), emit, and collect work in sample Kotlin Coroutines Flow code like the one below, by reading through the kotlinx.coroutines library source code. If you haven’t read it yet, I recommend checking it out.
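import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) }
}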
Quick Recap
In the previous article, we clarified the mechanisms and relationships of Flow Builder (flow), emit, and collect.
- Flow Builder (flow) creates an instance of SafeFlow. SafeFlow holds the block (a lambda with FlowCollector as receiver) passed to flow.
- When collect is called on SafeFlow, the block is executed with the FlowCollector passed to collect as its receiver.
- Inside block, the emit function defined in FlowCollector is called. Note that when a lambda is passed to collect, that lambda is treated as the emit function via SAM conversion.
In other words, every time emit is called inside flow {}, the lambda passed to collect is executed.
Flow Builder (flow), emit, and collect — mechanisms and relationships
Note: If the above explanation is unclear, the previous article covers it in more detail.
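To make the recap concrete, here is a minimal sketch of the same relationship built from toy types. MiniCollector, MiniFlow, and miniFlow are hypothetical stand-ins written for this illustration, not the library's FlowCollector, Flow, or flow, and they leave out everything SafeFlow does beyond storing and running the block.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for FlowCollector and Flow (not the library types).
fun interface MiniCollector<T> {
    suspend fun emit(value: T)
}

interface MiniFlow<T> {
    suspend fun collect(collector: MiniCollector<T>)
}

// Simplified counterpart of the flow builder: it only stores the block.
fun <T> miniFlow(block: suspend MiniCollector<T>.() -> Unit): MiniFlow<T> =
    object : MiniFlow<T> {
        override suspend fun collect(collector: MiniCollector<T>) {
            collector.block() // run the block with the collector as its receiver
        }
    }

// Records the order in which emit and the collect lambda run.
fun recordedCalls(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val numbers = miniFlow<Int> {
        for (i in 1..3) {
            log += "emit($i)"
            emit(i)
        }
    }
    // The lambda becomes MiniCollector.emit through SAM conversion.
    numbers.collect { value -> log += "collected $value" }
    log
}

fun main() {
    recordedCalls().forEach(::println)
}
```

In this sketch each emit line is followed immediately by the matching collected line, which is the behavior described in the recap above.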
In this article, as an extension of the previous one, we will decode the behind-the-scenes mechanism when Intermediate Operators such as map and filter are present.
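import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

fun simple(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

fun Int.isEven() = this % 2 == 0

fun main() = runBlocking<Unit> {
    simple()
        .filter { it.isEven() } // 2, 4
        .map { it * 2 } // 4, 8
        .collect { value -> println(value) }
}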
Internal Implementation of map
Let’s look at map, a representative example of an Intermediate Operator.
map is a function that applies some transformation to each value in a Flow. The transformation is defined by the lambda passed to map. The source code for map is shown below.
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
    return@transform emit(transform(value))
}
kotlinx.coroutines — Transform.kt L49-L51
The return type is Flow<R>, which tells us that each element’s type is converted from T to R.
It also takes crossinline transform: suspend (value: T) -> R as an argument — this is the lambda that defines the T-to-R transformation.
Here things get slightly confusing, because there are two different things called transform:
- The argument lambda representing the transformation logic (suspend (value: T) -> R)
- The Flow<T>.unsafeTransform extension function on Flow, imported as import kotlinx.coroutines.flow.unsafeTransform as transform
The two different transforms
Inside map, the argument transform is called within the lambda passed to unsafeTransform (the second transform). The source code for unsafeTransform is shown below.
@PublishedApi
internal inline fun <T, R> Flow<T>.unsafeTransform(
    crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
    collect { value ->
        transform(value)
    }
}
kotlinx.coroutines — Emitters.kt L42-L49
A function called unsafeFlow is used here. Like flow, this is a type of Flow Builder. Looking at the source code for unsafeFlow, it returns an anonymous object conforming to the Flow<T> interface.
internal inline fun <T> unsafeFlow(crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}
kotlinx.coroutines — SafeCollector.common.kt L103-L110
Note: The unsafe prefix is used because, unlike flow, the runtime check that verifies the execution context is correct is omitted.
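To see what that runtime check looks like from the outside, the sketch below emits from a different context inside the regular flow builder. As far as I know, flow rejects this with an IllegalStateException; emitFromAnotherContextFails is a hypothetical helper name used only for this example.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns true when collecting the flow fails with IllegalStateException.
fun emitFromAnotherContextFails(): Boolean = runBlocking {
    val wrong = flow<Int> {
        // emit is called in a different context than the one collect runs in
        withContext(Dispatchers.Default) { emit(1) }
    }
    try {
        wrong.collect { }
        false
    } catch (e: IllegalStateException) {
        true
    }
}

fun main() {
    println(emitFromAnotherContextFails())
}
```

Because unsafeFlow skips this verification, it is reserved for internal operators whose emit calls are already known to stay in the collector's context.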
Since this is getting complex, let me rewrite the map function with unsafeTransform and unsafeFlow expanded for clarity:
// Original implementation
// public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
//     return@transform emit(transform(value))
// }
public inline fun <T, R> Flow<T>.map(
    crossinline transform: suspend (value: T) -> R
): Flow<R> = object : Flow<R> {
    override suspend fun collect(collector: FlowCollector<R>) {
        collector.run {
            this@map.collect { value ->
                emit(transform(value))
            }
        }
    }
}
Let me walk through the expanded implementation step by step. First, map creates and returns a new anonymous object of type Flow<R>.
Creating the Flow<R> anonymous object
Inside the collect function of the Flow<R> returned by map, the collect of the upstream Flow<T> (i.e., the receiver of map) is called first.
Calling collect on Flow<T>
From Flow<T>’s collect, the lambda (block) passed to the Flow Builder (flow) is called. When emit is called inside that lambda, the value is first transformed by transform, and then emit on the FlowCollector<R> passed to Flow<R>’s collect is called.
Applying transform and calling emit on FlowCollector<R>
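One way to try the same idea without subclassing Flow is to write the logic with the public transform operator, which has the same shape as unsafeTransform but, to my understanding, is built on flow rather than unsafeFlow. The following is a sketch under that assumption; mapViaTransform and compareWithMap are hypothetical names, not part of the library.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical helper: map expressed through the public transform operator.
fun <T, R> Flow<T>.mapViaTransform(convert: suspend (T) -> R): Flow<R> =
    transform { value -> emit(convert(value)) }

// Collects the same source through the hand-written and the built-in operator.
fun compareWithMap(): Pair<List<String>, List<String>> = runBlocking {
    val source = flowOf(1, 2, 3)
    val custom = source.mapViaTransform { "${it * 2}!" }.toList()
    val builtIn = source.map { "${it * 2}!" }.toList()
    custom to builtIn
}

fun main() {
    val (custom, builtIn) = compareWithMap()
    println(custom)
    println(builtIn)
}
```

Both lists should contain the same elements, since each upstream value is converted and then handed to the downstream emit exactly as in the expanded map.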
Let’s now walk through the overall flow using the following sample code:
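import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .map { "${it * 2}!" } // "2!", "4!", "6!"
        .collect { value -> println(value) }
}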
First, flow creates a SafeFlow<Int>.
Creating SafeFlow<Int>
Next, map creates a Flow<String>.
Creating Flow<String>
collect is called on Flow<String>, which in turn calls collect on Flow<Int>.
Flow<String>'s collect → Flow<Int>'s collect
From Flow<Int>’s collect, the lambda (block) passed to the original flow is executed. When emit is called inside block, the lambda passed to Flow<Int>’s collect (i.e., FlowCollector<Int>’s emit) is called.
Executing flow's lambda and calling FlowCollector<Int>'s emit
Finally, after transform is applied to the upstream value (value), FlowCollector<String>’s emit (= the lambda passed to the terminal collect) is called.
Applying transform and calling FlowCollector<String>'s emit
To summarize the execution order: Flow<String>’s collect → Flow<Int>’s collect → FlowCollector<Int>’s emit → FlowCollector<String>’s emit. The same pattern applies when multiple Intermediate Operators are present.
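The upstream-to-downstream half of that order can be observed by recording a line at each stage. This is only a small sketch: traceMapPipeline is a hypothetical helper name, the trace strings are made up for illustration, and the downstream-to-upstream collect calls are not visible in it.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Records which stage runs when, for a two-element flow.
fun traceMapPipeline(): List<String> = runBlocking {
    val trace = mutableListOf<String>()
    flow<Int> {
        for (i in 1..2) {
            trace += "flow emits $i"
            emit(i) // calls FlowCollector<Int>'s emit, i.e. map's inner lambda
        }
    }
        .map { value ->
            trace += "map transforms $value"
            "${value * 2}!"
        }
        .collect { value -> trace += "collect receives $value" }
    trace
}

fun main() {
    traceMapPipeline().forEach(::println)
}
```

Each value travels through all three stages before the next one is emitted, so the trace interleaves the stages per element instead of finishing one stage for every element first.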
We have now decoded the internal mechanism of map from its source code. The three key points are:
- Each time map is applied, a new Flow<R> is created.
- When the terminal .collect is called, collect is called sequentially from downstream to upstream. Then, each time a value is emitted inside the original flow {}, emit on FlowCollector is called sequentially from upstream to downstream, ultimately calling the lambda passed to the terminal collect.
- Just before passing a value to the downstream FlowCollector’s emit, the value is transformed by transform.
Internal Implementation of filter
As another example, let’s look at the internal implementation of filter. Looking at its source code, we can see it is implemented using unsafeTransform, just like map.
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
    if (predicate(value)) return@transform emit(value)
}
kotlinx.coroutines — Transform.kt L17-L19
As with map, here is the expanded code with unsafeTransform inlined for clarity.
The difference from map is that instead of emit(transform(value)), it uses if (predicate(value)) emit(value), so only values that satisfy the condition lambda (predicate) are emitted to the downstream FlowCollector<T>.
public inline fun <T> Flow<T>.filter(
    crossinline predicate: suspend (T) -> Boolean
): Flow<T> = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.run {
            this@filter.collect { value ->
                if (predicate(value)) emit(value) // 💡 Difference from `map`
            }
        }
    }
}
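A consequence of the conditional emit is that values rejected by the predicate never reach any operator further downstream. The sketch below checks this by recording which values arrive at a map placed after filter; runFilterThenMap and reachedMap are hypothetical names for this illustration.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns (values that reached map, final results).
fun runFilterThenMap(): Pair<List<Int>, List<Int>> = runBlocking {
    val reachedMap = mutableListOf<Int>()
    val results = (1..5).asFlow()
        .filter { it % 2 == 0 }
        .map { value ->
            reachedMap += value // only values that passed the predicate get here
            value * 2
        }
        .toList()
    reachedMap to results
}

fun main() {
    val (reachedMap, results) = runFilterThenMap()
    println(reachedMap)
    println(results)
}
```

Here the odd values stop inside filter because emit is simply not called for them, so the map lambda only ever runs for 2 and 4.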
Summary
In this article, we decoded the mechanisms of two fundamental Intermediate Operators — map and filter — from the source code. The three key points are:
- Each time map or filter is applied, a new Flow is created.
- When the terminal .collect is called, collect is called sequentially from downstream to upstream. Then, each time a value is emitted inside the original flow {}, emit on FlowCollector is called sequentially from upstream to downstream, ultimately calling the lambda passed to the terminal collect.
- Just before passing a value to the downstream FlowCollector’s emit, map applies a transformation, while filter performs a conditional branch to decide whether to emit the value.
That said, there are many other types of Intermediate Operators:
- More advanced operations like take and debounce
- Buffering operators like buffer and conflate
- Context-switching operators like flowOn
- Error-handling operators like catch
I plan to cover other Intermediate Operators in future articles.