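This is the last lecture of week 6 of "Principles of Reactive Programming".
Why persist an actor's state?
If an actor has no state, it behaves the same way at any point in time. A stateful actor's behavior, however, depends on its current state. So when the system goes down unexpectedly, recovering the system means recovering the state of its actors.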
Actors representing a stateful resource
- shall not lose important state due to (system) failure
- must persist state as needed
- must recover state at (re)start
How can an actor's state be recorded?
There are two ways to persist an actor's state.
Two possibilities for persisting state:
- in-place updates
- persist changes in append-only fashion
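The first approach
The first is to have the actor mirror a persistent storage location and do in-place updates of both. So when the actor's state changes, the persistent location is also updated. This persistent location could be a file in the file system, or it could be a record in a relational database.
The first approach persists the actor's current state directly, so the stored copy mirrors the actor's state. Whenever the actor's state is updated, the mirror is updated too. For example, take an actor that processes Kafka messages: its state is which offset of which partition of which topic it has processed up to. It can record this state in ZooKeeper and update it after each message is processed. This is what Storm's Kafka spout does, except that the spout is not an actor.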
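The second approach
The other way is not to persist the state itself and update it, but to persist the changes which are applied to the state. This is done in an append-only fashion, meaning that these change records will never be deleted; they will only be added to. The current state can then be derived by reapplying all changes from the beginning.
The second approach does not store the current state in place of the previous one; instead it records the changes to the state, that is, the action that takes state A to state B. On recovery, replaying every recorded change reproduces the state to be restored. For example, suppose an actor is a counter. With the first approach, after each message it updates the counter value stored in a database or file. With the second approach, after each message it records that the counter was incremented by 1.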
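The counter example can be sketched in plain Scala as follows. This is only an illustration under simplifying assumptions: `InPlaceStore`, `ChangeLog`, `Incremented` and `replay` are hypothetical names, and in-memory collections stand in for a real database or file.

```scala
object CounterPersistence {
  // Strategy 1: in-place update. The store holds one cell that mirrors the state.
  final class InPlaceStore {
    private var value: Int = 0
    def write(v: Int): Unit = { value = v } // overwrites the previous value
    def read: Int = value                   // recovery is a single read
  }

  // Strategy 2: append-only. The store keeps every change ever applied.
  sealed trait Change
  case object Incremented extends Change

  final class ChangeLog {
    private var entries: Vector[Change] = Vector.empty
    def append(c: Change): Unit = { entries = entries :+ c } // never deletes
    def all: Vector[Change] = entries
  }

  // Recovery for strategy 2: reapply all changes from the beginning.
  def replay(log: ChangeLog): Int =
    log.all.foldLeft(0) { case (count, Incremented) => count + 1 }
}
```

Both stores lead to the same recovered count; what differs is that the in-place store holds one value while the log grows by one entry per message.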
Each approach has its strengths and weaknesses.
For the first approach:
There are obvious benefits to persisting the state and doing in-place updates.
The first is that recovery of the latest state can be done in constant time, because you just need to go to that one memory location and read it back.
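The other advantage is that the data volume needed for storage depends only on the number of records and not on their rate of change.
The first approach can restore an actor's state in constant time, because the state only has to be read back from storage. Another benefit is that the amount of data to store depends only on the number of records, not on how fast the state changes. With the second approach the stored data keeps growing as the state changes, whereas with the first it need not.
For the second approach: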
But there are also benefits to persisting the changes.
For example, if you do that you can go back to any point in time and replay history, audit what happened in which order or restore a certain state. Say from last Thursday, because you need to either rerun what has happened or you need to discard all the changes which have been done since then.
During a replay, the code which handles the processing could also have, for example, been fixed, because it had a bug previously. And that means that errors which crept into the current state can be corrected retroactively. This is not possible if you only store the current state, because it will have the bug in it.
You all have seen the third advantage at work. For example, if you are shopping at a large shopping site on the Internet, which we all know well, and you put an item in the shopping cart, it is in the shopping cart. You might continue shopping, take it out, replace it with another one, and finally, once you go to the checkout, the current contents of the shopping cart are what you actually buy. If you only persist that, then the whole history is lost. But it might be very interesting to keep statistics. For example, this refrigerator has been replaced in 50% of the cases by that other one, and people can then learn from other people's decisions. Of course, these insights can also be used inside the company itself to organize its logistics processes. Storing all these events takes a lot of space, but space is comparatively cheap nowadays. Therefore, if profit can be made from analyzing these data, it is well worth it.
The fourth advantage has to do with hardware and how it works. If you write to an append-only stream, you can write at a much higher bandwidth to IO, to network devices and also to hard disks. The reason is that in-place updates need to at least appear to occur in exactly the order in which they were given, which limits the possibilities for optimization.
Finally, persisting immutable data has the advantages we have seen throughout the functional programming course. Anything which cannot possibly change can be freely shared and replicated. There is no need to synchronize access, and whether you store an event stream to one, two, or three locations does not make a difference.
In summary, the second approach has the following advantages:
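- History can be replayed, audited or restored. The whole sequence of state changes is visible, not just the end result, so a particular past state can be restored and the sequence of changes can be audited.
- Some processing errors can be corrected retroactively, by replaying the recorded changes through the fixed code.
- Additional insight can be gained on business processes. Compared with in-place updates, storing state changes reveals more: some data exists only while the state is changing and is not reflected in the final state, so it cannot be obtained when only the current state is stored.
- Writing an append-only stream optimizes IO bandwidth. If the state is written to a file, every update has to modify the content at a specific position in that file; if changes are recorded instead, each change is simply appended to the log file, which is faster. (Note: I do not think this holds universally; it depends on the storage medium and on how the state is stored.)
- Changes are immutable and can freely be replicated. A recorded state change never changes, so it can be copied freely without worrying about conflicts. (This, too, depends on the specific situation.)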
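Of course, the two approaches can also be combined by using snapshots, as the HDFS secondary NameNode does. Besides the change log being kept, the change log is periodically used to generate a snapshot of the NameNode's state at that moment. This puts a bound on the time that recovery takes. A snapshot is also immutable and can be written in append fashion (taking a snapshot writes the current state sequentially to disk or other storage and never updates the snapshot afterwards, so it is append-only), which makes it efficient.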
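A minimal sketch of snapshot-based recovery, again in plain Scala and with hypothetical names (`Increment`, `Snapshot`, `takeSnapshot`, `recover`): each change carries a sequence number, a snapshot remembers the sequence number it covers, and recovery replays only the changes that are newer than the snapshot.

```scala
object SnapshotRecovery {
  // A change record tagged with its position in the append-only log.
  final case class Increment(seqNr: Long, by: Int)
  // A snapshot captures the state after applying all changes up to `seqNr`.
  final case class Snapshot(seqNr: Long, count: Int)

  // Fold the given log prefix into a snapshot (done periodically).
  def takeSnapshot(log: Vector[Increment]): Snapshot =
    Snapshot(log.lastOption.map(_.seqNr).getOrElse(0L), log.map(_.by).sum)

  // Recover from the snapshot, replaying only the changes recorded after it.
  // Without a snapshot this degrades to a full replay from the beginning.
  def recover(snapshot: Option[Snapshot], log: Vector[Increment]): Int = {
    val start = snapshot.getOrElse(Snapshot(0L, 0))
    log.filter(_.seqNr > start.seqNr).foldLeft(start.count)(_ + _.by)
  }
}
```

The recovered value is the same with or without the snapshot; the snapshot only shortens the part of the log that has to be replayed.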
How are state changes persisted?
There are two ways.
Command-Sourcing:
Persist the command before processing it, persist acknowledgement when processed
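In this approach, a message about to be sent to the actor, i.e. a command, is persisted before it is actually delivered to the actor. Recovery then replays the previously persisted commands. The problem is that replaying a command means processing the message all over again, so any effect the actor has on the outside world is repeated as a side effect of recovery. For example, if the actor sends messages to other actors while processing a message, then during recovery it sends those messages again. Akka's answer to messages being resent during recovery is the channel (in 2.3.4, Channel and PersistentChannel were replaced by AtLeastOnceDelivery). A channel remembers which messages have already been sent and thereby avoids sending them again.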
Event-Sourcing:
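Generate change requests ("events") instead of modifying local state; persist and apply them.
This approach does not store the message itself; it stores the state change that the message causes, and nothing more. When the actor's state is recovered, the state changes are read straight from the log and applied, so side effects such as resending messages do not occur.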
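The difference between the two replay styles can be illustrated with a small plain-Scala sketch. Everything here is hypothetical (`Deposit`, `BalanceChanged`, `Account`), and the counter `notificationsSent` merely stands in for messages the actor would send to other actors.

```scala
object SourcingComparison {
  final case class Deposit(amount: Int)       // command: a request sent to the actor
  final case class BalanceChanged(delta: Int) // event: the resulting state change

  final class Account {
    var balance: Int = 0
    var notificationsSent: Int = 0 // stands in for messages sent to other actors

    // Handling a command runs the business logic, including its external effect.
    def handle(cmd: Deposit): BalanceChanged = {
      notificationsSent += 1
      val event = BalanceChanged(cmd.amount)
      applyEvent(event)
      event
    }

    // Applying an event only touches local state.
    def applyEvent(e: BalanceChanged): Unit = { balance += e.delta }
  }

  // Command sourcing: recovery re-handles every persisted command.
  def recoverFromCommands(journal: Seq[Deposit]): Account = {
    val a = new Account
    journal.foreach(c => a.handle(c))
    a
  }

  // Event sourcing: recovery re-applies every persisted event.
  def recoverFromEvents(journal: Seq[BalanceChanged]): Account = {
    val a = new Account
    journal.foreach(e => a.applyEvent(e))
    a
  }
}
```

Both recoveries end with the same balance, but only the command replay repeats the notifications.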
When to Apply the Events?
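In the scheme shown in the event-sourcing figure, events are first sent to the log, which is usually an actor. Once the log has persisted the events, it replays them to the actor; only then does the actor apply them, and only at that point does the actor's state change.
This, however, also causes problems, as the following example shows.
The code below models a blog site that limits each user to a fixed number of posts; in the code this limit is set to 1.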
import akka.actor.Actor

sealed trait Event
case class PostCreated(text: String) extends Event
case object QuotaReached extends Event

case class State(posts: Vector[String], disabled: Boolean) {
  // Returns the state that results from applying a single event.
  def updated(e: Event): State = e match {
    case PostCreated(text) => copy(posts = posts :+ text)
    case QuotaReached      => copy(disabled = true)
  }
}

class UserProcessor extends Actor {
  var state = State(Vector.empty[String], false)
  def receive = {
    // Command: emit events to the log; the state is not changed here.
    case UserProcessor.NewPost(text) =>
      if (!state.disabled)
        emit(PostCreated(text), QuotaReached)
    // Event replayed by the log after persisting: apply it now.
    case e: Event =>
      state = state.updated(e)
  }
  def emit(events: Event*) = ... // send to log
}

object UserProcessor {
  case class NewPost(text: String)
}
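In the code above, an Event represents a state change; it is the "event" of event sourcing. UserProcessor handles the user's request to publish a post (NewPost). It first checks whether the user has already reached the post limit (if(!state.disabled)) and, if not, sends the events to the log. When the events come back from the log, UserProcessor uses them to change its own state.
The trouble is that this logic is flawed. After receiving NewPost, UserProcessor does not change its state immediately; it changes it only once the event has come back from the log. If the user sends another NewPost after the events are emitted but before they return from the log, that NewPost is accepted even though the limit is 1, because UserProcessor's state has not changed yet.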
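The race can be reproduced without Akka in a small simulation. This is a sketch under assumptions: `Processor`, `inFlight` and `logReplies()` are hypothetical stand-ins for the actor, the events travelling to the log, and the log's reply.

```scala
object QuotaRace {
  sealed trait Event
  final case class PostCreated(text: String) extends Event
  case object QuotaReached extends Event

  final case class State(posts: Vector[String], disabled: Boolean) {
    def updated(e: Event): State = e match {
      case PostCreated(text) => copy(posts = posts :+ text)
      case QuotaReached      => copy(disabled = true)
    }
  }

  // Events sit in `inFlight` until `logReplies()` simulates the log
  // persisting them and replaying them to the processor.
  final class Processor {
    var state: State = State(Vector.empty, disabled = false)
    private var inFlight: Vector[Event] = Vector.empty

    // Mirrors the NewPost case: checks the state but does not change it.
    def newPost(text: String): Unit =
      if (!state.disabled)
        inFlight = inFlight ++ Vector(PostCreated(text), QuotaReached)

    // Mirrors the Event case: the state changes only when the log replies.
    def logReplies(): Unit = {
      inFlight.foreach { e => state = state.updated(e) }
      inFlight = Vector.empty
    }
  }
}
```

Calling `newPost` twice before `logReplies()` leaves two posts in the state although the quota is 1; a third post sent after the reply is rejected as intended.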
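We could instead apply the event before persisting it. So there are two choices for when to apply events: apply after persisting, or apply before persisting.
At first glance the second option looks better. But consider it from another angle. In the example above, when State's updated method receives PostCreated, it adds the new post's text to a Vector, so the vector gains an element; we regard this as a real change of state. Now let us see how the two options differ when the actor fails.
With the first option, if a post has been posted, that state can always be recovered, because the event that caused the change has already been persisted.
With the second option, the event has already been used to change the actor's state before it is persisted. So between the moment the event is sent to the log and the moment the log persists it, UserProcessor is in a state that may be lost. After all, if the log fails while persisting the event, UserProcessor's current state cannot be recovered from the log.
It looks as if we must choose between correct behavior (being able to tell whether there are too many posts) and correct persistence. For this example, however, there is a third option.
After processing a message, the actor does not apply the event but sends it to the log. The actor is then in a waiting state: it buffers newly arriving commands without processing them and waits for the log's reply to the first message. When the log replies, the actor applies the replied event to change its state and then processes the buffered commands. The downside is that the waiting reduces performance; the upside is that consistency is maintained.
Akka has built-in support for this kind of buffering, called Stash.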
The Stash Trait
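import akka.actor.{Actor, Stash}

class UserProcessor extends Actor with Stash {
  var state = State(Vector.empty[String], false)
  def receive = {
    case UserProcessor.NewPost(text) if !state.disabled =>
      emit(PostCreated(text), QuotaReached) // emit as in the previous listing
      // Two events were emitted, so wait for two replies from the log.
      context.become(waiting(2), discardOld = false)
  }
  def waiting(n: Int): Receive = {
    case e: Event =>
      state = state.updated(e)
      // Last expected event: return to receive and replay stashed messages.
      if (n == 1) { context.unbecome(); unstashAll() }
      else context.become(waiting(1))
    // Anything else is deferred until the log has replied.
    case _ => stash()
  }
}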
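Once an actor mixes in the Stash trait, it can call stash() to buffer the current message and unstashAll() to bring the stashed messages back. Unstashed messages are not put at the end of the mailbox but at the front (prepended rather than appended), which keeps messages in the order in which they arrived.
In the example above, after receiving NewPost, UserProcessor enters the waiting state and waits for two events to arrive. Meanwhile any other message goes into the stash. Once the two events replied by the log have been applied to the state, UserProcessor returns to handling NewPost and calls unstashAll to release the stashed messages.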
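The waiting-and-stashing behaviour can also be imitated in plain Scala, which makes it easy to step through by hand. This is not Akka code: `Processor`, `awaiting`, `stashed` and `logRepliesOnce()` are hypothetical names, and the comments only point at the Akka calls they imitate.

```scala
object StashedQuota {
  sealed trait Event
  final case class PostCreated(text: String) extends Event
  case object QuotaReached extends Event

  final class Processor {
    var posts: Vector[String] = Vector.empty
    var disabled: Boolean = false
    private var awaiting: Int = 0                      // events still expected from the log
    private var stashed: Vector[String] = Vector.empty // commands deferred while waiting
    private var inFlight: Vector[Event] = Vector.empty // events sent to the log

    def newPost(text: String): Unit =
      if (awaiting > 0) stashed = stashed :+ text      // imitates stash()
      else if (!disabled) {
        inFlight = Vector(PostCreated(text), QuotaReached)
        awaiting = inFlight.size                       // imitates become(waiting(2))
      }

    // Simulates the log replying with one persisted event.
    def logRepliesOnce(): Unit = inFlight.headOption.foreach { e =>
      inFlight = inFlight.tail
      e match {
        case PostCreated(text) => posts = posts :+ text
        case QuotaReached      => disabled = true
      }
      awaiting -= 1
      if (awaiting == 0) {                             // imitates unbecome() + unstashAll()
        val replay = stashed
        stashed = Vector.empty
        replay.foreach(t => newPost(t))
      }
    }
  }
}
```

A second `newPost` that arrives while the processor is waiting is held back and only re-examined after both events have been applied, at which point the quota flag is already set and the post is rejected.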
When to Perform External Effects?
Performing the effect and persisting that it was done cannot be atomic.
- Perform it before persisting for at-least-once semantics.
- Perform it after persisting for at-most-once semantics.
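This choice needs to be made based on the underlying business model.
As mentioned earlier, recovering an actor's state becomes more complicated when the actor interacts with external resources. The root cause is that an external effect and the corresponding log entry in the actor system cannot be made atomic. Suppose, in the earlier example, that publishing a post requires sending a request to a bank to collect a fee. Sending the charge request and recording that request in the log are not atomic, which means only one of the two may succeed. So the question is: should we first record in the log that the fee was charged and then charge it, or charge first and then record it? That depends on the business model.
(Note: in fact, a failure can also occur between "writing the events to the log" and "applying the events to change the state", so there is no guarantee that an actor can be restored to exactly its previous state after it goes down. The only way to keep the system unaffected by failures is to rely on idempotence plus at-least-once semantics.)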
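One way to picture "idempotence plus at-least-once" for the bank example is the sketch below. It assumes, purely for illustration, that the external service deduplicates requests by an id; `Charge`, `Bank` and `performThenPersist` are hypothetical names, and the `crashBeforePersist` flag simulates a failure between the effect and the log write.

```scala
import scala.collection.mutable

object IdempotentCharge {
  final case class Charge(requestId: String, amount: Int)

  // Hypothetical external service that ignores a request id it has seen before.
  final class Bank {
    private var seen: Set[String] = Set.empty
    var totalCharged: Int = 0
    def charge(c: Charge): Unit =
      if (!seen.contains(c.requestId)) {
        seen += c.requestId
        totalCharged += c.amount
      }
  }

  // At-least-once: perform the effect first, then record that it was done.
  // If the record is lost, recovery repeats the effect and the bank's
  // deduplication absorbs the repeat.
  def performThenPersist(bank: Bank, c: Charge,
                         journal: mutable.Buffer[String],
                         crashBeforePersist: Boolean): Unit = {
    bank.charge(c)
    if (!crashBeforePersist) journal += c.requestId
  }
}
```

If the first attempt fails before the journal write and the charge is retried, the bank still charges the amount only once, while the journal ends up with a single entry.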
Summary
- Actors can persist incoming messages or generated events.
- Events can be replicated and used to inform other components.
- Recovery replays past commands or events; snapshots reduce this cost.
- Actors can defer handling certain messages by using the Stash trait.