kotlin flow介绍(1)

时间:2023-02-06 17:14:57

kotlin flow介绍(1)

问题背景

kotlin的日常开发和使用过程中,flow是一个很常见的东西,那么问题来了,flow是什么东东呢? Flow的定义:异步流,概念上讲依然是响应式流。****按顺序发出多个值的数据流,本质上就是一个生产者消费者模型,生产者发送数据给消费者进行消费。flow流的话分为以下两种: 冷流:当执行collect等末端操作符的时候(也就是有消费者的时候),生产者才开始发射数据流。 生产者与消费者是一对一的关系。当生产者发送数据的时候,对应的消费者才可以收到数据。 热流:不管有没有执行collect等末端操作符(也就是不管有没有消费者),生产者都会发射数据流到内存中。 生产者与消费者是一对多的关系。当生产者发送数据的时候,多个消费者都可以收到数据

问题分析

1、Flow的简单使用

(1)flow{...}内部可以调用suspend函数; (2)通过emit()方法来发射数据; (3)通过collect()方法来收集结果。简单使用代码如下:

fun main() {
    testFlowNormal1()
}

fun testFlowNormal1() {
    runBlocking {
        val flow = flow {
            delay(1000)
            emit(1)
            delay(1000)
            emit(2)
        }
        flow.collect {
            println("collect$it")
        }
        println("collect ok")
    }
}

运行结果如下: kotlin flow介绍(1)

2、flow创建的常用方式

(1)flow{...}中通过emit()方法来发送数据; (2)flowOf()一个发射固定值集的流; (3)asFlow()拓展函数,可能将集合和序列转换成流。 示例代码如下:

fun main() {
    testFlowNormal1()
    testFlowNormal2()
    testFlowNormal3()
}

fun testFlowNormal1() {
    runBlocking {
        // 1、flow{...}方式
        val flow = flow {
            delay(1000)
            emit(1)
            delay(1000)
            emit(2)
        }
        flow.collect {
            println("collect$it")
        }
        println("collect testFlowNormal1 ok")
    }
}

fun testFlowNormal2() {
    runBlocking {
        // 2、flowOf方式
        val flow = flowOf(2, 3).onEach { delay(1000) }
        flow.collect {
            println("collect$it")
        }
        println("collect testFlowNormal2 ok")
    }
}

fun testFlowNormal3() {
    runBlocking {
        // 3、asFlow方式
        val flow = listOf(4, 5).asFlow().onEach { delay(1000) }
        flow.collect {
            println("collect$it")
        }
        println("collect testFlowNormal3 ok")
    }
}

运行结果如下: kotlin flow介绍(1)

3、Flow是冷流

调用末端流操作符( collect 是其中之一)之前,flow{ ... } 中的代码不会执行。代码如下:

fun main() {
    testFlowCold()
}

fun testFlowCold() {
    runBlocking {
        // 1、flow{...}方式
        val flow = flow {
            delay(1000)
            emit(1)
            delay(1000)
            emit(2)
        }
        println("calling collect first...")
        flow.collect {
            println("collect$it")
        }
        println("calling collect second...")
        flow.collect {
            println("collect$it")
        }
    }
}

运行结果如下: kotlin flow介绍(1)

4、末端流操作符

在流上用于启动流收集的挂起函数。 collect 是最基础的末端操作符。collect /reduce /fold/toList 等都是末端操作符,示例代码如下:

fun main() {
    testFlowFinalFun()
}

fun testFlowFinalFun() {
    runBlocking {
        // 1、flow{...}方式
        val flow = flow {
            delay(1000)
            emit(1)
            delay(1000)
            emit(2)
        }
        // collect方法
        flow.collect {
            println("collect$it")
        }

        // reduce方式
        val reduceSum = flow.reduce { a, b -> a + b }
        println("reduceSum: $reduceSum")

        // fold方式
        val foldNum = flow.fold(100) {
            a, b -> a + b
        }
        println("foldNum $foldNum")

    }
}

运行结果如下: kotlin flow介绍(1)

5、过渡操作符

过渡操作符应用于上游流,并返回下游流。这些操作符也是冷操作符,也就是说如果没有 '被订阅'就不会执行。onStart/catch/onCompletion/map/filter 等都是过渡操作符。示例代码如下:

fun main() {
    testFlowNormal()
}

fun testFlowNormal() {
    GlobalScope.launch {
        flow {
            emit("good")
        }.flowOn(Dispatchers.IO) // 切换线程
            .onStart {
                println("onStart")
            }.catch {
                println("catch:${it.message}") // 有异常会进入此方法
            }.onCompletion {
                println("oniComplete:${it?.message}") // 无论是否有异常都会执行
            }.collect { // 收集流
                println("result = $it")
            }
    }
    Thread.sleep(6000)
}

运行结果如下: kotlin flow介绍(1)

6、和LiveData的区别

flow是kotlin中类似rxJava中flowable的响应流。同样是观察者模式,和livedata的区别在于 1、livedata是生命周期感知的,在整个mvvm架构中适合使用在view层和viewmodel层之间交互,而flow是没有生命周期感知,适合用在model层和viewmodel之间 2、livedata无法处理背压问题,只能显示最新数据,flow和flowable类似可以处理这类问题 3、livedata都放在主线程处理数据,对于线程控制不太理想,flow可以结合协程实现线程切换

7、flow 的背压

什么是背压? 背压的概念:以自然界的水流为例,当上游的流速大于下游的流速,日积月累,最终导致大坝溢出,此种现象称为背压的出现,对应到Kotlin里的Flow,也有上游(生产者)、下游(消费者)的概念,背压的场景是一样的。 1、如何处理背压? 先来模拟一个生产者消费者速度不一致的场景,代码如下:

fun main() {
    runBlocking {
        testFlowNormal()
    }
}

suspend fun testFlowNormal() {
    val flow = flow {
        (1..3).forEach {
            delay(1000)
            println("emit $it")
            emit(it)
        }
    }

    val time = measureTimeMillis {
        flow.collect {
            delay(2000)
            println("collect:$it")
        }
    }
    println("use time:${time} ms")
}

运行结果如下: kotlin flow介绍(1) 生产者的速度比消费者的速度快,生产者必须等待消费者消费完毕后才会进行下一次生产。 因此,整个流的耗时=生产者耗时(3 * 1000ms)+消费者耗时(3 * 2000ms)=9s。 显而易见,消费者影响了生产者的速度,这种情况下该怎么优化呢? 2、buffer的使用 代码如下:

fun main() {
    runBlocking {
        testFlowNormal()
    }
}

suspend fun testFlowNormal() {
    val flow = flow {
        (1..3).forEach {
            delay(1000)
            println("emit $it")
            emit(it)
        }
    }.buffer(5) // 使用buffer

    val time = measureTimeMillis {
        flow.collect {
            delay(2000)
            println("collect:$it")
        }
    }
    println("use time:${time} ms")
}

运行结果如下: kotlin flow介绍(1) 对比没有buffer情况,大概节省了2秒左右。

问题总结

本文主要介绍了flow相关的部分概念,包括什么是flow、如何创建flow、flow的简单使用介绍,以及背压等概念,有兴趣的同学可以进一步深入研究。