在上一篇文章我们建立 Channel 时,使用 Channel<E>()
来建立一个 Channel。这个方法是由 Coroutine 所提供的一个用来建立 Channel 的函式。我们来看一下这个函式的签名:
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>
在这边有三个参数,capacity、onBufferOverflow、onUndeliveredElement,当我们没有带任何参数进来的时候,预设是会使用
capacity: Int = RENDEZVOUS
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
onUndeliveredElement: ((E) -> Unit)? = null
依据带入的参数不同,Channel 的特性也不一样,下面将列出所有的参数:
这边的 Capacity 指的是缓冲区的容量,Channel 提供了五种 capacity:
预设的 Capacity,这是一种没有 Buffer 的 channel,经由 channel 传送的元素会在 send()
以及 receive()
同时呼叫时,才会传送过去。当 receive()
调用时, send()
就会 suspend 。反之,当 send()
调用时, receive()
就会 suspend 。换句话说, send() 与 receive() 是一组一组成双成对的。
另外,假设调用 send()
但是 receive()
却没有调用,这时候这个 suspend 函式就不会被调用,反之,如果只有调用 receive()
没有调用 send()
,这时候因为 channel 是空的, 调用 receive() 没有内容,所以 receive()
就会进入 suspend 的状态,直到有一个 send()
被调用。
RENDEZVOUS -> {
if (onBufferOverflow == BufferOverflow.SUSPEND)
RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
else
ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
}
从 Channel()
这个 Builder 里面的内容来看,我们发现,使用 RENDEZVOUS
时,必须要将 onBufferOverflow
设定为 BufferOverflow.SUSPEND
,此时才是会建立 RendezvousChannel()
,否则就会建立一个 ArrayChannel()
且 capacity 为 1。(Channel() 预设的 onBufferOverflow 的值就是设定为 BufferOverflow.SUSPEND。)
Conflated channel (合并的通道),它在使用上并不是将所有的元素都合并起来,而是只保留最後一个元素,换句话说,在这个 channel 中,它的 Buffer 只有一个,而这个 Buffer 只会把最新的元素保留下来,在下一个 send()
调用之前如果没有调用 receive()
,那麽在 Buffer 里面的元素将会被丢掉。
看下面的范例:
class Day16 {
val scope = CoroutineScope(Job())
suspend fun conflatedBroadcastChannel(): Unit {
val channel = Channel<Int>(capacity = Channel.CONFLATED)
scope.launch {
for(i in 0..10){
channel.send(i)
}
delay(1000)
channel.send(9)
}
for(i in 0 .. 100){
delay(100)
println(channel.receive())
}
println("done")
}
}
→ 我们在 scope.launch{ ... }
中发送 Int 至 channel 中,其中发送的部分有两段,一段是在 channel 中直接塞入0~10,接着过了一秒钟,另外调用 channel.send(9)
将整数 9 传进 channel 中。
→ 在 scope.launch{} 外侧则是使用一个 for-loop 来取用 channel 里面的值,我们猜猜看会是怎麽样的结果呢?
10
9
如我们所说的, Conflated channel 只会保留最後的元素,直到有另外的 receive()
来把值取走。所以上面范例的第一段 channel.send() 因为发送的时间比取值的时间还要快,所以所有的值都以会新的盖掉旧的行为进行,应为第一个 channel.receive() 呼叫时,已经过了 100 毫秒。
我在这边刻意将含有 channel.receive() 的 for-loop 时间拉长,为了就是要包含到下一个 send()
的时间,以上面这个范例来说, 第一段的 channel.send() 做完之後,延迟了 1 秒钟,接着会调用 channel.send(9)
把整数 9 送出,这个行为刚好会在 channel.receive() 这个 for-loop 的范围内,所以 9 也会被列印出来。
之後呢?因为我们使用 for-loop 尝试调用 channel.receive() 取出 100 个元素,但是我们只有成功取出两个元素,所以这个 receive() 就会持续等待,直到所有的 receive() 都取出值。
CONFLATED -> {
require(onBufferOverflow == BufferOverflow.SUSPEND) {
"CONFLATED capacity cannot be used with non-default onBufferOverflow"
}
ConflatedChannel(onUndeliveredElement)
}
使用 Channel() 时,会要求你的 onBufferOverflow 要为 BufferOverflow.SUSPEND
,但是进入 CONFLATED
的定义去看,又会发现
Requests a conflated channel in the Channel(...) factory function. This is a shortcut to creating a channel with onBufferOverflow = DROP_OLDEST.
其实它是建立一个使用 onBufferOverflow = DROP_OLDEST 的 channel。
Unlimited channel 应该比较好懂一点,从名字看起来就知道它的 Buffer 是无限制的。
UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
从 Channel() 得知,UNLIMITED 会建立 LinkedListChannel
,LinkedList 我想应该大家都知道意思,它就是一个有顺序性的列表。那麽 LinkedListChannel 就是一个有顺序性的列表含有一个无限制的 Buffer。它的实作如下:
internal open class LinkedListChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?) : AbstractChannel<E>(onUndeliveredElement) {
protected final override val isBufferAlwaysEmpty: Boolean get() = true
protected final override val isBufferEmpty: Boolean get() = true
protected final override val isBufferAlwaysFull: Boolean get() = false
protected final override val isBufferFull: Boolean get() = false
...
可以发现 isBufferAlwaysFull
以及 isBufferFull
都是回传 false,也就是都不会满,那麽也就不会走到 onBufferOverflow 的情况了。(不过这边的无限制的 unlimited 的 buffer 最终还是需要看记忆体够不够放)
class Day16 {
val scope = CoroutineScope(Job())
suspend fun conflatedChannel(): Unit {
val channel = Channel<Int>(capacity = Channel.UNLIMITED)
scope.launch {
for (i in 0..10) {
channel.send(threeTimesInt(i))
}
}
for (i in 0..10) {
println(channel.receive())
}
println("done")
}
private suspend fun threeTimesInt(x: Int): Int {
delay(100L * x)
return x * 3
}
}
0
3
6
9
12
15
18
21
24
27
30
done
我在使用 UNLIMITED 以及预设的 RENDEZVOUS 时感觉很相像,但是差异在於 RENDEZVOUS 所产生出来的 Channel 是由 RendezvousChannel(onUndeliveredElement)
所建立的。而RendezvousChannel(onUndeliveredElement) 的实作如下:
internal open class RendezvousChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?): AbstractChannel<E>(onUndeliveredElement) {
protected final override val isBufferAlwaysEmpty: Boolean get()= true
protected final override val isBufferEmpty: Boolean get()= true
protected final override val isBufferAlwaysFull: Boolean get()= true
protected final override val isBufferFull: Boolean get()= true
}
我们可以发现,isBufferAlwaysFull
以及 isBufferFull
都是回传 true,所以预设的 RENDEZVOUS channel 是会走到 onBufferOverflow 的情况,与 UNLIMITED channel 不同。
在 UNLIMITED channel 下调用 send()
是永远不会被 suspend 的,而 RENDEZVOUS channel 是会 suspend。我们假设将上面这段程序稍作修改
class Day16 {
val scope = CoroutineScope(Job())
suspend fun conflatedChannel(): Unit {
val channel = Channel<Int>(capacity = Channel.UNLIMITED)
scope.launch {
for (i in 0..10) {
println("first: $i")
channel.send(threeTimesInt(i))
}
}
scope.launch {
for (i in 0..10) {
println("second: $i")
channel.send(fiveTimesInt(i))
}
}
delay(100)
for (i in 0..20) {
println(channel.receive())
}
println("done")
}
private suspend fun threeTimesInt(x: Int): Int {
return x * 3
}
private suspend fun fiveTimesInt(x: Int): Int {
return x * 5
}
}
// capacity = Channel.RENDEZVOUS
first: 0
second: 0
0
first: 1
0
second: 1
first: 2
3
5
second: 2
first: 3
6
10
second: 3
first: 4
9
15
12
second: 4
first: 5
second: 5
20
15
first: 6
second: 6
25
18
30
first: 7
second: 7
first: 8
21
35
second: 8
first: 9
24
40
second: 9
first: 10
27
45
second: 10
30
done
→ 因为 send()
是会 suspend 的,所以会在不同的 coroutine scope 中切换。
如果使用的是 UNLIMITED channel ,其结果会是:
first: 0
first: 1
first: 2
first: 3
first: 4
first: 5
first: 6
first: 7
first: 8
first: 9
first: 10
second: 0
second: 1
second: 2
second: 3
second: 4
second: 5
second: 6
second: 7
second: 8
second: 9
second: 10
0
3
6
9
12
15
18
21
24
27
30
0
5
10
15
20
25
30
35
40
45
done
→ 因为 Buffer 有无限的空间,所以可以存完所有资料之後才列印出来。
在前面我们知道,当 Buffer 的容量为 0 时(RENDEZVOUS),每一次的调用 send()
,send() 都会 suspend ,也就是说会把执行绪的使用权切换走,直到调用 receive()
。
在 Buffer 不为 0 的 channel 中, send() 只会在 channel 满的时候才会 suspend,也就是说会根据我们所设置的大小来决定什麽时机点会 suspend send()。同样地,receive() 也是会在 buffer 为空的时候会 suspend 。
在 Channel() 中,我们注意到当 capacity ==1 且 onBufferOverflow == BufferOverflow.DROP_OLDEST 时,我们就会建立 ConflatedChannel()
,否则就会建立 ArrayChannel()
。
if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
ConflatedChannel(onUndeliveredElement) // conflated implementation is more efficient but appears to work in the same way
else
ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
在 ArrayChannel 的里面,会根据带入的 capacity 与现在的状态来比对 buffer 是否为满的状态。
internal open class ArrayChannel<E>(
/**
* Buffer capacity.
*/
private val capacity: Int,
private val onBufferOverflow: BufferOverflow,
onUndeliveredElement: OnUndeliveredElement<E>?
){
...
protected final override val isBufferAlwaysEmpty: Boolean get() = false
protected final override val isBufferEmpty: Boolean get() = state.size == 0
protected final override val isBufferAlwaysFull: Boolean get() = false
protected final override val isBufferFull: Boolean get() = state.size == capacity
...
有四种 Channel ,我们可以依照需求来选择使用,特别要注意的是, receive() 的行为是固定的,receive() 如果在 channel 为空的时候调用,那麽 receive() 就会 suspend 直到 channel 不为空。
而 send() 则是会根据不同 channel 的 buffer 容量大小来决定何时会 suspend。UNLIMITED 因为具有无限大的 buffer 大小,所以 send() 不会 suspend()。
预设是使用 RENDEZVOUS Channel ,也就是 buffer 的 容量为 0,所以 send() 与 receive() 必须要成双成对的呼叫,否则就会 suspend。
[Kotlin Coroutine Channel] (https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html)
Kotlin Taiwan User Group
Kotlin 读书会
<<: TypeScript 能手养成之旅 Day 5 原始型别
>>: [Day 06] 从简单的Todolist 了解 iOS开发的大致流程
在昨天我们度过最大难关加密了,之後应该会轻松许多吧? API 呼叫流程如下 步骤 1, 2, 3 目...
1309. Decrypt String from Alphabet to Integer Mapp...
本篇分享自制一个简易抽奖程序码啦!---以JQ为例 ▼完成图如下 为了让大家比较好懂,程序码用这张...
Hello大家~ 昨天有去看烟火吗? 个人很怕烟火声都是看别人拍好的然後静音观看XD 在之前的内容我...
今天把INTRO的框架做出来 首先先把之前旧网站的介绍板块整理成json档 然後要把点击标题切换内容...