Akka-Cluster(0)- 分布式应用开发的一些想法

 当我初接触akka-cluster的时候,我有一个梦想,希望能充分利用actor自由分布、独立运行的特性实现某种分布式程序。这种程序的计算任务可以进行人为的分割后再把细分的任务分派给分布在多个服务器上的actor上去运算。这些服务器都处于同一集群环境里,它们都是akka-cluster中的节点(node)。akka-cluster的节点数量只需要通过系统配置方式按照计算能力要求随意增减,在集群上运行的分布式程序可以在不修改软件的情况下自动调整actors在各节点上的分布,重新平衡程序运算负载,不受任何影响继续运行。

   在前面akka系列的博客里也介绍了一些akka-cluster的情况,最近在“集群环境内编程模式(PICE)”的专题系列里又讨论了如何在集群环境里通过protobuf-gRPC把多个不同类型的数据库服务集成起来。因为集群中的数据库服务是用akka-stream连接的,我们把程序与数据一起作为stream的流元素用Flow发送给相应的数据库服务进行处理。这时一个想法就产生了:当数据库服务接收了一项服务要求后(假设数据处理多是耗时、耗资源的任务)可以对任务进行分割,然后把这些小任务再分发给所属集群内的多个节点上去运算,再按计算要求收集,汇总结果。那么如果能按用户数量和运算任务的规模来任意添减服务器数量就能满足任何规模的运算需求了。最重要的是这种集群节点规模调整必须是某种配置方式,即通过修改配置文件,但不需要修改软件代码。这些需要恰恰又是akka-cluster的特殊能力。所以决定开个akka-cluster的专题系列来具体讨论集群环境下的分布式软件开发模式。

akka-cluster提供的以下几种方式比较符合我们的要求:

1、distributed pub/sub - 分布式发布订阅模式

2、cluster-singleton - 单例actor模式

3、cluster-load-balancing - 集群负载均衡模式

4、cluster-sharding - 集群分片模式

在这个系列下面的博客里我们会逐个模式讨论它们在具体编程的使用细节。但首先探讨一下如何通过配置文件来定义akka-cluster节点,实现集群规模调整。

集群节点(cluster node)的生命周期会经历以下阶段:

Joining->Up,Leaving->Exiting,Exiting->Removed,Unreachable->Up,Unreachable->Down,Down->Removed

下面我们就用运行在不同集群节点的actor,通过订阅系统的集群成员状态转换消息来观察每个节点的状态转变:

复制代码
class EventListener extends Actor with ActorLogging {   import EventListner._    val cluster = Cluster(context.system)    override def preStart(): Unit = {     cluster.subscribe(subscriber = self,initialStateMode = InitialStateAsEvents     ,classOf[MemberEvent],classOf[UnreachableMember])     super.preStart()   }   override def postStop(): Unit = {     cluster.unsubscribe(self)     super.postStop()   }    override def receive: Receive = {     case MemberJoined(member) =>       log.info("{} is JOINING...", member.address)     case MemberUp(member) =>       log.info("{} is UP!", member.address)     case MemberWeaklyUp(member) =>       log.info("{} is weakly UP!", member.address)     case MemberLeft(member) =>       log.info("{} is LEAVING...", member.address)     case MemberExited(member) =>       log.info("{} is EXITING...", member.address)     case MemberRemoved(member, prevStatus) =>       log.info("{} is REMOVED! from state {}", member.address, prevStatus)     case UnreachableMember(member) =>       log.info("{} is UNREACHABLE!", member.address)     case ReachableMember(member) =>       log.info("{} is REACHABLE!", member.address)     case UnreachableDataCenter(datacenter) =>       log.info("Data Center {} is UNREACHABLE!", datacenter)     case ReachableDataCenter(datacenter) =>       log.info("Data Center {} is REACHABLE!", datacenter)     case Leave =>       cluster.leave(cluster.selfAddress)       log.info("{} is asked to leave cluster."

                    
                
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信