泛函编程(32)-泛函IO:IO Monad

时间:2022-10-31 04:50:08

    由于泛函编程非常重视函数组合(function composition),任何带有副作用(side effect)的函数都无法实现函数组合,所以必须把包含外界影响(effectful)副作用不纯代码(impure code)函数中的纯代码部分(pure code)抽离出来形成独立的另一个纯函数。我们通过代码抽离把不纯代码逐步抽离向外推并在程序里形成一个纯代码核心(pure core)。这样我们就可以顺利地在这个纯代码核心中实现函数组合。IO Monad就是泛函编程处理副作用代码的一种手段。我们先用个例子来示范副作用抽离:

1 case class Player(name: String, score: Int)
2 def printWinner(p: Player): Unit =
3 println(p.name + " is the winner!")
4 def declareWinner(p1: Player, p2: Player): Unit =
5 if (p1.score > p2.score ) printWinner(p1)
6 else printWinner(p2)

很明显,declareWinner是个包含副作用的函数。它做了两件事:先对比两个Player的分数然后打印分数较大的Player。这里打印可以说是一项带有外作用的函数,我们试着把它分离出来:

1 def printWinner(p: Player): Unit = 
2 println(p.name + " is the winner!")
3 def winner(p1: Player, p2: Player): Player =
4 if (p1.score > p2.score) p1
5 else p2
6 def declareWinner(p1: Player, p2: Player): Unit =
7 printWinner(winner(p1, p2))

我们把分数比较代码winner分离了出来。我们还可以继续分解:printWinner也可以被认为做了两件事:先合成了一条信息,然后打印信息:

1 def winnerMsg(p: Player): String =
2 p.name + " is the winner!"
3 def printWinner(p: Player): Unit =
4 println(winnerMsg(p))
5 def winner(p1: Player, p2: Player): Player =
6 if (p1.score > p2.score) p1
7 else p2
8 def declareWinner(p1: Player, p2: Player): Unit =
9 printWinner(winner(p1, p2))

这个例子看起来好像有些幼稚,但它示范了泛函编程的函数分解原理:我们并没有改变程序的功能,只是对程序代码进行了分解。把程序分解成更细的函数。实际上任何一个包含副作用的函数内都会有纯函数等待我们去分解。用一种代数关系表达就是:任何一个函数 A => B 都可以被分解成:

1、一个纯函数:A => D, 这里D只是一个功能描述表达式

2、一个带副作用的非纯函数: D => B, 它可以被视为D的解译器(interpreter),把描述解译成有副作用的指令

在泛函编程中我们会持续进行这种函数分解(factoring),把含有副作用的代码分解提取出来向外推形成一个副作用代码层。这个非纯函数形成的代码层内部则是经分解形成的纯代码核心。最后我们到达了那些表面上已经无可分解的非纯函数如:println,它的类型是String => Unit, 接下去我们应该怎么办呢?

实际上通过增加一个新的数据类型IO我们甚至可以对println进行分解:

 1 trait IO {def run: Unit }
2 def printLine(line: String) : IO = new IO {
3 def run = println(line)
4 }
5 def printWinner(p: Player): IO =
6 printLine(winnerMsg(p))
7 case class Player(name: String, score: Int)
8 def winnerMsg(p: Player): String =
9 p.name + " is the winner!"
10 def winner(p1: Player, p2: Player): Player =
11 if (p1.score > p2.score) p1
12 else p2
13 def declareWinner(p1: Player, p2: Player): Unit =
14 printWinner(winner(p1, p2))

现在函数printWinner已经变成了纯函数,它返回了一个IO值:这个IO值只是对一个副作用的描述但并没有运行它。只有这个IO类型的解译器(interpreter)在运算这个IO值时才会真正产生相应的副作用。

这里涉及到一些大的概念:编写IO程序和运算IO值是相互分离的过程(separation of concern)。我们的IO程序用一系列表达式描述了要求的IO功能,而IO interpreter实现这些功能的方式可以是多样的,包括:外设读写,文件、数据库读写及并行读取等等。如何实现这些IO功能与IO程序编写无任何关系。

现在,有了这个IO类型,我们可以放心地用函数组合的泛函编程方式围绕着这个IO类型来编写IO程序,因为我们知道通过这个IO类型我们把副作用的产生推延到IO程序之外的IO解译器里,而IO编程与解译器是两个各自独立的程序。

泛函模式的IO编程就是把IO功能表达和IO副作用产生分开设计:IO功能描述使用基于IO Monad的Monadic编程语言,充分利用函数组合进行。而产生副作用的IO实现则推延到独立的Interpreter部分。当然,Interpreter也有可能继续分解,把产生副作用代码再抽离向外推延,这样我们还可以对Interpreter的纯代码核心进行函数组合。

我们上面的简版IO类型只代表输出类型(output type)。Input类型需要一个存放输入值的变量。在泛函编程模式里变量是用类型参数代表的:

1 trait IO[+A] { self =>
2 def run: A
3 def map[B](f: A => B): IO[B] =
4 new IO[B] { def run = f(self.run)}
5 def flatMap[B](f: A => IO[B]): IO[B] =
6 new IO[B] {def run = f(self.run).run}
7 }

我们用run来对IO值进行计算。在上面我们已经实现了map和flatMap函数,所以这个IO类型就是个Monad。看下面:

1 object IO extends Monad[IO] {
2 def unit[A](a: A) = new IO[A] {def run = a}
3 def flatMap[A,B](ma: IO[A])(f: A => IO[B]) = ma flatMap f
4 def map[A,B](ma: IO[A])(f: A => B) = ma map f
5 def apply[A](a: A) = unit(a) //IO构建器,可以实现 IO {...}
6 }

既然IO类型是个Monad类型,那么我们就可以使用monadic语言编程了:

1 def ReadLine: IO[String] = IO { readLine }
2 def PrintLine(msg: String): IO[Unit] = IO { println(msg) }
3 def fahrenheitToCelsius(f: Double): Double =
4 (f -32) * 5.0 / 9.0
5 def converter: IO[Unit] = for {
6 _ <- PrintLine("Enter a temperature in degrees fahrenheit:")
7 d <- ReadLine.map(_.toDouble)
8 _ <- PrintLine(fahrenheitToCelsius(d).toString)
9 } yield ()

我们再来看看这个IO类型:IO[A] { def run: A },从类型款式来看我们只知道IO[A]类型值是个延后值,因为A值是通过调用函数run取得的。实际情况是run在运算A值时run函数里的纯代码向程序外的环境提请一些运算要求如输入(readLine),然后把结果传递到另外一些纯代码;然后这些纯代码有可能又向外提请。我们根本无法确定副作用是在那个环节产生的。如此可以确定,这个IO类型无法完整地表达IO运算。我们必需对IO类型进行重新定义:

1 trait IO[A] {def run: A}
2 case class Pure[+A](a: A) extends IO[A]
3 case class Request[Extenal[_],I,A](expr: Extenal[I], cont: I => IO[A]) extends IO[A]

这个IO类型把纯代码与副作用代码分开两种IO运算状态:IO运算可以是一个纯函数值,或者是一个外部副作用运算请求。这个External类型定义了外部副作用运算方式,它决定了我们程序能获得什么样的外部副作用运算。这个External[I]就像一个表达式,但只能用外部运算IO的程序来运算它。cont函数是个接续函数,它决定了获取External[I]运算结果后接着该做些什么。

现在我们可以明确分辨一个运算中的纯函数和副作用函数。但是我们还无法控制External类型的行为。External[I]可以代表一个简单的推延值,如下:

1 trait Runnable[A] { def run: A }
2 object Delay {
3 def apply[A](a: A) = new Runnable[A] { def run = a}
4 }
5 Delay {println("SIDE EFFECTS!!!")}

如上所示,任何副作用都可以被放入Delay。如果我们希望更好控制使用外界影响,可以把External的选项作为IO的类参数:

1 trait IO[F[_],+A] {}
2 case class Pure[F[_],+A](get: A) extends IO[F,A]
3 case class Request[F[_],I,+A](expr: F[I], cont: I => IO[F,A]) extends IO[F,A]

我们只是把External换成了F,然后把它放进IO类型参数。现在我们可以通过定义不同的F类型来获取不同的副作用,例如:

1 trait Console[A]
2 case object ReadLine extends Console[Option[String]]
3 case class PrintLine(msg: String) extends Console[Unit]

现在通过IO[Console,A]我们获得了只能对键盘显示屏进行读写的副作用。当然,我们还可以定义文件、数据库、网络读写这些IO能力的F类型。所以我们通过定义F来规范使用副作用。注意,即使在Console类型我们也无法获知副作用是否的确产生,这部分是由F类型的Interpreter在运算IO程序时才确定的。这不又是Free Monad分开独立考虑(separation of concern)的Interpreter部分嘛。这是我们可以把这部分延后分开考虑。

我们先看看如何计算这个IO类型的值:

 1 trait Run[F[_]] {
2 def apply[A](expr: F[A]): (A, Run[F])
3 }
4 object IO {
5 def run[F[_],A](R: Run[F])(io: IO[F,A]): A = io match {
6 case Pure(a) => a
7 case Request(expr, cont) => R(expr) match {
8 case (a,r2) => run(r2)(cont(a))
9 }
10 }
11 }

可以看出这个run函数是个递归算法:先计算F值然后再递归调用run运算所产生的IO值。

我们现在可以创建一个F类型的实例然后运算IO:

 1 trait Console[A]
2 case object ReadLine extends Console[Option[String]]
3 case class PrintLine(msg: String) extends Console[Unit]
4
5 object RunConsole extends Run[Console] {
6 def apply[A](c: Console[A]): (A, Run[Console]) = c match {
7 case ReadLine => {
8 val r = try Some(readLine) catch { case _ => None }
9 (r, RunConsole)
10 }
11 case PrintLine(m) => (println(m),RunConsole)
12 }
13 }
14 IO.run(RunConsole)(ioprg)

实际上这个IO类型是个Monad,因为我们可以实现它的unit和flatMap函数:

 1 trait IO[F[_],A] {
2 def unit(a: A) = Pure(a)
3 def flatMap[B](f: A => IO[F,B]): IO[F,B] = this match {
4 case Pure(a) => f(a)
5 // case Request(expr,cont) => Request(expr, cont andThen (_ flatMap f))
6 case Request(expr,cont) => Request(expr, (x: Any) => cont(x) flatMap f)
7 }
8 def map[B](f: A => B): IO[F,B] = flatMap(a => Pure(f(a)))
9 }
10 case class Pure[F[_],A](get: A) extends IO[F,A]
11 case class Request[F[_],I,A](expr: F[I], cont: I => IO[F,A]) extends IO[F,A]

 它的Monad实例如下:

1 def ioMonad[F[_]] = new Monad[({type l[x] = IO[F, x]})#l] {
2 def unit[A](a: A) = Pure(a)
3 def flatMap[A,B](fa:IO[F,A])(f: A => IO[F,B]): IO[F,B] = fa flatMap f
4 def map[A,B](fa: IO[F,A])(f: A => B): IO[F,B] = fa map f
5
6 }

我们可以把这个IO类型的运算方式再概括一点:只要F类型是个Monad,那么我们就可以运算IO值:

 1 trait IO[F[_],A] {
2 def unit(a: A) = Pure(a)
3 def flatMap[B](f: A => IO[F,B]): IO[F,B] = this match {
4 case Pure(a) => f(a)
5 // case Request(expr,cont) => Request(expr, cont andThen (_ flatMap f))
6 case Request(expr,cont) => Request(expr, (x: Any) => cont(x) flatMap f)
7 }
8 def map[B](f: A => B): IO[F,B] = flatMap(a => Pure(f(a)))
9 def runM[F[_],A](F: Monad[F])(io: IO[F,A]): F[A] = io match {
10 case Pure(a) => F.unit(a)
11 // case Request(expr, cont) => F.flatMap(expr)(cont andThen (_.runM(F)(io)))
12 case Request(expr, cont) => F.flatMap(expr)(x => cont(x).runM(F)(io))
13 }
14 }
15 case class Pure[F[_],A](get: A) extends IO[F,A]
16 case class Request[F[_],I,A](expr: F[I], cont: I => IO[F,A]) extends IO[F,A]

有了F类型的Monad实例,函数runM现在能运算IO类型的值了。

在以上的讨论过程中我们得出了这样的结论:F类型代表了IO类型的Interpreter,我们不需要知道它到底产生副作用与否或者怎样产生。我们用F类型来把副作用的使用推延到F类型的实例。我们的IO程序只是对IO算法的描述。如何运算IO值包括如何使用副作用则在运算Interpreter时才体现。

作为IO算法,首先必须注意防止的就是递归算法产生的堆栈溢出问题。运算IO值的runM是个递归算法,那我们必须保证至少它是一个尾递归算法。当然,我们前面讨论的Trampoline类型是最佳选择。我们可以比较一下IO和Trampoline类型结构:

 1 trait IO[F[_],A] {
2 def unit(a: A) = Pure(a)
3 def flatMap[B](f: A => IO[F,B]): IO[F,B] = this match {
4 case Pure(a) => f(a)
5 // case Request(expr,cont) => Request(expr, cont andThen (_ flatMap f))
6 case Request(expr,cont) => Request(expr, (x: Any) => cont(x) flatMap f)
7 }
8 def map[B](f: A => B): IO[F,B] = flatMap(a => Pure(f(a)))
9 }
10 case class Pure[F[_],A](get: A) extends IO[F,A]
11 case class Request[F[_],I,A](expr: F[I], cont: I => IO[F,A]) extends IO[F,A]
12
13 trait Trampoline[A] {
14 def unit(a: A): Trampoline[A] = Done(a)
15 def flatMap[B](f: A => Trampoline[B]): Trampoline[B] = this match {
16 case Done(a) => f(a)
17 case More(k) => k() flatMap f
18 }
19 def map[B](f: A => B): Trampoline[B] = flatMap(a => Done(f(a)))
20 }
21 case class Done[A](a: A) extends Trampoline[A]
22 case class More[A](k: () => Trampoline[A]) extends Trampoline[A]

 

它们有许多相似点。最主要的是它们都是循环递归结构,能实现以heap换stack目的。我们可以把Trampoline类型的算法引进到IO类型中,这样就可以有效防止*问题。实际上IO类型与Trampoline类型的深度抽象类型Free Monad更为相似:

 

 1 trait Free[F[_],A] {
2 private case class FlatMap[B](a: Free[F,A], f: A => Free[F,B]) extends Free[F,B]
3 def unit(a: A): Free[F,A] = Return(a)
4 def flatMap[B](f: A => Free[F,B])(implicit F: Functor[F]): Free[F,B] = this match {
5 case Return(a) => f(a)
6 case Suspend(k) => Suspend(F.map(k)(a => a flatMap f))
7 case FlatMap(b,g) => FlatMap(b, g andThen (_ flatMap f))
8 }
9 def map[B](f: A => B)(implicit F: Functor[F]): Free[F,B] = flatMap(a => Return(f(a)))
10 }
11 case class Return[F[_],A](a: A) extends Free[F,A]
12 case class Suspend[F[_],A](ffa: F[Free[F,A]]) extends Free[F,A]

Free类型的FlatMap结构和IO类型的Request结构极其相像。我们在前面的讨论中已经介绍了Free类型:首先它是一个为支持尾递归算法而设计的结构,是由一个Functor F产生的Monad。Free的功能由Monad和Interpreter两部分组成:Monad部分使我们可以使用Monadic编程语言来描述一些算法,Interpreter就是F类型,必须是个Functor,它负责描述副作用行为。只有在运算算法时才真正产生副作用。我们可以直接使用Free类型代表IO运算:用Free的Monadic编程语言来描述IO算法,用Interpreter来描述IO效果,用Free的Trampoline运算机制实现尾递归运算。现在我们先看看完整的Free类型:

 1 trait Free[F[_],A] {
2 private case class FlatMap[B](a: Free[F,A], f: A => Free[F,B]) extends Free[F,B]
3 def unit(a: A): Free[F,A] = Return(a)
4 def flatMap[B](f: A => Free[F,B])(implicit F: Functor[F]): Free[F,B] = this match {
5 case Return(a) => f(a)
6 case Suspend(k) => Suspend(F.map(k)(a => a flatMap f))
7 case FlatMap(b,g) => FlatMap(b, g andThen (_ flatMap f))
8 }
9
10 def map[B](f: A => B)(implicit F: Functor[F]): Free[F,B] = flatMap(a => Return(f(a)))
11 def resume(implicit F: Functor[F]): Either[F[Free[F,A]],A] = this match {
12 case Return(a) => Right(a)
13 case Suspend(k) => Left(k)
14 case FlatMap(a,f) => a match {
15 case Return(b) => f(b).resume
16 case Suspend(k) => Left(F.map(k)(_ flatMap f))
17 case FlatMap(b,g) => FlatMap(b, g andThen (_ flatMap f)).resume
18 }
19 }
20 def liftF(fa: F[A])(implicit F: Functor[F]): Free[F,A] =
21 Suspend(F.map(fa)(Return(_)))
22 }
23 case class Return[F[_],A](a: A) extends Free[F,A]
24 case class Suspend[F[_],A](ffa: F[Free[F,A]]) extends Free[F,A]
 

我们先用Free Monadic编程语言来描述IO算法:

 1 trait Console[A]
2 case class GetLine[A](next: A) extends Console[A]
3 case class PutLine[A](msg: String, next: A) extends Console[A]
4 implicit val consoleFunctor = new Functor[Console]{
5 def map[A,B](ca: Console[A])(f: A => B): Console[B] = ca match {
6 case GetLine(a) => GetLine(f(a))
7 case PutLine(m,a) => PutLine(m,f(a))
8 }
9 } //> consoleFunctor : ch13.ex3.Functor[ch13.ex3.Console] = ch13.ex3$$anonfun$ma
10 //| in$1$$anon$1@53e25b76
11 type ConsoleIO[A] = Free[Console,A]
12 implicit def liftConsole[A](ca: Console[A]) = Free.liftF(ca)
13 //> liftConsole: [A](ca: ch13.ex3.Console[A])ch13.ex3.Free[ch13.ex3.Console,A]
14 def putLine(msg: String) = PutLine(msg,()) //> putLine: (msg: String)ch13.ex3.PutLine[Unit]
15 def getLine = GetLine(()) //> getLine: => ch13.ex3.GetLine[Unit]
16 val ioprg:ConsoleIO[Unit] = for {
17 _ <- putLine("What is your first name ?")
18 first <- getLine
19 _ <- putLine("What is your last name ?")
20 last <- getLine
21 _ <- putLine(s"Hello, $first $last !")
22 } yield() //> ioprg : ch13.ex3.Free[ch13.ex3.Console,Unit] = Suspend(PutLine(What is you
23 //| r first name ?,Suspend(GetLine(Suspend(PutLine(What is your last name ?,Sus
24 //| pend(GetLine(Suspend(PutLine(Hello, () () !,Return(())))))))))))

现在我们用Monadic编程语言描述了一个IO程序,下一步就是运算这个IO程序从而获得它的值。如何运算IO值是Interpreter的功能。这个过程可能会产生副作用。至于如何产生副作用,产生什么样的副作用则由Interpreter程序描述。IO值运算过程就是一个由Monadic IO功能描述到IO影响产生方式Interpret语句的语言转换(interpret,翻译)。我们可以来看看这个运算函数:

 1 trait ~>[F[_],G[_]]{
2 def apply[A](fa: F[A]): G[A]
3 }
4 trait Free[F[_],A] {
5 private case class FlatMap[B](a: Free[F,A], f: A => Free[F,B]) extends Free[F,B]
6 def unit(a: A): Free[F,A] = Return(a)
7 def flatMap[B](f: A => Free[F,B])(implicit F: Functor[F]): Free[F,B] = this match {
8 case Return(a) => f(a)
9 case Suspend(k) => Suspend(F.map(k)(a => a flatMap f))
10 case FlatMap(b,g) => FlatMap(b, g andThen (_ flatMap f))
11 }
12
13 def map[B](f: A => B)(implicit F: Functor[F]): Free[F,B] = flatMap(a => Return(f(a)))
14 def resume(implicit F: Functor[F]): Either[F[Free[F,A]],A] = this match {
15 case Return(a) => Right(a)
16 case Suspend(k) => Left(k)
17 case FlatMap(a,f) => a match {
18 case Return(b) => f(b).resume
19 case Suspend(k) => Left(F.map(k)(_ flatMap f))
20 case FlatMap(b,g) => FlatMap(b, g andThen (_ flatMap f)).resume
21 }
22 }
23 def foldMap[G[_]](f: (F ~> G))(implicit F: Functor[F], G: Monad[G]): G[A] = resume match {
24 case Right(a) => G.unit(a)
25 case Left(k) => G.flatMap(f(k))(_ foldMap f)
26 }
27 }
28 case class Return[F[_],A](a: A) extends Free[F,A]
29 case class Suspend[F[_],A](ffa: F[Free[F,A]]) extends Free[F,A]
30 object Free {
31 def liftF[F[_],A](fa: F[A])(implicit F: Functor[F]): Free[F,A] =
32 Suspend(F.map(fa)(Return(_)))
33
34 }

这个foldMap就是一个IO程序运算函数。由于它是一个循环计算,所以通过resume函数引入Trampoline尾递归计算方式来保证避免*问题发生。foldMap函数将IO描述语言F解译成可能产生副作用的G语言。在解译的过程中逐步用flatMap运行非纯代码。

我们可以用Free Monad的结构替代IO类型结构,这样我们就可以用Monadic编程语言来描述IO程序。至于实际的IO副作用如何,我们只知道产生副作用的Interpret程序是个Monad,其它一无所知。

现在我们可以进入Interpreter编程了:

 1 type Id[A] = A
2 implicit val idMonad = new Monad[Id] {
3 def unit[A](a: A): A = a
4 def flatMap[A,B](fa: A)(f: A => B): B = f(fa)
5 }
6 object ConsoleEffect extends (Console ~> Id) {
7 def apply[A](c: Console[A]): A = c match {
8 case GetLine(n) => readLine ; n
9 case PutLine(m,n) => println(m); n
10 }
11 }
12 ioprg.foldMap(ConsoleEffect)

我们说过:运算IO值就是把IO程序语言逐句翻译成产生副作用的Interpreter语言这个过程。在以上例子里我们采用了Id Monad作为Interpreter语言。Id Monad的flatMap不做任何事情,所以IO程序被直接对应到基本IO函数readLine, println上了。

我们也可以把副作用变成对List进行读写:

 1 case class InOutLog(inLog: List[String], outLog: List[String])
2 case class Logger[A](runLogger: InOutLog => (A, InOutLog))
3 object MockConsole extends (Console ~> Logger) {
4 def apply[A](c: Console[A]): Logger[A] = Logger[A](
5 s => (c, s) match {
6 case (GetLine(n), InOutLog(in,out)) => (in.head, InOutLog(in.tail,out))
7 case (PutLine(l,n), InOutLog(in,out)) => ((), InOutLog(in, l :: out))
8 }
9 )
10 }
11
12 val s = ioprg.foldMap(MockConsole)
13 s.runLogger(InOutLog(List("Tiger","Chan"),Nil))

只要我们在Interpret程序里把GetLine,PutLine对应到InLog,OutLog两个List的读写。

如果我们需要采用无独占(Non-blocking)读写副作用的话可以这样改写Interpreter:

 1 object NonBlockingIO extends(Console ~> Future) {
2 def apply[A](c: Console[A]): Future[A] = c match {
3 case GetLine(n) => Future.unit {
4 try Some(readLine) catch {case _: Exception => None}
5 }
6 case PutLine(n,l) => Future.unit{
7 println(l)
8 }
9 }
10 }

从以上的讨论我们得出:IO类型可以用Free类型代替,这样我们可以充分利用Free Monad的Monadic编程语言编写IO程序,我们又可以分开考虑编写可能产生IO副作用的Interpreter。