day13 Kotlin coroutine channel操作

尽管大家讲coroutine都会提到channel和flow,然後开始比较,但我个人觉得channel和flow两个你都没有概念的时候,听我比较很难知道我在说什麽啦!!
这边会分段介绍channel的特性,首先,channel就像是一个queue,可以把它想像成羽毛球桶,先进先出(fifo),应该很好懂吧
channel

所以打算先介绍两者,而後再来比较,首先我们先用一下channel吧

val firstChannel = Channel<Int> {  }
val secondChannel = Channel<Int> {  }

lifecycleScope.launch {
    Timber.d("start")

    val waiting = firstChannel.receive()
    Timber.d("check waiting, $waiting")
    val waiting2 = secondChannel.receive()
    Timber.d("check waiting2, $waiting2")

    Timber.d("after clair receive")

    val result = async {
        Timber.d("in result")
        sampleDemoSuspendFunction("$waiting result")
    }
    val result2 = async {
        Timber.d("in result 2 ")
        sampleDemoSuspendFunction("$waiting2 result2")
    }
    Timber.d("${result.await()} , ${result2.await()}")
}

lifecycleScope.launch(Dispatchers.IO){
    delay(1000)
    firstChannel.send(1001)
    Timber.d("sent first")

    delay(1000)
    secondChannel.send(1111 )
    Timber.d("sent second")
}


suspend fun sampleDemoSuspendFunction(content: String): String {
    delay(1000L)
    return content
}

/**
 *
 hh:mm:ss 
 14:44:35.792 17905-17967/: sent first
 14:44:35.792 17905-17905/: check waiting, 1001
 14:44:36.793 17905-17967/: sent second
 14:44:36.793 17905-17905/: check waiting, 1111
 14:44:36.794 17905-17905/: after clair receive
 
 14:44:36.794 17905-17905/$1$1$result: in result
 14:44:36.795 17905-17905/d$1$1$result2: in result 2 
 
 14:44:37.798 17905-17905/: 1001 result , 1111 result2
 * */

可以看到channel的关键字有send和receive两个,在Timber里面,我们要印出waiting和waiting2两个变数,这就表示我们需要两个变数都已经有值了,才去呼叫Timber

我已经把log都印出来了,从log可以看到先送出东西到channel,receive()才会被触发,现在我们来讲讲send和receive的特性,receive很好懂,没东西的时候就suspend起来,等到有东西为止,但send就不一样了,send的时候如果没有被receive,他也suspend在那边等,等到有人收他东西,那如果没人收呢,就没有然後了!!

比如上面把两个receive交换,secondChannel再等一个等不到的人,因为firstChannel没有被接收,他suspend在那里了

这个特性非常重要,如果把send和receive放在同一个thread,就会suspend起来,然後又没有然後了QQ,用的位子百百种,使用时记得思考suspend的特性,send/receive有没有对起来,规则告诉你了,剩下就让各位发挥

lifecycleScope.launch { 
    launch {
        for (x in 1..5) firstChannel.send(x)
    }

    repeat(5) { Timber.d(firstChannel.receive().toString()) }
    Timber.d("Done!")
}

channel cancel

和corotuine的两段式取消有点类似,当呼叫cancel()时,并不会立刻关闭,而是会等到先前发出的值都被接收後才会关闭,值得注意的一点,他有分send和receive,cancel()呼叫後不能send

操作符

channel的建构器可以用produce,而 for 在接收端可以用consumeEach有限制的替换,先看下面范例

lifecycleScope.launch {
    val produceChannel = produce {
        for (x in 1..5) send( x )
    }
    produceChannel.consumeEach { Timber.d(it.toString()) }
}

上面的例子应该表现出channel的基本特性了,但这并没有解决coroutine之间沟通的问题,channel的特性在於可以再多个coroutine之间 send和receive

这里再借文档例子

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}

val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all

/**
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
 * */

给看不懂的人在解释一下,多核心的时候,可以同时进行多个工作,用channel可以保证,变数的顺序和更新
channel

那刚刚说的consumeEach的限制,就是for在其中一个coroutine失败时,可以安全的持续接收值直到结束,但comsumeEach在遇到,会无法接收到完整的资料

val firstChannel = Channel<Int>(10)//buffer 有10个


lifecycleScope.launch {
    (1..10).forEach { firstChannel.send(it) }

    val one = async { launchProcessor(1,firstChannel) }
    val two = async { launchProcessor(2,firstChannel) }
    val three = async { launchProcessor(3,firstChannel) }

    one.await()
    two.await()
    three.await()

    firstChannel.cancel() // cancel producer coroutine and thus kill them all
}

with for loop,这里会少3是因为在for loop里面,已经把3拿出来了,才 throw Exception,所以在channel里面才会没有3

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {

    for ( msg in channel) {
        if (msg == 3) throw CancellationException()
        Timber.d("Processor #$id received $msg")
        delay(500)
    }
}


/**
 * 
 Processor #1 received 1
 Processor #2 received 2
 Processor #1 received 4
 Processor #2 received 5
 Processor #1 received 6
 Processor #2 received 7
 Processor #1 received 8
 Processor #2 received 9
 Processor #1 received 10
 * */

with consume

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {

    channel.consumeEach { msg ->
        if (msg == 3) throw CancellationException()
        Timber.d("Processor #$id received $msg")
        delay(500)
    }
}
/**
 * Processor #1 received 1
 * Processor #2 received 2
 * */

最後,channel会尽可能的公平操作,ex.有两个先receive之後再send的coroutine,他们会交替的被呼叫,而不会在一个里面重复,文档有范例,这边有例外讨论

实际运作比较

在看官方影片的时候,刚好看到他们有做比较,真是太贴心了
只用list坐回传的话顺序是

用channel回传的顺序是交替的,你会依顺序一个一个收到,而不是一次收到一串

特性

如果没被receive,在scope终止前,会一直等下去,不会有"30岁还单身我们结婚吧?" 这种事情

连结统整

必看

KotlinConf 2019 前面有讲到channel
englidh doc
chinese doc

图片来源

kotlinlang


<<:  善用 Linux man-pages

>>:  Day 28 利用transformer自己实作一个翻译程序(十) Encoder layer

30.移转 Aras PLM大小事-结语

最後第30天,有点写不出什麽了,来说一下感想 写这个系列文主要纪录我从无到有把系统导入过程中会发生的...

Ruby幼幼班--Factorial Trailing Zeroes

9月快到了,要开始准备一些资料,凑30天用,所以除非有一篇Rails幼幼班的资料,不然不会单独分享...

Day30:【技术篇】架设网站的基本知识

一、前言   昨天发文後,马上收到系统罐头通知,终於熬到这一天了(我好兴奋啊啊啊!)终於要完成人生首...

【Day 04】String Methods

前言 今天要来介绍 string Methods,可以把 string 进行各种处理,来做出你想做的...

Get Support for Frozen FireStick Issue Dial 1877-943-5444

Customers can take the membership on a month to mo...