Akka-Cluster(5)- load-balancing with backoff-supervised stateless computation - balanced distribution of stateless tasks across cluster nodes
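Distributed computing is a scale-out model. Its core idea is to make full use of the computing resources of every server node in a cluster, including CPU, memory, disk and the I/O bus. A job is first split into smaller tasks, and those tasks are then dispatched to the nodes for execution. The tasks may depend on one another or run completely independently. With akka-cluster, tasks can be distributed evenly according to the resource load on each node, so that resources are used fully and sensibly and overall throughput is maximized. If a job can be split into independent tasks, all we have to care about is distributing them so that the cluster nodes stay load-balanced. This is really a way of distributing computations that keep no internal state: fire and forget. Because the algorithm decides where the target actor that performs each task is deployed, we generally do not need to address a specific actor or read its internal state. If we do need that, we can still achieve it by embedding the relevant information in messages.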
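To make "independent, stateless tasks" concrete, here is a minimal stdlib-only Scala sketch; it is not akka-cluster code. The `JobSplitter` object and its `Task` type are hypothetical, and the tasks mirror the `op1 * op2` formulas used later in this post. Because each task depends only on its own fields, the results are the same no matter which node runs a task or in what order.

```scala
// Hypothetical sketch: split one job into independent, stateless tasks.
object JobSplitter {
  // One unit of work; it carries everything it needs, so no shared state is required.
  final case class Task(id: Int, op1: Int, op2: Int)

  // Turn a large job (a list of operand pairs) into independently runnable tasks.
  def split(job: Seq[(Int, Int)]): Vector[Task] =
    job.zipWithIndex.map { case ((a, b), i) => Task(i, a, b) }.toVector

  // The result depends only on the task itself: fire it off and forget it.
  def run(task: Task): Int = task.op1 * task.op2

  def main(args: Array[String]): Unit = {
    val tasks = split(Seq((1, 2), (3, 4), (5, 6)))
    // Any execution order gives the same per-task results.
    val forward  = tasks.map(t => t.id -> run(t)).toMap
    val backward = tasks.reverse.map(t => t.id -> run(t)).toMap
    println(forward == backward) // true
  }
}
```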
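Load balancing across cluster nodes is a form of centralized task dispatch. It is essentially the router/routees pattern in a cluster setting, except that the router can now send tasks to actors on other servers. Dispatch is still driven by a routing algorithm, and all the usual router algorithms such as round-robin and random are available. In addition, Akka provides an algorithm based on the resource load of each node, enabled in the configuration file: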
akka.extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ]
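The classic routing algorithms mentioned above can be pictured with a small stdlib-only Scala sketch. This is not Akka's implementation (Akka ships its own `RoundRobinRoutingLogic` and `RandomRoutingLogic`); the `RoutingSketch` object and its classes are hypothetical, and routees are plain strings standing in for actors on different nodes.

```scala
import scala.util.Random

// Hypothetical sketch of two classic routing logics; routees are just names here.
object RoutingSketch {
  // Round-robin: hand out routees in a fixed cycle using a running counter.
  // Unlike a production router, this counter is not thread-safe.
  final class RoundRobin[A](routees: IndexedSeq[A]) {
    require(routees.nonEmpty, "need at least one routee")
    private var next = 0L
    def select(): A = {
      val r = routees((next % routees.size).toInt)
      next += 1
      r
    }
  }

  // Random: each message goes to a uniformly chosen routee.
  final class RandomPick[A](routees: IndexedSeq[A], rnd: Random) {
    require(routees.nonEmpty, "need at least one routee")
    def select(): A = routees(rnd.nextInt(routees.size))
  }

  def main(args: Array[String]): Unit = {
    val rr = new RoundRobin(Vector("node1", "node2", "node3"))
    println(Seq.fill(5)(rr.select()).mkString(",")) // node1,node2,node3,node1,node2
    val rp = new RandomPick(Vector("node1", "node2", "node3"), new Random(42))
    println(rp.select())
  }
}
```

Neither logic looks at how busy a node is, which is exactly the gap the metrics-based router fills.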
The following example illustrates the basic role of the metrics:
akka.actor.deployment {
  /frontend/dispatcher = {
    # Router type provided by metrics extension.
    router = cluster-metrics-adaptive-group
    # Router parameter specific for metrics extension.
    # metrics-selector = heap
    # metrics-selector = load
    # metrics-selector = cpu
    metrics-selector = mix
    #
    routees.paths = ["/user/backend"]
    cluster {
      enabled = on
      use-role = backend
      allow-local-routees = off
    }
  }
}
Here `dispatcher` is the router, and the actors at the `/user/backend` path on the nodes with the `backend` role are the routees.
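The `metrics-selector` setting decides how each node's remaining capacity is estimated, and the adaptive router then favours nodes with more spare capacity. The stdlib-only Scala sketch below models this under stated assumptions: the capacity formulas follow my reading of the Akka documentation (heap: (max - used) / max; load: 1 - load average / processors; cpu: 1 - utilization; mix: the mean of the three), and selection is a weighted random pick proportional to capacity. `AdaptiveSketch`, `NodeMetrics` and all the numbers are hypothetical.

```scala
import scala.util.Random

// Hypothetical model of metrics-based (adaptive) routee selection; not Akka's code.
object AdaptiveSketch {
  // Made-up snapshot of one node's metrics; cpuUtilization is in [0, 1].
  final case class NodeMetrics(heapUsed: Double, heapMax: Double,
                               loadAverage: Double, processors: Int,
                               cpuUtilization: Double)

  private def clamp(x: Double): Double = math.max(0.0, math.min(1.0, x))

  // Remaining capacity in [0, 1] under each selector (assumed formulas).
  def heap(m: NodeMetrics): Double = clamp((m.heapMax - m.heapUsed) / m.heapMax)
  def load(m: NodeMetrics): Double = clamp(1.0 - m.loadAverage / m.processors)
  def cpu(m: NodeMetrics): Double  = clamp(1.0 - m.cpuUtilization)
  def mix(m: NodeMetrics): Double  = (heap(m) + load(m) + cpu(m)) / 3.0

  // Weighted random pick: a routee is chosen with probability proportional to its capacity.
  def select(capacity: Map[String, Double], rnd: Random): String = {
    val entries = capacity.toVector.filter(_._2 > 0.0) // nodes with no spare capacity are never picked
    require(entries.nonEmpty, "no routee has spare capacity")
    val cumulative = entries.map(_._2).scanLeft(0.0)(_ + _).tail
    val point = rnd.nextDouble() * cumulative.last
    val idx = cumulative.indexWhere(point < _)
    entries(if (idx >= 0) idx else entries.size - 1)._1
  }

  def main(args: Array[String]): Unit = {
    val metrics = Map(
      "node1" -> NodeMetrics(heapUsed = 200, heapMax = 1000, loadAverage = 1.0, processors = 4, cpuUtilization = 0.2),
      "node2" -> NodeMetrics(heapUsed = 900, heapMax = 1000, loadAverage = 3.5, processors = 4, cpuUtilization = 0.9))
    val capacity = metrics.map { case (node, m) => node -> mix(m) }
    val rnd = new Random(7)
    val picks = Seq.fill(1000)(select(capacity, rnd))
    println(capacity)
    println(picks.groupBy(identity).map { case (n, ps) => n -> ps.size })
  }
}
```

With these made-up numbers node1 has far more spare capacity than node2, so most of the 1000 picks should land on node1, while node2 still receives some work.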
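Suppose we split a large data-processing program into many independent database operations. To make sure every operation runs safely under all circumstances, including when exceptions occur, we can place the actor responsible for each operation under a BackoffSupervisor, as follows: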
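import akka.actor.{OneForOneStrategy, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import scala.concurrent.duration._
import scala.language.postfixOps
// `workerProps(client)` and `decider` are defined elsewhere in the program
val supervisor = BackoffSupervisor.props(
  Backoff.onFailure( // or Backoff.onStop
    childProps = workerProps(client),
    childName = "worker",
    minBackoff = 1 second,
    maxBackoff = 10 seconds,
    randomFactor = 0.20
  ).withAutoReset(resetBackoff = 5 seconds) // reset the backoff if the child runs 5s without failing
    .withSupervisorStrategy(
      OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
        decider.orElse(SupervisorStrategy.defaultDecider)
      )
    )
)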
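`minBackoff`, `maxBackoff` and `randomFactor` together decide how long the supervisor waits before starting a new child. As I understand the Akka documentation, the delay starts at `minBackoff`, doubles with each consecutive restart, is capped at `maxBackoff`, and is then stretched by up to `randomFactor` (0.20 means up to 20% longer). The stdlib-only sketch below models that rule; `BackoffDelaySketch` is hypothetical and takes the random draw as a parameter so that the result is deterministic.

```scala
import scala.concurrent.duration._

// Hypothetical model of the backoff delay; not Akka's source code.
object BackoffDelaySketch {
  // `noise` stands in for a random draw in [0, 1); passing it in keeps the result deterministic.
  def delay(restartCount: Int,
            minBackoff: FiniteDuration,
            maxBackoff: FiniteDuration,
            randomFactor: Double,
            noise: Double): FiniteDuration = {
    // Exponential growth from minBackoff, capped at maxBackoff.
    val capped = math.min(maxBackoff.toMillis.toDouble,
                          minBackoff.toMillis * math.pow(2, restartCount))
    // Stretch by up to randomFactor so that many failing children do not retry in lockstep;
    // in this sketch the stretched delay may therefore slightly exceed maxBackoff.
    (capped * (1.0 + noise * randomFactor)).round.millis
  }

  def main(args: Array[String]): Unit = {
    val delays = (0 to 5).map(n => delay(n, 1.second, 10.seconds, 0.20, noise = 0.0))
    println(delays.map(_.toSeconds).mkString(", ")) // 1, 2, 4, 8, 10, 10
  }
}
```

With the 1s/10s settings used above, the un-randomized delays are 1, 2, 4, 8, 10, 10 seconds: after four consecutive failures the cap takes over.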
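It is worth spelling out exactly when Backoff.onFailure and Backoff.onStop apply and what they do, because this differs somewhat from the official documentation. First, when their backoff is triggered, neither of them restarts the child actor (restart); instead, a brand-new instance is created and started. See the output of the test program below for details: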
package my.akka
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props}
import akka.pattern.{Backoff, BackoffSupervisor, ask}
import scala.concurrent.Await
import scala.concurrent.duration._
class Child extends Actor {
  println(s"[Child]: created. (path = ${this.self.path}, instance = ${this})")
  override def preStart(): Unit = {
    println(s"[Child]: preStart called. (path = ${this.self.path}, instance = ${this})")
    super.preStart()
  }
  override def postStop(): Unit = {
    println(s"[Child]: postStop called. (path = ${this.self.path}, instance = ${this})")
    super.postStop()
  }
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println(s"[Child]: preRestart called with ($reason, $message). (path = ${this.self.path}, instance = ${this})")
    super.preRestart(reason, message)
  }
  override def postRestart(reason: Throwable): Unit = {
    println(s"[Child]: postRestart called with ($reason). (path = ${this.self.path}, instance = ${this})")
    super.postRestart(reason)
  }
  def receive = {
    case "boom" =>
      throw new Exception("kaboom")
    case "get ref" =>
      sender() ! self
    case a: Any =>
      println(s"[Child]: received ${a}")
  }
}
object Child {
  def props: Props = Props(new Child)
  def backOffOnFailureProps: Props =
    BackoffSupervisor.props(
      Backoff.onFailure(
        Child.props,
        childName = "myEcho",
        minBackoff = 1.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
      ))
  def backOffOnStopProps: Props =
    BackoffSupervisor.props(
      Backoff.onStop(
        Child.props,
        childName = "myEcho",
        minBackoff = 1.seconds,
        maxBackoff = 10.seconds,
        randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
      ))
}
object BackoffSuperVisorApp {
  def defaultSuperVisorCase(): Unit = {
    println(
      """
        |default ---------------------------
      """.stripMargin)
    val system = ActorSystem("app")
    try {
      /**
       * Let's see if "hello" message is received by the child
       */
      val child = system.actorOf(Child.props, "child")
      Thread.sleep(100)
      child ! "hello"
      //[Child]: received hello
      /**
       * Now restart the child with an exception within its receive method
       * and see if the `child` ActorRef is still valid (i.e. ActorRef incarnation remains same)
       */
      child ! "boom"
      Thread.sleep(200)
      child ! "hello after normal exception"
      //[Child]: received hello after normal exception
      /**
       * PoisonPill causes the child actor to `Stop`, different from restart.
       * The ActorRef incarnation gets updated.
       */
      child ! PoisonPill
      Thread.sleep(200)
      /**
       * This causes delivery to deadLetter, since the "incarnation" of ActorRef `child` became obsolete
       * after child is "Stopped"
       *
       * An incarnation is tied to an ActorRef (NOT to its internal actor instance)
       * and the same incarnation means "you can keep using the same ActorRef"
       */
      child ! "hello after PoisonPill"
      // [akka://app/user/parent/child-1] Message [java.lang.String] without sender to Actor[akka://app/user/child#-767539042]
      // was not delivered. [1] dead letters encountered.
      Thread.sleep(200)
    }
    finally {
      system.terminate()
      Thread.sleep(500)
    }
  }
  def backOffOnStopCase(): Unit = {
    println(
      """
        |backoff onStop ---------------------------
      """.stripMargin)
    val system = ActorSystem("app")
    try {
      /**
       * Let's see if "hello" message is forwarded to the child
       * by the backoff supervisor onStop
       */
      implicit val futureTimeout: akka.util.Timeout = 1.second
      val backoffSupervisorActor = system.actorOf(Child.backOffOnStopProps, "child")
      Thread.sleep(100)
      backoffSupervisorActor ! "hello to backoff supervisor" //forwarded to child
      //[Child]: received hello to backoff supervisor
      /**
       * Now "Restart" the child with an exception from its receive method.
       * As with the default supervisory strategy, the `child` ActorRef remains valid. (i.e. incarnation kept same)
       */
      val child = Await.result(backoffSupervisorActor ? "get ref", 1.second).asInstanceOf[ActorRef]
      child ! "boom"
      Thread.sleep(2000)
      child ! "hello to child after normal exception"
      //[Child]: received hello to child after normal exception
      /**
       * Backoff Supervisor can still forward the message
       */
      backoffSupervisorActor ! "hello to backoffSupervisorActor after normal exception"
      //[Child]: received hello to backoffSupervisorActor after normal exception
      Thread.sleep(200)
      /**
       * PoisonPill causes the child actor to `Stop`, different from restart.
       * The `child` ActorRef incarnation gets updated.
       */
      child ! PoisonPill
      Thread.sleep(2000)
      child ! "hello to child ref after PoisonPill"
      //delivered to deadLetters
      /**
       * Backoff Supervisor can forward the message to its child with the new incarnation
       */
      backoffSupervisorActor ! "hello to backoffSupervisorActor after PoisonPill"
      //[Child]: received hello to backoffSupervisorActor after PoisonPill
      Thread.sleep(200)
    }
    finally {
      system.terminate()
      Thread.sleep(500)
    }
  }
  def backOffOnFailureCase(): Unit = {
    println(
      """
        |backoff onFailure ---------------------------
      """.stripMargin)
    val system = ActorSystem("app")
    try {
      /**
       * Let's see if "hello" message is forwarded to the child
       * by the backoff supervisor onFailure
       */
      implicit val futureTimeout: akka.util.Timeout = 1.second
      val backoffSupervisorActor = system.actorOf(Child.backOffOnFailureProps, "child")
      Thread.sleep(100)
      backoffSupervisorActor ! "hello to backoff supervisor" //forwarded to child
      //[Child]: received hello to backoff supervisor
      /**
       * Now "Stop" the child with an exception from its receive method.
       * You'll see the difference between "Restart" and "Stop" from here:
       */
      val child = Await.result(backoffSupervisorActor ? "get ref", 1.second).asInstanceOf[ActorRef]
      child ! "boom"
      Thread.sleep(2000)
      /**
       * Note that this is after normal exception, not after PoisonPill,
       * but child is completely "Stopped" and its ActorRef "incarnation" became obsolete
       *
       * So, the message to the `child` ActorRef is delivered to deadLetters
       */
      child ! "hello to child after normal exception"
      //causes delivery to deadLetter
      /**
       * Backoff Supervisor can still forward the message to the new child ActorRef incarnation
       */
      backoffSupervisorActor ! "hello to backoffSupervisorActor after normal exception"
      //[Child]: received hello to backoffSupervisorActor after normal exception
      /**
       * You can get a new ActorRef which represents the new incarnation
       */
      val newChildRef = Await.result(backoffSupervisorActor ? "get ref", 1.second).asInstanceOf[ActorRef]
      newChildRef ! "hello to new child ref after normal exception"
      //[Child]: received hello to new child ref after normal exception
      Thread.sleep(200)
      /**
       * No matter whether the supervisory strategy is default or backoff,
       * PoisonPill causes the actor to "Stop", not "Restart"
       */
      newChildRef ! PoisonPill
      Thread.sleep(3000)
      newChildRef ! "hello to new child ref after PoisonPill"
      //delivered to deadLetters
      Thread.sleep(200)
    }
    finally {
      system.terminate()
      Thread.sleep(500)
    }
  }
  def main(args: Array[String]): Unit = {
    defaultSuperVisorCase()
    backOffOnStopCase()
    backOffOnFailureCase()
  }
}
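onStop: does not react to exceptions thrown by the child actor; those are handled by the SupervisorStrategy as usual. On a normal stop, such as PoisonPill or context.stop, it builds a new instance and starts it after the backoff delay.
onFailure: does not react to a normal stop of the child actor and simply lets it terminate. When an exception occurs, it builds a new instance and starts it after the backoff delay.
Clearly, what we usually want is to restart a computation when it fails with an exception, so onFailure is the right choice.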
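The two rules above fit in a small decision table. The stdlib-only Scala sketch below is a hypothetical model, not Akka code: `BackoffModeSketch`, `ChildEvent` and `Reaction` are made-up names. It assumes the supervisor strategy's decider answers Restart for the exception, as `SupervisorStrategy.defaultDecider` does for ordinary exceptions, which is the situation in the test program above. `oldRefStillValid` captures what that program observes: only an in-place restart keeps an earlier `ActorRef` usable.

```scala
// Hypothetical model of how the two backoff variants react to a child's termination.
object BackoffModeSketch {
  sealed trait ChildEvent
  case object StoppedNormally extends ChildEvent               // PoisonPill or context.stop
  final case class Threw(cause: Throwable) extends ChildEvent // exception in receive

  sealed trait Reaction
  case object RecreateAfterBackoff extends Reaction // new instance and new ActorRef incarnation after the delay
  case object RestartViaStrategy extends Reaction   // ordinary supervision: the same ActorRef stays valid
  case object StayStopped extends Reaction          // the child is left terminated

  // Backoff.onStop: backoff only on a normal stop; exceptions go to the supervisor strategy.
  def onStop(event: ChildEvent): Reaction = event match {
    case StoppedNormally => RecreateAfterBackoff
    case Threw(_)        => RestartViaStrategy // assumes the decider answers Restart
  }

  // Backoff.onFailure: backoff only on an exception; a normal stop is final.
  def onFailure(event: ChildEvent): Reaction = event match {
    case StoppedNormally => StayStopped
    case Threw(_)        => RecreateAfterBackoff // assumes the decider answers Restart
  }

  // Whether a reference obtained before the event still reaches a live child afterwards.
  def oldRefStillValid(r: Reaction): Boolean = r == RestartViaStrategy

  def main(args: Array[String]): Unit = {
    val boom = Threw(new Exception("kaboom"))
    println(s"onStop:    exception -> ${onStop(boom)}, PoisonPill -> ${onStop(StoppedNormally)}")
    println(s"onFailure: exception -> ${onFailure(boom)}, PoisonPill -> ${onFailure(StoppedNormally)}")
  }
}
```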
Below is the code of an example I used earlier when introducing BackoffSupervisor:
package backoffSupervisorDemo
import akka.actor._
import akka.pattern._
import backoffSupervisorDemo.InnerChild.TestMessage
import scala.concurrent.duration._
import scala.language.postfixOps
object InnerChild {
  case class TestMessage(msg: String)
  class ChildException extends Exception
  def props = Props[InnerChild]
}
class InnerChild extends Actor with ActorLogging {
  import InnerChild._
  override def receive: Receive = {
    case TestMessage(msg) => // simulate the child's work
      log.info(s"Child received message: ${msg}")
  }
}
object Supervisor {
  def props: Props = { // the supervisor strategy and the child actor are defined here
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case _: InnerChild.ChildException => SupervisorStrategy.Restart
    }
    val options = Backoff.onFailure(InnerChild.props, "innerChild", 1 second, 5 seconds, 0.0)
      .withManualReset
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )
    BackoffSupervisor.props(options)
  }
}
// Note: the actor below is the parent of Supervisor, not of InnerChild
object ParentalActor {
  case class SendToSupervisor(msg: InnerChild.TestMessage)
  case class SendToInnerChild(msg: InnerChild.TestMessage)
  case class SendToChildSelection(msg: InnerChild.TestMessage)
  def props = Props[ParentalActor]
}
class ParentalActor extends Actor with ActorLogging {
  import ParentalActor._
  // create the child actor `supervisor` here
  val supervisor = context.actorOf(Supervisor.props, "supervisor")
  supervisor ! BackoffSupervisor.getCurrentChild // ask the supervisor for its current child actor
  var innerChild: Option[ActorRef] = None // the current child ActorRef, once it is returned
  val selectedChild = context.actorSelection("/user/parent/supervisor/innerChild")
  override def receive: Receive = {
    case BackoffSupervisor.CurrentChild(ref) => // received the child actor's ref
      innerChild = ref
    case SendToSupervisor(msg) => supervisor ! msg
    case SendToChildSelection(msg) => selectedChild ! msg
    case SendToInnerChild(msg) => innerChild foreach (child => child ! msg)
  }
}
object BackoffSupervisorDemo extends App {
  import ParentalActor._
  val testSystem = ActorSystem("testSystem")
  val parent = testSystem.actorOf(ParentalActor.props, "parent")
  Thread.sleep(1000) // wait for BackoffSupervisor.CurrentChild(ref) to arrive
  parent ! SendToSupervisor(TestMessage("Hello message 1 to supervisor"))
  parent ! SendToInnerChild(TestMessage("Hello message 2 to innerChild"))
  parent ! SendToChildSelection(TestMessage("Hello message 3 to selectedChild"))
  scala.io.StdIn.readLine()
  testSystem.terminate()
}
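The two supervisor examples reset the backoff differently: the first uses `.withAutoReset(resetBackoff = 5 seconds)`, the second `.withManualReset`. As I understand the Akka API, auto-reset drops the restart count back to zero once the child has run for the given duration without failing, whereas manual reset waits for the child to send `BackoffSupervisor.Reset` to its parent, the backoff supervisor. The stdlib-only model below is hypothetical (`ResetSketch`, `Counter`) and tracks only the restart count, which is the exponent that drives the backoff delay.

```scala
// Hypothetical model of the backoff restart counter under the two reset policies.
object ResetSketch {
  sealed trait ResetPolicy
  final case class AutoReset(afterMillis: Long) extends ResetPolicy
  case object ManualReset extends ResetPolicy

  final case class Counter(policy: ResetPolicy, restarts: Int = 0) {
    // Every failure-triggered restart makes the next delay longer.
    def failed: Counter = copy(restarts = restarts + 1)

    // Auto-reset: a child that stays healthy long enough earns a fresh start.
    def healthyFor(millis: Long): Counter = policy match {
      case AutoReset(after) if millis >= after => copy(restarts = 0)
      case _                                   => this
    }

    // Manual reset: only an explicit request from the child clears the count.
    def resetRequested: Counter = policy match {
      case ManualReset => copy(restarts = 0)
      case _           => this
    }
  }

  def main(args: Array[String]): Unit = {
    val auto   = Counter(AutoReset(5000)).failed.failed.healthyFor(6000)
    val manual = Counter(ManualReset).failed.failed.healthyFor(6000)
    println(s"auto: ${auto.restarts}, manual: ${manual.restarts}") // auto: 0, manual: 2
    println(s"manual after reset request: ${manual.resetRequested.restarts}") // manual after reset request: 0
  }
}
```

Manual reset suits a child that can tell for itself when it is healthy again (for example after a successful database call); auto-reset needs no cooperation from the child.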
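Now let's implement an example that performs database operations in a cluster, and see how akka-cluster dispatches a series of operations to the different nodes. First, the Worker: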
import akka.actor._
import scala.concurrent.duration._
object Backend {
case class SaveFormula(op1: Int, op2: Int)
def workerProps = Props(new Worker)
}
class Worker extends Actor with ActorLogging {
import Backend._
context.setReceiveTimeout(500 milliseconds)
override def receive: Receive = {
case SaveFormula(op1,op2) => {
val res = op1 * op2
// saveToDB(op1,op2,res)
log.info(s"******* $op1 X $op2 = $res save to DB by $self *******")
}
case ReceiveTimeout =>
log.info(s"******* $self receive timout! *******")
throw ne