Day16:四种不同的 Channel

在上一篇文章我们建立 Channel 时,使用 Channel<E>() 来建立一个 Channel。这个方法是由 Coroutine 所提供的一个用来建立 Channel 的函式。我们来看一下这个函式的签名:

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>

在这边有三个参数,capacity、onBufferOverflow、onUndeliveredElement,当我们没有带任何参数进来的时候,预设是会使用

capacity: Int = RENDEZVOUS

onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND

onUndeliveredElement: ((E) -> Unit)? = null

依据带入的参数不同,Channel 的特性也不一样,下面将列出所有的参数:


Capacity

这边的 Capacity 指的是缓冲区的容量,Channel 提供了五种 capacity:

  • RENDEZVOUS
  • CONFLATED
  • UNLIMITED
  • BUFFERED
  • 其他

RENDEZVOUS Channel

预设的 Capacity,这是一种没有 Buffer 的 channel,经由 channel 传送的元素会在 send() 以及 receive() 同时呼叫时,才会传送过去。当 receive() 调用时, send() 就会 suspend 。反之,当 send() 调用时, receive() 就会 suspend 。换句话说, send() 与 receive() 是一组一组成双成对的。

另外,假设调用 send() 但是 receive() 却没有调用,这时候这个 suspend 函式就不会被调用,反之,如果只有调用 receive() 没有调用 send() ,这时候因为 channel 是空的, 调用 receive() 没有内容,所以 receive() 就会进入 suspend 的状态,直到有一个 send() 被调用。

RENDEZVOUS -> {
      if (onBufferOverflow == BufferOverflow.SUSPEND)
          RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
      else
          ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
  }

Channel() 这个 Builder 里面的内容来看,我们发现,使用 RENDEZVOUS 时,必须要将 onBufferOverflow 设定为 BufferOverflow.SUSPEND ,此时才是会建立 RendezvousChannel() ,否则就会建立一个 ArrayChannel() 且 capacity 为 1。(Channel() 预设的 onBufferOverflow 的值就是设定为 BufferOverflow.SUSPEND。)

CONFLATED Channel

Conflated channel (合并的通道),它在使用上并不是将所有的元素都合并起来,而是只保留最後一个元素,换句话说,在这个 channel 中,它的 Buffer 只有一个,而这个 Buffer 只会把最新的元素保留下来,在下一个 send() 调用之前如果没有调用 receive() ,那麽在 Buffer 里面的元素将会被丢掉。

看下面的范例:

class Day16 {
    val scope = CoroutineScope(Job())

    suspend fun conflatedBroadcastChannel(): Unit {
        val channel = Channel<Int>(capacity = Channel.CONFLATED)
        scope.launch {
            for(i in 0..10){
                channel.send(i)
            }
            
            delay(1000)
            channel.send(9)
        }
        
        for(i in 0 .. 100){
            delay(100)
            println(channel.receive())
        }
        
        println("done")
    }
}

→ 我们在 scope.launch{ ... } 中发送 Int 至 channel 中,其中发送的部分有两段,一段是在 channel 中直接塞入0~10,接着过了一秒钟,另外调用 channel.send(9) 将整数 9 传进 channel 中。

→ 在 scope.launch{} 外侧则是使用一个 for-loop 来取用 channel 里面的值,我们猜猜看会是怎麽样的结果呢?

10
9

如我们所说的, Conflated channel 只会保留最後的元素,直到有另外的 receive() 来把值取走。所以上面范例的第一段 channel.send() 因为发送的时间比取值的时间还要快,所以所有的值都以会新的盖掉旧的行为进行,应为第一个 channel.receive() 呼叫时,已经过了 100 毫秒。

我在这边刻意将含有 channel.receive() 的 for-loop 时间拉长,为了就是要包含到下一个 send() 的时间,以上面这个范例来说, 第一段的 channel.send() 做完之後,延迟了 1 秒钟,接着会调用 channel.send(9) 把整数 9 送出,这个行为刚好会在 channel.receive() 这个 for-loop 的范围内,所以 9 也会被列印出来。

之後呢?因为我们使用 for-loop 尝试调用 channel.receive() 取出 100 个元素,但是我们只有成功取出两个元素,所以这个 receive() 就会持续等待,直到所有的 receive() 都取出值。

  • 有趣的点:
CONFLATED -> {
            require(onBufferOverflow == BufferOverflow.SUSPEND) {
                "CONFLATED capacity cannot be used with non-default onBufferOverflow"
            }
            ConflatedChannel(onUndeliveredElement)
        }

使用 Channel() 时,会要求你的 onBufferOverflow 要为 BufferOverflow.SUSPEND ,但是进入 CONFLATED 的定义去看,又会发现

Requests a conflated channel in the Channel(...) factory function. This is a shortcut to creating a channel with onBufferOverflow = DROP_OLDEST.

其实它是建立一个使用 onBufferOverflow = DROP_OLDEST 的 channel。


UNLIMITED channel

Unlimited channel 应该比较好懂一点,从名字看起来就知道它的 Buffer 是无限制的。

UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows

从 Channel() 得知,UNLIMITED 会建立 LinkedListChannel ,LinkedList 我想应该大家都知道意思,它就是一个有顺序性的列表。那麽 LinkedListChannel 就是一个有顺序性的列表含有一个无限制的 Buffer。它的实作如下:

internal open class LinkedListChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?) : AbstractChannel<E>(onUndeliveredElement) {
    protected final override val isBufferAlwaysEmpty: Boolean get() = true
    protected final override val isBufferEmpty: Boolean get() = true
    protected final override val isBufferAlwaysFull: Boolean get() = false
    protected final override val isBufferFull: Boolean get() = false
...

可以发现 isBufferAlwaysFull 以及 isBufferFull 都是回传 false,也就是都不会满,那麽也就不会走到 onBufferOverflow 的情况了。(不过这边的无限制的 unlimited 的 buffer 最终还是需要看记忆体够不够放)

class Day16 {
    val scope = CoroutineScope(Job())
    suspend fun conflatedChannel(): Unit {
        val channel = Channel<Int>(capacity = Channel.UNLIMITED)
        scope.launch {
            for (i in 0..10) {
                channel.send(threeTimesInt(i))
            }
        }

        for (i in 0..10) {
            println(channel.receive())
        }
        println("done")
    }

    private suspend fun threeTimesInt(x: Int): Int {
        delay(100L * x)
        return x * 3
    }
}
0
3
6
9
12
15
18
21
24
27
30
done

我在使用 UNLIMITED 以及预设的 RENDEZVOUS 时感觉很相像,但是差异在於 RENDEZVOUS 所产生出来的 Channel 是由 RendezvousChannel(onUndeliveredElement) 所建立的。而RendezvousChannel(onUndeliveredElement) 的实作如下:

internal open class RendezvousChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?): AbstractChannel<E>(onUndeliveredElement) {
protected final override val isBufferAlwaysEmpty: Boolean get()= true
    protected final override val isBufferEmpty: Boolean get()= true
    protected final override val isBufferAlwaysFull: Boolean get()= true
    protected final override val isBufferFull: Boolean get()= true
}

我们可以发现,isBufferAlwaysFull 以及 isBufferFull 都是回传 true,所以预设的 RENDEZVOUS channel 是会走到 onBufferOverflow 的情况,与 UNLIMITED channel 不同。

在 UNLIMITED channel 下调用 send() 是永远不会被 suspend 的,而 RENDEZVOUS channel 是会 suspend。我们假设将上面这段程序稍作修改

class Day16 {
    val scope = CoroutineScope(Job())
    suspend fun conflatedChannel(): Unit {
        val channel = Channel<Int>(capacity = Channel.UNLIMITED)
        scope.launch {
            for (i in 0..10) {
								println("first: $i")
                channel.send(threeTimesInt(i))
            }
        }

				scope.launch {
            for (i in 0..10) {
                println("second: $i")
                channel.send(fiveTimesInt(i))
            }
        }

				delay(100)
        for (i in 0..20) {
            println(channel.receive())
        }

        println("done")
    }

    private suspend fun threeTimesInt(x: Int): Int {
        return x * 3
    }
		private suspend fun fiveTimesInt(x: Int): Int {
        return x * 5
    }

}
// capacity = Channel.RENDEZVOUS
first: 0
second: 0
0
first: 1
0
second: 1
first: 2
3
5
second: 2
first: 3
6
10
second: 3
first: 4
9
15
12
second: 4
first: 5
second: 5
20
15
first: 6
second: 6
25
18
30
first: 7
second: 7
first: 8
21
35
second: 8
first: 9
24
40
second: 9
first: 10
27
45
second: 10
30
done

→ 因为 send() 是会 suspend 的,所以会在不同的 coroutine scope 中切换。

如果使用的是 UNLIMITED channel ,其结果会是:

first: 0
first: 1
first: 2
first: 3
first: 4
first: 5
first: 6
first: 7
first: 8
first: 9
first: 10
second: 0
second: 1
second: 2
second: 3
second: 4
second: 5
second: 6
second: 7
second: 8
second: 9
second: 10
0
3
6
9
12
15
18
21
24
27
30
0
5
10
15
20
25
30
35
40
45
done

→ 因为 Buffer 有无限的空间,所以可以存完所有资料之後才列印出来。


最後则是 Buffer 的容量不为 0 也不是无限的

在前面我们知道,当 Buffer 的容量为 0 时(RENDEZVOUS),每一次的调用 send() ,send() 都会 suspend ,也就是说会把执行绪的使用权切换走,直到调用 receive()

在 Buffer 不为 0 的 channel 中, send() 只会在 channel 满的时候才会 suspend,也就是说会根据我们所设置的大小来决定什麽时机点会 suspend send()。同样地,receive() 也是会在 buffer 为空的时候会 suspend 。

在 Channel() 中,我们注意到当 capacity ==1 且 onBufferOverflow == BufferOverflow.DROP_OLDEST 时,我们就会建立 ConflatedChannel() ,否则就会建立 ArrayChannel()

if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
          ConflatedChannel(onUndeliveredElement) // conflated implementation is more efficient but appears to work in the same way
      else
          ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)

在 ArrayChannel 的里面,会根据带入的 capacity 与现在的状态来比对 buffer 是否为满的状态。

internal open class ArrayChannel<E>(
    /**
     * Buffer capacity.
     */
    private val capacity: Int,
    private val onBufferOverflow: BufferOverflow,
    onUndeliveredElement: OnUndeliveredElement<E>?
){
...
		protected final override val isBufferAlwaysEmpty: Boolean get() = false
    protected final override val isBufferEmpty: Boolean get() = state.size == 0
    protected final override val isBufferAlwaysFull: Boolean get() = false
    protected final override val isBufferFull: Boolean get() = state.size == capacity
...

小结

有四种 Channel ,我们可以依照需求来选择使用,特别要注意的是, receive() 的行为是固定的,receive() 如果在 channel 为空的时候调用,那麽 receive() 就会 suspend 直到 channel 不为空。

而 send() 则是会根据不同 channel 的 buffer 容量大小来决定何时会 suspend。UNLIMITED 因为具有无限大的 buffer 大小,所以 send() 不会 suspend()。

预设是使用 RENDEZVOUS Channel ,也就是 buffer 的 容量为 0,所以 send() 与 receive() 必须要成双成对的呼叫,否则就会 suspend。

参考资料

[Kotlin Coroutine Channel] (https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html)

特别感谢

Kotlin Taiwan User Group
Kotlin 读书会


<<:  TypeScript 能手养成之旅 Day 5 原始型别

>>:  [Day 06] 从简单的Todolist 了解 iOS开发的大致流程

Day05 - 随意玩之 OrderCreate API

在昨天我们度过最大难关加密了,之後应该会轻松许多吧? API 呼叫流程如下 步骤 1, 2, 3 目...

今年我想陪着 30 天之 27

1309. Decrypt String from Alphabet to Integer Mapp...

第29车厢-倒数一篇!人人有奖~抽奖抽起来了各位!

本篇分享自制一个简易抽奖程序码啦!---以JQ为例 ▼完成图如下 为了让大家比较好懂,程序码用这张...

Day26 深入解析Elasticsearch Query DSL Fuzzy query

Hello大家~ 昨天有去看烟火吗? 个人很怕烟火声都是看别人拍好的然後静音观看XD 在之前的内容我...

Day13-旧网站重写成Vue_4_TAB页签式选单

今天把INTRO的框架做出来 首先先把之前旧网站的介绍板块整理成json档 然後要把点击标题切换内容...