Akka-CQRS(1) - Write-side, Persisting event sources: how the CQRS write side works

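In the previous post we described CQRS as a read/write-separated architecture for high-concurrency, high-volume data entry, whose write side is built on event-sourcing plus akka-persistence. Put another way: event-sourcing is a particular way of recording data, and akka-persistence is one concrete implementation of it. The core idea of event sourcing is to write the events that have occurred into a log (journal). These events are things that successfully changed the system state and that have already happened. At first I often confused event sourcing with command sourcing. The fundamental difference is that an event has already happened, whereas a command is still to happen. If we stored commands in the journal, replaying the journal would execute those commands again and produce side effects such as printing output or sending email. Since the system state and its environment keep changing over time, these side effects would have different consequences at different times, which is certainly not what we want.

In the event-sourcing model, objects that represent the program state (state-objects) are kept in memory, and these state objects are mapped to database table models. Suppose the program supports certain commands that change the program state and may also produce side effects. The order of operations under event sourcing should then be: produce side effects -> persist the event -> change the in-memory state objects. If any step fails, the steps after it are abandoned. Conversely, when replaying the records in the journal, the recorded events have to be turned back into state-changing commands with the side effects deliberately skipped, because they already happened at the correct time; after that, only the database model state needs to be updated. Implementing persistence therefore involves the mapping between objects and models, the way state-objects are maintained, and the conversion between commands and events.

Let us first look at the conversion between command and event, again using the POS cashier system from the previous post. Here are a few cashier commands:

    case class GetItemInfo(itemcode: String) extends Command
    case class AddItem(item: Item, qty: Int) extends Command
    case class AliPay(amount: Double) extends Command

These three typical commands can be converted into events as follows:

GetItemInfo: a query for item details. It does not affect the transaction state, so no conversion is needed.
AddItem: only affects the transaction state and has no side effects. It is converted to ItemAdded(item: Item, qty: Int) extends Event.
AliPay: changes the transaction state and produces a side effect, because the amount has to be charged through Alipay immediately. Approach: first confirm that the payment succeeded, then convert to AliPaid(amount: Double) extends Event.

    case class ItemAdded(item: Item, qty: Int) extends Event
    case class AliPaid(amount: Double) extends Event

The POS transaction state is the content of an unsettled bill, a simple list of transaction records called SalesMemo:

    // transaction record
    case class TxnItem(
       num: Int        // sales slip number
      ,seq: Int        // transaction sequence number
      ,txntype: Int    // transaction type code
      ,code: String    // code (item, account...)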
      ,qty: Int        // quantity
      ,price: Int      // unit price (in cents)
      ,amount: Int     // amount (in cents)
    )

    case class SalesMemo(salesnum: Int, txnitems: List[TxnItem] = Nil) {
      def itemAdded(evt: Event): SalesMemo = evt match {
        case ItemAdded(item, qty) =>
          copy(txnitems = TxnItem(salesnum, txnitems.length + 1, 0, item.code, qty, item.price, qty * item.price) :: txnitems)
        case _ => this
      }

      def aliPaid(evt: Event): SalesMemo = evt match {
        case AliPaid(amt) =>
          // TxnItem stores amounts as Int cents; this assumes AliPaid carries the amount in yuan
          val cents = math.round(amt * 100).toInt
          copy(txnitems = TxnItem(salesnum, txnitems.length + 1, 0, "ali", 1, cents, cents) :: txnitems)
        case _ => this
      }
    }

The two functions itemAdded and aliPaid represent how AddItem and AliPay transform the state object.

As mentioned above, a PersistentActor has strict requirements on the order of events when it writes to the journal; otherwise the read side cannot restore the original state correctly. This requirement is met through the persist/persistAsync family of functions. Their signatures look like this:

    // persist without interference: messages arriving in the meantime are held in an internal stash (message-stashing)
    def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
      internalPersist(event)(handler)
    }

    // persist several events at once
    def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
      internalPersistAll(events)(handler)
    }

    // persist asynchronously, without the stash mechanism (no-message-stashing)
    def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
      internalPersistAsync(event)(handler)
    }

    // persist several events asynchronously
    def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
      internalPersistAllAsync(events)(handler)
    }

    // does not persist an event; uses the internal stash to guarantee handler execution order
    def defer[A](event: A)(handler: A ⇒ Unit): Unit = {
      internalDefer(event)(handler)
    }

    // does not persist an event; only guarantees handler execution order
    def deferAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
      internalDeferAsync(event)(handler)
    }

In every case the handler is guaranteed to run only after the event has been persisted successfully (for defer/deferAsync, which persist nothing, only after the handlers of the preceding persist calls). The following pseudocode shows the order in which handlers run with and without stashing:

    override def receiveCommand: Receive = {
      case c: String ⇒ {
        sender() ! c
        persist(s"evt-$c-1") { e ⇒ sender() ! e }
        persist(s"evt-$c-2") { e ⇒ sender() ! e }
        defer(s"evt-$c-3") { e ⇒ sender() ! e }
      }
    }

    // with message stashing
    persistentActor ! "a"
    persistentActor ! "b"

    // order of received messages:
    // a
    // evt-a-1
    // evt-a-2
    // evt-a-3
    // b
    // evt-b-1
    // evt-b-2
    // evt-b-3

    // ----------------------------------

    override def receiveCommand: Receive = {
      case c: String ⇒ {
        sender() ! c
        persistAsync(s"evt-$c-1") { e ⇒ sender() ! e }
        persistAsync(s"evt-$c-2") { e ⇒ sender() ! e }
        deferAsync(s"evt-$c-3") { e ⇒ sender() ! e }
      }
    }

    persistentActor ! "a"
    persistentActor ! "b"

    // order of received messages:
    // a
    // b         // no stashing: the incoming message is processed immediately
    // evt-a-1
    // evt-a-2
    // evt-a-3
    // evt-b-1
    // evt-b-2
    // evt-b-3

When persist calls are nested several levels deep, the correct order is as follows:

    override def receiveCommand: Receive = {
      case c: String ⇒
        sender() ! c
        persist(s"$c-outer-1") { outer1 ⇒
          sender() ! outer1
          persist(s"$c-inner-1") { inner1 ⇒ sender() ! inner1 }
        }
        persist(s"$c-outer-2") { outer2 ⇒
          sender() ! outer2
          persist(s"$c-inner-2") { inner2 ⇒ sender() ! inner2 }
        }
    }

    persistentActor ! "a"
    persistentActor ! "b"

    // order of received messages:
    // a
    // a-outer-1
    // a-outer-2
    // a-inner-1
    // a-inner-2
    // and only then process "b"
    // b
    // b-outer-1
    // b-outer-2
    // b-inner-1
    // b-inner-2

    // --------------------------------

    override def receiveCommand: Receive = {
      case c: String ⇒
        sender() ! c
        persistAsync(c + "-outer-1") { outer ⇒
          sender() ! outer
          persistAsync(c + "-inner-1") { inner ⇒ sender() ! inner }
        }
        persistAsync(c + "-outer-2") { outer ⇒
          sender() ! outer
          persistAsync(c + "-inner-2") { inner ⇒ sender() ! inner }
        }
    }

    persistentActor ! "a"
    persistentActor ! "b"

    // order of received messages:
    // a
    // b
    // a-outer-1
    // a-outer-2
    // b-outer-1
    // b-outer-2
    // a-inner-1
    // a-inner-2
    // b-inner-1
    // b-inner-2

    // which can be seen as the following causal relationship:
    // a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2
    // b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2

Note that the handler runs only after the event has been persisted successfully, and not at all if persisting fails. In other words, the state-objects are updated only once the event is known to be safely stored (the model state is updated later, on the CQRS read side). For the POS example above, the code could look like this:

    override def receiveCommand: Receive = {
      case AddItem(item, qty) =>
        // the handler must return Unit, so the new state is assigned inside it
        persist(ItemAdded(item, qty)) { evt => salesMemo = salesMemo.itemAdded(evt) }
      case AliPay(amt) =>
        try {
          if (aliOnlinePay(amt)) // produce the side effect first
            persist(AliPaid(amt)) { evt => salesMemo = salesMemo.aliPaid(evt) }
        } catch {
          case _: Exception => throw new OnlinePayException("boom!!!")
        }
      ...

akka-persistence is the concrete implementation of event-sourced data writing in the CQRS model. As mentioned, data is written by appending to a journal the events that changed the target state-objects. Every time a PersistentActor starts, it reads the previously written events from the journal, turns them back into commands, and step by step restores the state-objects to what they were when the actor last stopped, whether it stopped normally or because of a failure. This recovery process is implemented by the PersistentActor's receiveRecover function, as follows:

    override def receiveRecover: Receive = {
      // updateMemo is not shown in this post; presumably it dispatches to itemAdded/aliPaid
      case evt: Event => salesMemo = salesMemo.updateMemo(evt)
      case SnapshotOffer(_, loggedItems: SalesMemo) => salesMemo = loggedItems
    }

In principle, recovering state simply means replaying the events from beginning to end. Efficiency is a serious problem with that approach, though: imagine having to read several hundred thousand records on every start. The solution is to save snapshots, which summarize the events so far into an intermediate state that is written to a snapshot-store. When the PersistentActor starts, it first uses the latest snapshot to restore the state to that intermediate point, then reads all events recorded after the snapshot and applies them to arrive at the latest state. The functions and types for saving and reading snapshots are:

    def saveSnapshot(snapshot: Any): Unit = {
      snapshotStore ! SaveSnapshot(SnapshotMetadata(snapshotterId, snapshotSequenceNr), snapshot)
    }

    /**
     * Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received
     * before any further replayed messages.
     */
    @SerialVersionUID(1L)
    final case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)

    /**
     * Snapshot metadata.
     *
     * @param persistenceId id of persistent actor from which the snapshot was taken.
     * @param sequenceNr sequence number at which the snapshot was taken.
     * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
     */
    @SerialVersionUID(1L) //#snapshot-metadata
    final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)
    //#snapshot-metadata

PersistentActor has two abstract functions, receiveRecover and receiveCommand, which the user must implement. They represent the two main responsibilities of a PersistentActor: state recovery and message handling. State recovery is done by receiveRecover, which processes the records from the snapshot-store and the journal. The receiveCommand of a PersistentActor plays the role of an ordinary Actor's receive message handler. PersistentActor also provides callback functions for preparing before events are read and for cleaning up afterwards. These callbacks can be overridden to customize that handling, for example:

    /**
     * Called whenever a message replay fails. By default it logs the error.
     *
     * Subclass may override to customize logging.
     *
     * The actor is always stopped after this method has been invoked.
     *
     * @param cause failure cause.
     * @param event the event that was processed in `receiveRecover`, if the exception
     *   was thrown there
     */
    protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = ...

    /**
     * Called when persist fails. By default it logs the error.
     * Subclass may override to customize logging and for example send negative
     * acknowledgment to sender.
     *
     * The actor is always stopped after this method has been invoked.
     *
     * Note that the event may or may not have been saved, depending on the type of
     * failure.
     *
     * @param cause failure cause.
     * @param event the event that was to be persisted
     */
    protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = { ...

    /**
     * Called when the journal rejected `persist` of an event. The event was not
     * stored. By default this method logs the problem as a warning, and the actor continues.
     * The callback handler that was passed to the `persist` method will not be invoked.
     *
     * @param cause failure cause
     * @param event the event that was to be persisted
     */
    protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = { ...

The recovery behavior itself can also be customized by overriding a function:

    trait PersistenceRecovery {
      //#persistence-recovery
      /**
       * Called when the persistent actor is started for the first time.
       * The returned [[Recovery]] object defines how the Actor will recover its persistent state before
       * handling the first incoming message.
       *
       * To skip recovery completely return `Recovery.none`.
       */
      def recovery: Recovery = Recovery()
      //#persistence-recovery
    }

The whole recovery process is implemented by the following function in Eventsourced.scala:

    override def stateReceive(receive: Receive, message: Any) = try message match {
      case ReplayedMessage(p) ⇒
        try {
          eventSeenInInterval = true
          updateLastSequenceNr(p)
          Eventsourced.super.aroundReceive(recoveryBehavior, p)
        } catch {
          case NonFatal(t) ⇒
            timeoutCancellable.cancel()
            try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self)
            returnRecoveryPermit()
        }
      case RecoverySuccess(highestSeqNr) ⇒
        timeoutCancellable.cancel()
        onReplaySuccess() // callback for subclass implementation
        sequenceNr = highestSeqNr
        setLastSequenceNr(highestSeqNr)
        _recoveryRunning = false
        try Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
        finally transitToProcessingState()
      case ReplayMessagesFailure(cause) ⇒
        timeoutCancellable.cancel()
        try onRecoveryFailure(cause, event = None) finally context.stop(self)
      case RecoveryTick(false) if !eventSeenInInterval ⇒
        timeoutCancellable.cancel()
        try onRecoveryFailure(
          new RecoveryTimedOut(s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $lastSequenceNr"),
          event = None)
        finally context.stop(self)
      case RecoveryTick(false) ⇒
        eventSeenInInterval = false
      case RecoveryTick(true) ⇒
        // snapshot tick, ignore
      case other ⇒
        stashInternally(other)
    } catch {
      case NonFatal(e) ⇒
        returnRecoveryPermit()
        throw e
    }

This function passes each message on to receiveRecover through super.aroundReceive:

    /**
     * INTERNAL API.
     *
     * Can be overridden to intercept calls to this actor's current behavior.
     *
     * @param receive current behavior.
     * @param msg current message.
     */
    protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
      // optimization: avoid allocation of lambda
      if (receive.applyOrElse(msg, Actor.notHandledFun).asInstanceOf[AnyRef] eq Actor.NotHandled) {
        unhandled(msg)
      }
    }

Because Eventsourced extends the PersistenceRecovery trait, overriding the recovery function changes the recovery behavior. The default mode is:

    /**
     * Recovery mode configuration object to be returned in [[PersistentActor#recovery]].
     *
     * By default recovers from latest snapshot replays through to the last available event (last sequenceId).
     *
     * Recovery will start from a snapshot if the persistent actor has previously saved one or more snapshots
     * and at least one of these snapshots matches the specified `fromSnapshot` criteria.
     * Otherwise, recovery will start from scratch by replaying all stored events.
     *
     * If recovery starts from a snapshot, the persistent actor is offered that snapshot with a [[SnapshotOffer]]
     * message, followed by replayed messages, if any, that are younger than the snapshot, up to the
     * specified upper sequence number bound (`toSequenceNr`).
     *
     * @param fromSnapshot criteria for selecting a saved snapshot from which recovery should start. Default
     *                     is latest (= youngest) snapshot.
     * @param toSequenceNr upper sequence number bound (inclusive) for recovery. Default is no upper bound.
     * @param replayMax maximum number of messages to replay. Default is no limit.
     */
    @SerialVersionUID(1L)
    final case class Recovery(
      fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,
      toSequenceNr: Long = Long.MaxValue,
      replayMax: Long = Long.MaxValue)

Below is the message produced when recovery finishes:

    /**
     * Sent to a [[PersistentActor]] when the journal replay has been finished.
     */
    @SerialVersionUID(1L)
    case object RecoveryCompleted extends RecoveryCompleted { ...
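
To make the snapshot-then-replay recovery described in this post more concrete, here is a minimal library-free sketch. It does not use Akka at all: the journal is modelled as a plain Vector of (sequence number, event) pairs, and Memo, Snapshot, RecoverySketch and applyEvent are hypothetical names invented for this illustration, simplified from the POS example rather than taken from it. It only shows the idea that recovering from a snapshot plus the later events should give the same state as replaying everything.

```scala
// Library-free sketch of event-sourced recovery. All names are illustrative
// assumptions, not part of akka-persistence.
sealed trait Event
final case class ItemAdded(code: String, qty: Int, price: Int) extends Event // price in cents
final case class AliPaid(cents: Int) extends Event

final case class Memo(total: Int = 0, paid: Int = 0) {
  // Pure state transition with no side effects, so it is safe to run again on replay.
  def applyEvent(evt: Event): Memo = evt match {
    case ItemAdded(_, qty, price) => copy(total = total + qty * price)
    case AliPaid(cents)           => copy(paid = paid + cents)
  }
}

// A snapshot records the sequence number of the last event it already includes.
final case class Snapshot(seqNr: Long, state: Memo)

object RecoverySketch {
  // Start from the snapshot (or an empty state) and apply only the newer events, in order.
  def recover(snapshot: Option[Snapshot], journal: Vector[(Long, Event)]): Memo = {
    val start = snapshot.getOrElse(Snapshot(0L, Memo()))
    journal
      .collect { case (seqNr, evt) if seqNr > start.seqNr => evt }
      .foldLeft(start.state)((state, evt) => state.applyEvent(evt))
  }

  def main(args: Array[String]): Unit = {
    val journal = Vector[(Long, Event)](
      1L -> ItemAdded("001", 2, 500),
      2L -> ItemAdded("002", 1, 1200),
      3L -> AliPaid(2200)
    )
    val fullReplay = recover(None, journal)
    // Snapshot taken after event 2, so only event 3 still has to be replayed.
    val fromSnapshot = recover(Some(Snapshot(2L, Memo(total = 2200))), journal)
    println(fullReplay)                 // prints Memo(2200,2200)
    println(fromSnapshot == fullReplay) // prints true
  }
}
```

In a real PersistentActor the same two steps correspond to the SnapshotOffer case and the Event case of receiveRecover; the sketch just makes the sequence-number cut-off explicit.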