泛函编程(35)-泛函Stream IO:IO处理过程-IO Process

时间:2022-10-31 04:50:02

    IO处理可以说是计算机技术的核心。不是吗?使用计算机的目的就是希望它对输入数据进行运算后向我们输出计算结果。所谓Stream IO简单来说就是对一串按序相同类型的输入数据进行处理后输出计算结果。输入数据源可能是一串键盘字符、鼠标位置坐标、文件字符行、数据库纪录等。如何实现泛函模式的Stream IO处理则是泛函编程不可或缺的技术。

首先,我们先看一段较熟悉的IO程序:

 1 import java.io._
2 def linesGt4k(fileName: String): IO[Boolean] = IO {
3 val src = io.Source.fromFile(fileName)
4 try {
5 var count = 0
6 val lines: Iterator[String] = src.getLines
7 while (count <= 4000 && lines.hasNext) {
8 lines.next
9 count += 1
10 }
11 count > 4000
12 } finally src.close
13 } //> linesGt4k: (fileName: String)fpinscala.iomonad.IO[Boolean]

 

以上例子里有几项是值得提倡的:使用完文件后及时关闭,防止资源流露、没有一次性将整个文件载入内存而是逐行读取文件内容,节省内存资源。虽然整个过程是包嵌在IO类型内,但操作代码直接产生副作用。很明显,起码IO处理过程是由非纯代码组成的,无法实现函数组合,既是无法实现泛函编程的通过重复使用组件灵活组合功能的特点了。可以相像,我们在泛函Stream IO编程中将会通过许多细小组件的各式组合来实现多样性的IO计算功能。

实际上我们想使用以下模式的表达式:

 1 object examples {
2 //假设我们已经获取了这个Stream[String]
3 val lines: Stream[String] = sys.error("defined elsewhere!")
4 //无论40k或者其它数量都很容易得取。只要换个数字就行了
5 val lgt40k = lines.zipWithIndex.exists(_._2 + 1 >= 40000)
6 //把空行过滤掉
7 val lgt40k2 = lines.filter(! _.trim.isEmpty).zipWithIndex.exists(_._2 + 1 >= 40000)
8 //在40k行内检查是否存在连续11行第一个字母组合为abracadabra
9 val lgt40k3 = lines.take(40000).map(_.head).indexOfSlice("abracadabra".toList)
10 }

以上代码充分显示了我们所追求的泛函编程模式:简洁、灵活、优雅。

不过,这个Stream[String]就不是表面上那么容易得到的了。我们先把它放一放。

我们现在可以先分析一下泛函Stream IO编程原理。泛函编程的精髓就是把一个程序分解成许多纯代码组件,然后通过各种搭配组合来实现程序整体功能。那么对于Stream IO计算我们希望能先实现那些纯代码的基本组件然后把它们组合起来。我们可以把Stream IO处理过程想象成连成一串的电视信号处理盒子:每个盒子具备一项信号转换或者过滤功能,我们将一个盒子的输出端子接入另一个盒子的输入端子使信号可以经过一连串的处理过程最终输出我们要求的信号。我们可以用一个IO处理过程代表一个信号转换盒子。它的款式是这样的;Process[I,O]。最终的IO程序就是一连串Process[I,O]。当然,第一个Process[I,O]的输入端必须连接一个Stream,而最后一个则接在一个实体的设备。我们先不管这两头,先从Process[I,O]的功能着手,使其能够连成一串并把输入类型I转变成输出类型O。

Process[I,O]的类型款式如下:

1 trait Process[I,O]{}
2 case class Halt[I,O]() extends Process[I,O]
3 case class Emit[I,O](head: O, tail: Process[I,O] = Halt[I,O]()) extends Process[I,O]
4 case class Await[I,O](rcvfn: Option[I] => Process[I,O]) extends Process[I,O]

每个Process[I,O]都可能处于三种状态之一:

1、Halt() 停止处理IO,退出。

2、Emit(head: O,tail: Process[I,O] = Halt[I,O]()) 输出类型O元素head,进入下一状态tail,默认输出head后完成退出。

3、Await(rcvfn: Option[I] => Process[I,O]) 等待一个类型I元素输入,处理IO,返回Process类型结果

可以看出,Await状态代表了某个Process的功能。Emit只是输出该Process对IO处理的结果。

注意:虽然Process[I,O]的功能是把Stream[I]转变成Stream[O],但它绝不是Stream[I] => Stream[O]类型的函数,而是在以上三种状态中游走的状态机器(State Machine)。

以下代码例子可以作为示范:

 1 trait Process[I,O] {
2 def apply(sin: Stream[I]): Stream[O] = this match {
3 case Halt() => Stream() //返回空的Stream
4 case Emit(out,next) => out #:: next(sin) //先输出out,跟着处理剔除out的Stream[I]输入
5 case Await(iproc) => sin match {
6 case h #:: t_stream => iproc(Some(h))(t_stream) //如果sin不为空,接受输入首元素后返回状态为处理剔除首元素的Stream[I]输入
7 case xs => iproc(None)(xs) //如果sin为空则返回处理空输入状态
8 }
9 }
10 }

按照讨论题目,以上例子中Stream[I]被转变成Stream[O],而实现方式则是按照具体状态来确定输出。

为了实现函数组合(functional composition),我们必须想办法把两个Process像接水管一样连接起来:一头的输出是另一头的输入(function fusion):

 1  def |>[O2](p2: Process[O,O2]): Process[I,O2] =  //p2的输入类型是this的输出O,最终输出为p2的输出O2
2 p2 match {
3 case Halt() => Halt() //下面的动作停了,整个管道都停了
4 case Emit(out,next) => Emit(out, this |> next) //如果正在输出就先输出然后再连接剩下的数据
5 case Await(iproc) => this match { //如果下游正在等待输入元素,那么就要看上游是什么情况了
6 case Halt() => Halt() //如果上游停顿那么整个管道都停
7 case Emit(out,next) => next |> iproc(Some(out)) //上游正在输出,下游收到后进入新状态
8 case Await(rcvfn) => Await((oi: Option[I]) => rcvfn(oi) |> p2) //假如上游收到输入元素,立即转入新状态再继续连接
9 }
10 }

以上程序并不难理解。现在我们可以这样编写IO处理语句:proc1 |> proc2 |> proc3。

另外,可以把两个Process的处理过程连接起来:一个Process处理完后接着处理另一个Process:

1   def ++(p2: Process[I,O]): Process[I,O] = //完成了this后接着再运算p2
2 this match {
3 case Halt() => p2 //上一个Process完成后接着运算p2
4 case Emit(out,next) => Emit(out, next ++ p2) //等上游完成所有输出后再运算p2
5 case Await(iproc) => Await(iproc andThen (_ ++ p2)) //等上游处理完输入后再运算p2
6 }

最基本的一些组件map,flatMap:

 1  def map[O2](f: O => O2): Process[I,O2] = //map Process的输出O
2 this match {
3 case Halt() => Halt() //没什么可以map的
4 case Emit(out,next) => Emit(f(out),next map f) //先map输入元素,再处理剩下的
5 case Await(iproc) => Await(iproc andThen (_ map f)) //处理完输入元素后再进行map
6 }
7 def flatMap[O2](f: O => Process[I,O2]): Process[I,O2] = //只处理输出端O
8 this match {
9 case Halt() => Halt()
10 case Emit(out,next) => f(out) ++ next.flatMap(f) //先处理头元素再flatMap剩下的
11 case Await(iproc) => Await(iproc andThen (_ flatMap f)) //处理完输入后再flatMap剩下的
12 }

我们再试试把一串元素喂入Process:

 1   def feed(ins: Seq[I]): Process[I,O] = {
2 @annotation.tailrec
3 def go(ins: Seq[I], curProcess: Process[I,O]): Process[I,O] = //尾递归算法
4 curProcess match {
5 case Halt() => Halt()
6 case Emit(out,next) => Emit(out, next.feed(ins)) //正在输出。就等完成输出后再喂剩下的
7 case Await(iproc) => {
8 if (ins.isEmpty) curProcess //完成了输入元素串,可以返回结果了
9 else go(ins.tail,iproc(Some(ins.head))) //吃下首元素然后再继续
10 }
11 }
12 go(ins,this)
13 }

有时我们希望能重复一些简单的过程:

 1   def repeat: Process[I,O] = { //永远重复下去
2 def go(p: Process[I,O]): Process[I,O] = //p代表当前更新状态
3 p match {
4 case Halt() => go(this) //不要停,重新再来
5 case Emit(out,next) => Emit(out, go(next)) //完成输出后继续go
6 case Await(iproc) => Await { //注意{}里是partialfunction。iproc是个函数,而partialfunction是function的子类,因而可以这样写
7 case None => iproc(None) //没有输入元素,继续等
8 case Some(i) => go(iproc(Some(i))) //处理输入元素后转入新状态然后继续
9 }
10 }
11 go(this)
12 }
13 def repeatN(n: Int): Process[I,O] = { //重复n次
14 def go(n: Int, curProcess: Process[I,O]): Process[I,O] =
15 curProcess match {
16 case Halt() => if (n <= 0) Halt() //n次后真停
17 else go(n-1, curProcess) //算一次重复
18 case Emit(out,next) => Emit(out, go(n,next)) //虽然状态更新了,但未完成流程。还不算一次重复
19 case Await(iproc) => Await {
20 case None => iproc(None) //继续等
21 case Some(i) => go(n,iproc(Some(i))) //更新了状态,但未完成流程,不算一次重复
22 }
23 }
24 go(n,this)
25 }

注意我们在以上代码中使用了PartialFunction来分解输入参数值。如果我们有个Function : intFunction(i: Int): String

我们可以定义它的PartialFunction:

{ case 0: "Zero"

   case 10: "Ten" }

由于Await(iproc)中的iproc >>> Option[I] => Process[I,O], PartialFunction是Function的子类所以我们可以写成:

Await {

 case None => ???

 case Some(i) => ???

}

下面是一组Process的基本方法和组件:

 1 object Process {
2 case class Halt[I,O]() extends Process[I,O]
3 case class Emit[I,O](head: O, tail: Process[I,O] = Halt[I,O]()) extends Process[I,O]
4 case class Await[I,O](rcvfn: Option[I] => Process[I,O]) extends Process[I,O]
5
6 def emit[I,O](out: O, next: Process[I,O] = Halt[I,O]()) = Emit(out, next)
7 def await[I,O](iproc: I => Process[I,O], fallback: Process[I,O] = Halt[I,O]): Process[I,O] =
8 Await {
9 case Some(i) => iproc(i) //使用基本类型I
10 case None => fallback //定义了没有输入元素时应该怎么处理
11 }
12 }

 我们可以把任何 I => O类型的函数升格成Process[I,O]:

 1   def liftOnce[I,O](f: I => O): Process[I,O] =  //给我一个I=>O,我返回Process[I,O]
2 Await {
3 case Some(i) => emit(f(i)) //等到一个输入元素I。把它升成一个状态为输出的Process
4 case None => Halt()
5 }
6 def repeatLift[I,O](f: I => O): Process[I,O] = liftOnce(f).repeat
7 def lift[I,O](f: I => O): Process[I,O] = //不同实现方式的repeatLift
8 Await {
9 case Some(i) => emit(f(i), lift(f))
10 case None => Halt()
11 }

还有些组件可以对输入元素进行过滤的:

 1  def filter[I](f: I => Boolean): Process[I,I] = //对输入I进行过滤,不转变I, 所以结果是: Process[I,I]
2 Await[I,I] { //用PartialFunction来分解两种输入参数值面对的情况
3 case None => Halt[I,I]() //没有输入,停止
4 case Some(i) if(f(i)) => Emit[I,I](i)
5 }.repeat //重复过滤所有输入元素
6 def take[I](n: Int): Process[I,I] = //可以中途退出
7 if (n <= 0) Halt[I,I]()
8 else Await[I,I] { //进行输入、输出这种IO操作
9 case None => Halt[I,I]() //没有输入就完成退出
10 case Some(i) => Emit[I,I](i,take[I](n-1)) //输出通过过滤的,继续过滤剩下的输入元素
11 }
12 def takeWhile[I](f: I => Boolean): Process[I,I] = //可以中途退出
13 Await[I,I] {
14 case None => Halt[I,I]() //没有输入就完成退出
15 case Some(i) if(f(i)) => Emit[I,I](i, takeWhile[I](f))
16 }
17 def sendAsIs[I]: Process[I,I] = lift(identity) //直接输出任何输入元素
18 def drop[I](n: Int): Process[I,I] = //必须浏览所有输入元素。不可中途退出
19 if (n <= 0) sendAsIs[I]
20 else Await[I,I](i => drop[I](n-1)) //收取输入元素,直接扔掉,继续n-1循环
21 def dropWhile[I](f: I => Boolean): Process[I,I] = //必须浏览所有输入元素。不可中途退出
22 await(i => if (f(i)) dropWhile[I](f) //注意用await, 不是Await
23 else emit(i, sendAsIs[I])) //
输出这个元素后继续循环输入元素

注意以上代码中的处理方式:如果过滤通过才emit,原封不动直接传递输入元素 I => I 用lift(identity)产生Process[I,I],用PartialFunction:

 Await {

  case None => ???

  case Some(i) => 

}

来分别处理可能出现的输入参数值。

我们先尝试些简单的算法:

 1    def count[I]: Process[I,Int] =  //读取输入元素次数
2 //读入任何东西都转成数字1.0 |> 读一个加一个 |> 读入一个就转成一个Int
3 lift((i: I) => 1.0 ) |> sum |> lift(_.toInt) //每一个输入元素都会走完整个管道
4 def count2[I]: Process[I,Int] = { //递归实现方式
5 def go(c: Int): Process[I,Int] =
6 await((i: I) => emit(c+1, go(c+1)))
7 go(0)
8 }
9 def mean: Process[Double,Double] = {
10 def go(s: Double, c: Double): Process[Double,Double] =
11 await((d: Double) => emit((s+d)/(c+1), go(s+d,c+1)))
12 go(0.0,0.0)
13 }
14 //以上的内部函数go都体现了一些共同点:有一个起始值,然后维护状态。我们可以分解出一个新的函数
15 def loop[S,I,O](z: S)(f: (I,S) => (O,S)): Process[I,O] =
16 await((i: I) => f(i,z) match {
17 case (o,s2) => emit(o, loop(s2)(f))
18 })
19 //用loop来实现上面的函数
20 def sum2: Process[Double,Double] =
21 loop(0.0)((i:Double,s) => (s+i,s+1))
22 def count3[I]: Process[I,Int] =
23 loop(0)((_: I, c) => (c+1, c+1))

再写一些逻辑小组件:

1    def any: Process[Boolean, Boolean] =  //检查是否收到过true值。即使收到true还是会继续收取输入直至完成读取
2 loop(false)((b: Boolean, s) => ( b || s, b || s))
3 def exists[I](f: I => Boolean): Process[I,Boolean] = //不能中途退出
4 lift(f) |> any //重复检查输入然后确定是否true. 一旦遇到true永远返回true
5 def echo[I]: Process[I,I] = await(i => emit(i))
6 def skip[I,O]: Process[I,O] = await(i => Halt())
7 def ignore[I,O]: Process[I,O] = skip.repeat

也可以过滤输出元素:

1  def filter(f: O => Boolean): Process[I,O] = //过滤输出元素
2 this |> Process.filter(f) //this的输出接到下一个Process的输入端然后过滤它的输入元素

zip两个Process:

 1    def feedOne[I,O](oi: Option[I])(p: Process[I,O]): Process[I,O] = //把一个元素输入p
2 p match {
3 case Halt() => p //无法输入,它还是它
4 case Emit(out,next) => Emit(out, feedOne(oi)(next)) //正在输出。输出完当前元素再开始喂入
5 case Await(iproc) => iproc(oi) //直接喂入
6 }
7
8 def zip[I,O,O2](p1: Process[I,O], p2: Process[I,O2]): Process[I,(O,O2)] = //同一串输入元素同时喂入p1,p2。合并输出2tuple
9 (p1,p2) match {
10 case (Halt(), _) => Halt()
11 case (_, Halt()) => Halt()
12 case (Emit(h1,t1), Emit(h2,t2)) => Emit((h1,h2), zip(t1,t2))
13 case (Await(iproc), _) => Await((oi: Option[I]) => zip(iproc(oi), feedOne(oi)(p2)))
14 case (_, Await(iproc)) => Await((oi: Option[I]) => zip(feedOne(oi)(p1), iproc(oi)))
15 }
16 val mean2 = zip[Double,Double,Int](sum,count) |> lift {case (s,c) => s/c}

还有那个熟悉的zipWithIndex:

1    def zip[O2](p2: Process[I,O2]): Process[I,(O,O2)] =
2 Process.zip(this,p2)
3 def zipWithIndex: Process[I,(O,Int)] =
4 this zip (count map {_ + 1}) //zip从0开始

现在我们肯定可以使用这样的表达式:

count |> exists {_ > 40000}。

当然我们还没有开始讨论这个管道两头的数据源。因为我们要分开独立讨论它。

下面是以上示范代码汇总:

 1 trait Process[I,O] {
2 import Process._
3 def apply(sin: Stream[I]): Stream[O] = this match {
4 case Halt() => Stream() //返回空的Stream
5 case Emit(out,next) => out #:: next(sin) //先输出out,跟着处理剔除out的Stream[I]输入
6 case Await(iproc) => sin match {
7 case h #:: t_stream => iproc(Some(h))(t_stream) //如果sin不为空,接受输入首元素后返回状态为处理剔除首元素的Stream[I]输入
8 case xs => iproc(None)(xs) //如果sin为空则返回处理空输入状态
9 }
10 }
11 def |>[O2](p2: Process[O,O2]): Process[I,O2] = //p2的输入类型是this的输出O,最终输出为p2的输出O2
12 p2 match {
13 case Halt() => Halt() //下面的动作停了,整个管道都停了
14 case Emit(out,next) => Emit(out, this |> next) //如果正在输出就先输出然后再连接剩下的数据
15 case Await(iproc) => this match { //如果下游正在等待输入元素,那么就要看上游是什么情况了
16 case Halt() => Halt() //如果上游停顿那么整个管道都停
17 case Emit(out,next) => next |> iproc(Some(out)) //上游正在输出,下游收到后进入新状态
18 case Await(rcvfn) => Await((oi: Option[I]) => rcvfn(oi) |> p2) //假如上游收到输入元素,立即转入新状态再继续连接
19 }
20 }
21 def ++(p2: Process[I,O]): Process[I,O] = //完成了this后接着再运算p2
22 this match {
23 case Halt() => p2 //上一个Process完成后接着运算p2
24 case Emit(out,next) => Emit(out, next ++ p2) //等上游完成所有输出后再运算p2
25 case Await(iproc) => Await(iproc andThen (_ ++ p2)) //等上游处理完输入后再运算p2
26 }
27 def map[O2](f: O => O2): Process[I,O2] = //map Process的输出O
28 this match {
29 case Halt() => Halt() //没什么可以map的
30 case Emit(out,next) => Emit(f(out),next map f) //先map输入元素,再处理剩下的
31 case Await(iproc) => Await(iproc andThen (_ map f)) //处理完输入元素后再进行map
32 }
33 def flatMap[O2](f: O => Process[I,O2]): Process[I,O2] = //只处理输出端O
34 this match {
35 case Halt() => Halt()
36 case Emit(out,next) => f(out) ++ next.flatMap(f) //先处理头元素再flatMap剩下的
37 case Await(iproc) => Await(iproc andThen (_ flatMap f)) //处理完输入后再flatMap剩下的
38 }
39 def feed(ins: Seq[I]): Process[I,O] = {
40 @annotation.tailrec
41 def go(ins: Seq[I], curProcess: Process[I,O]): Process[I,O] = //尾递归算法
42 curProcess match {
43 case Halt() => Halt()
44 case Emit(out,next) => Emit(out, next.feed(ins)) //正在输出。就等完成输出后再喂剩下的
45 case Await(iproc) => {
46 if (ins.isEmpty) curProcess //完成了输入元素串,可以返回结果了
47 else go(ins.tail,iproc(Some(ins.head))) //吃下首元素然后再继续
48 }
49 }
50 go(ins,this)
51 }
52 def repeat: Process[I,O] = { //永远重复下去
53 def go(p: Process[I,O]): Process[I,O] = //p代表当前更新状态
54 p match {
55 case Halt() => go(this) //不要停,重新再来
56 case Emit(out,next) => Emit(out, go(next)) //完成输出后继续go
57 case Await(iproc) => Await { //注意{}里是partialfunction。iproc是个函数,而partialfunction是function的子类,因而可以这样写
58 case None => iproc(None) //没有输入元素,继续等
59 case Some(i) => go(iproc(Some(i))) //处理输入元素后转入新状态然后继续
60 }
61 }
62 go(this)
63 }
64 def repeatN(n: Int): Process[I,O] = { //重复n次
65 def go(n: Int, curProcess: Process[I,O]): Process[I,O] =
66 curProcess match {
67 case Halt() => if (n <= 0) Halt() //n次后真停
68 else go(n-1, curProcess) //算一次重复
69 case Emit(out,next) => Emit(out, go(n,next)) //虽然状态更新了,但未完成流程。还不算一次重复
70 case Await(iproc) => Await {
71 case None => iproc(None) //继续等
72 case Some(i) => go(n,iproc(Some(i))) //更新了状态,但未完成流程,不算一次重复
73 }
74 }
75 go(n,this)
76 }
77 def filter(f: O => Boolean): Process[I,O] = //过滤输出元素
78 this |> Process.filter(f) //this的输出接到下一个Process的输入端然后过滤它的输入元素
79 def orElse(p: Process[I,O]): Process[I,O] =
80 this match {
81 case Halt() => p
82 case Await(iproc) => Await {
83 case None => p
84 case x => iproc(x)
85 }
86 case _ => this
87 }
88 def zip[O2](p2: Process[I,O2]): Process[I,(O,O2)] =
89 Process.zip(this,p2)
90 def zipWithIndex: Process[I,(O,Int)] =
91 this zip (count map {_ + 1}) //zip从0开始
92 }
93 object Process {
94 case class Halt[I,O]() extends Process[I,O]
95 case class Emit[I,O](head: O, tail: Process[I,O] = Halt[I,O]()) extends Process[I,O]
96 case class Await[I,O](rcvfn: Option[I] => Process[I,O]) extends Process[I,O]
97
98 def emit[I,O](out: O, next: Process[I,O] = Halt[I,O]()) = Emit(out, next)
99 def await[I,O](iproc: I => Process[I,O], fallback: Process[I,O] = Halt[I,O]): Process[I,O] =
100 Await {
101 case Some(i) => iproc(i) //使用基本类型I
102 case None => fallback //定义了没有输入元素时应该怎么处理
103 }
104 def liftOnce[I,O](f: I => O): Process[I,O] = //给我一个I=>O,我返回Process[I,O]
105 Await {
106 case Some(i) => emit(f(i)) //等到一个输入元素I。把它升成一个状态为输出的Process
107 case None => Halt()
108 }
109 def repeatLift[I,O](f: I => O): Process[I,O] = liftOnce(f).repeat
110 def lift[I,O](f: I => O): Process[I,O] = //不同实现方式的repeatLift
111 Await {
112 case Some(i) => emit(f(i), lift(f))
113 case None => Halt()
114 }
115 def filter[I](f: I => Boolean): Process[I,I] = //对输入I进行过滤,不转变I, 所以结果是: Process[I,I]
116 Await[I,I] { //用PartialFunction来分解两种输入参数值面对的情况
117 case None => Halt[I,I]() //没有输入,停止
118 case Some(i) if(f(i)) => Emit[I,I](i)
119 }.repeat //重复过滤所有输入元素
120 def take[I](n: Int): Process[I,I] = //可以中途退出
121 if (n <= 0) Halt[I,I]()
122 else Await[I,I] { //进行输入、输出这种IO操作
123 case None => Halt[I,I]() //没有输入就完成退出
124 case Some(i) => Emit[I,I](i,take[I](n-1)) //输出通过过滤的,继续过滤剩下的输入元素
125 }
126 def takeWhile[I](f: I => Boolean): Process[I,I] = //可以中途退出
127 Await[I,I] {
128 case None => Halt[I,I]() //没有输入就完成退出
129 case Some(i) if(f(i)) => Emit[I,I](i, takeWhile[I](f))
130 }
131 def sendAsIs[I]: Process[I,I] = lift(identity) //直接输出任何输入元素
132 def drop[I](n: Int): Process[I,I] = //必须浏览所有输入元素。不可中途退出
133 if (n <= 0) sendAsIs[I]
134 else Await[I,I](i => drop[I](n-1)) //收取输入元素,直接扔掉,继续n-1循环
135 def dropWhile[I](f: I => Boolean): Process[I,I] = //必须浏览所有输入元素。不可中途退出
136 await(i => if (f(i)) dropWhile[I](f) //注意用await, 不是Await
137 else emit(i, sendAsIs[I])) //输出这个元素后继续循环输入元素
138
139 def sum: Process[Double,Double] = { //读进数字,输出当前总数
140 def go(acc: Double): Process[Double,Double] =
141 await(d => emit(acc+d, go(acc+d)))
142 go(0.0)
143 }
144 def count[I]: Process[I,Int] = //读取输入元素次数
145 //读入任何东西都转成数字1.0 |> 读一个加一个 |> 读入一个就转成一个Int
146 lift((i: I) => 1.0 ) |> sum |> lift(_.toInt) //每一个输入元素都会走完整个管道
147 def count2[I]: Process[I,Int] = { //递归实现方式
148 def go(c: Int): Process[I,Int] =
149 await((i: I) => emit(c+1, go(c+1)))
150 go(0)
151 }
152 def mean: Process[Double,Double] = {
153 def go(s: Double, c: Double): Process[Double,Double] =
154 await((d: Double) => emit((s+d)/(c+1), go(s+d,c+1)))
155 go(0.0,0.0)
156 }
157 //以上的内部函数go都体现了一些共同点:有一个起始值,然后维护状态。我们可以分解出一个新的函数
158 def loop[S,I,O](z: S)(f: (I,S) => (O,S)): Process[I,O] =
159 await((i: I) => f(i,z) match {
160 case (o,s2) => emit(o, loop(s2)(f))
161 })
162 //用loop来实现上面的函数
163 def sum2: Process[Double,Double] =
164 loop(0.0)((i:Double,s) => (s+i,s+1))
165 def count3[I]: Process[I,Int] =
166 loop(0)((_: I, c) => (c+1, c+1))
167 def any: Process[Boolean, Boolean] = //检查是否收到过true值。即使收到true还是会继续收取输入直至完成读取
168 loop(false)((b: Boolean, s) => ( b || s, b || s))
169 def exists[I](f: I => Boolean): Process[I,Boolean] = //不能中途退出
170 lift(f) |> any //重复检查输入然后确定是否true. 一旦遇到true永远返回true
171 def echo[I]: Process[I,I] = await(i => emit(i))
172 def skip[I,O]: Process[I,O] = await(i => Halt())
173 def ignore[I,O]: Process[I,O] = skip.repeat
174
175 def feedOne[I,O](oi: Option[I])(p: Process[I,O]): Process[I,O] = //把一个元素输入p
176 p match {
177 case Halt() => p //无法输入,它还是它
178 case Emit(out,next) => Emit(out, feedOne(oi)(next)) //正在输出。输出完当前元素再开始喂入
179 case Await(iproc) => iproc(oi) //直接喂入
180 }
181
182 def zip[I,O,O2](p1: Process[I,O], p2: Process[I,O2]): Process[I,(O,O2)] = //同一串输入元素同时喂入p1,p2。合并输出2tuple
183 (p1,p2) match {
184 case (Halt(), _) => Halt()
185 case (_, Halt()) => Halt()
186 case (Emit(h1,t1), Emit(h2,t2)) => Emit((h1,h2), zip(t1,t2))
187 case (Await(iproc), _) => Await((oi: Option[I]) => zip(iproc(oi), feedOne(oi)(p2)))
188 case (_, Await(iproc)) => Await((oi: Option[I]) => zip(feedOne(oi)(p1), iproc(oi)))
189 }
190 val mean2 = zip[Double,Double,Int](sum,count) |> lift {case (s,c) => s/c}
191
192 count |> exists {_ > 40000}
193 }