Akka-Cluster (4) - DistributedData: distributed data types
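In real applications it is unavoidable that the nodes of a cluster share some data. By this I mean data that any node can read and write concurrently and that has to stay in sync; the hard part is resolving conflicting updates. A distributed database could provide this, but the cost of running and maintaining one is too high to be worth it. Distributed Data (ddata) is designed for exactly this predicament. Akka provides a set of CRDTs (Conflict-free Replicated Data Types) and a management API for sharing data across the cluster without conflicts.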
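Akka manages shared distributed data through the replicator. The replicator is an actor: once one is running on every node of the cluster, replicators with the same actor path (ignoring the address part) talk to each other over a gossip protocol, as if they were joined into a replicator network. The replicator provides an API that handles both update conflicts and data synchronization. The shared data structures live inside each node's replicator. To update a value, a program sends its local replicator a message carrying the key of the shared data and a function that modifies it; the replicator applies the update locally, propagates it to the replicators on the other nodes through gossip, and resolves any conflicts that arise during synchronization. Because the data lives inside the replicator, reading a value likewise means sending a read message to the local replicator.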
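The "update locally, then converge through gossip" idea can be illustrated without Akka. The following is a minimal sketch in plain Scala, assuming a grow-only set whose merge is set union; `GrowSet` and `GossipSketch` are hypothetical names for illustration and are not Akka's `GSet` or replicator.

```scala
// Hypothetical model of one replica holding a grow-only set; not Akka's GSet.
final case class GrowSet[A](elements: Set[A] = Set.empty[A]) {
  def add(a: A): GrowSet[A] = copy(elements = elements + a)

  // Merge is set union, which is commutative, associative and idempotent.
  def merge(that: GrowSet[A]): GrowSet[A] = GrowSet(elements union that.elements)
}

object GossipSketch extends App {
  // Two nodes update their local copies independently.
  val nodeA = GrowSet[String]().add("a").add("b")
  val nodeB = GrowSet[String]().add("b").add("c")

  // State is exchanged in both directions; the order of merging does not matter.
  val aAfter = nodeA.merge(nodeB)
  val bAfter = nodeB.merge(nodeA)

  assert(aAfter == bAfter)              // both replicas converge
  assert(aAfter.merge(nodeB) == aAfter) // receiving the same state again changes nothing
  println(aAfter.elements.toList.sorted.mkString(", "))
}
```

Because the merge is insensitive to ordering and duplication, no coordination between the nodes is needed for the replicas to end up with the same value.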
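As an actor, the replicator can be started by declaring the akka.cluster.ddata.DistributedData extension in the .conf file, or it can be built directly with Replicator.props. I find building the actor directly much more flexible, and it also lets you run several replicators on one node, because replicators on different nodes are grouped by actor path. The following code builds a replicator with Replicator.props: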
val replicator = system.actorOf(Replicator.props(
ReplicatorSettings(system).withGossipInterval(1.second)), "replicator")
To build it through akka.extensions in the configuration file instead:
# application.conf
akka {
extensions = ["akka.cluster.ddata.DistributedData"]
...
}
// Scala, inside an actor: look up the replicator started by the extension
val replicator = DistributedData(context.system).replicator
A CRDT is a kind of key/value data type: each value is addressed by a typed key. CRDT values come mainly as Counter, Flag, Set and Map types, including:
/**
* Implements a boolean flag CRDT that is initialized to `false` and
* can be switched to `true`. `true` wins over `false` in merge.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final case class Flag(enabled: Boolean)
final case class FlagKey(_id: String)
/**
* Implements a 'Growing Counter' CRDT, also called a 'G-Counter'.
* A G-Counter is a increment-only counter (inspired by vector clocks) in
* which only increment and merge are possible. Incrementing the counter
* adds 1 to the count for the current node. Divergent histories are
* resolved by taking the maximum count for each node (like a vector
* clock merge). The value of the counter is the sum of all node counts.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final class GCounter
final case class GCounterKey(_id: String)
/**
* Implements a 'Increment/Decrement Counter' CRDT, also called a 'PN-Counter'.
* PN-Counters allow the counter to be incremented by tracking the
* increments (P) separate from the decrements (N). Both P and N are represented
* as two internal [[GCounter]]s. Merge is handled by merging the internal P and N
* counters. The value of the counter is the value of the P counter minus
* the value of the N counter.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final class PNCounter
final case class PNCounterKey(_id: String)
/**
* Implements a 'Add Set' CRDT, also called a 'G-Set'. You can't
* remove elements of a G-Set.
* A G-Set doesn't accumulate any garbage apart from the elements themselves.
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final case class GSet[A]
final case class GSetKey[A](_id: String)
/**
* Implements a 'Observed Remove Set' CRDT, also called a 'OR-Set'.
* Elements can be added and removed any number of times. Concurrent add wins
* over remove.
*
* The ORSet has a version vector that is incremented when an element is added to
* the set. The `node -> count` pair for that increment is stored against the
* element as its "birth dot". Every time the element is re-added to the set,
* its "birth dot" is updated to that of the `node -> count` version vector entry
* resulting from the add. When an element is removed, we simply drop it, no tombstones.
*
* When an element exists in replica A and not replica B, is it because A added
* it and B has not yet seen that, or that B removed it and A has not yet seen that?
* In this implementation we compare the `dot` of the present element to the version vector
* in the Set it is absent from. If the element dot is not "seen" by the Set version vector,
* that means the other set has yet to see this add, and the item is in the merged
* Set. If the Set version vector dominates the dot, that means the other Set has removed this
* element already, and the item is not in the merged Set.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final class ORSet[A]
final case class ORSetKey[A](_id: String)
/**
* Implements a 'Observed Remove Map' CRDT, also called a 'OR-Map'.
*
* It has similar semantics as an [[ORSet]], but in case of concurrent updates
* the values are merged, and must therefore be [[ReplicatedData]] types themselves.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final class ORMap[A, B <: ReplicatedData]
final case class ORMapKey[A, B <: ReplicatedData](_id: String)
/**
* An immutable multi-map implementation. This class wraps an
* [[ORMap]] with an [[ORSet]] for the map's value.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final class ORMultiMap[A, B]
final case class ORMultiMapKey[A, B](_id: String)
/**
* Map of named counters. Specialized [[ORMap]] with [[PNCounter]] values.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final class PNCounterMap[A]
final case class PNCounterMapKey[A](_id: String)
In summary, the ready-made CRDT types that Akka provides are:
Counters: GCounter, PNCounter
Sets: GSet, ORSet
Maps: ORMap, ORMultiMap, LWWMap, PNCounterMap
Registers: LWWRegister, Flag
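The merge rules quoted in the GCounter and PNCounter comments (per-node maximum, value as a sum, P minus N) can be tried out in plain Scala. This is a simplified sketch under the assumption that nodes are identified by plain strings; `SimpleGCounter`, `SimplePNCounter` and `CounterSketch` are hypothetical names and not the Akka classes.

```scala
// Hypothetical, simplified counters keyed by node name; not Akka's GCounter/PNCounter.
final case class SimpleGCounter(state: Map[String, Long] = Map.empty) {
  // Each node only ever increases its own entry.
  def increment(node: String, n: Long = 1L): SimpleGCounter = {
    require(n >= 0, "a G-Counter can only grow")
    copy(state = state.updated(node, state.getOrElse(node, 0L) + n))
  }

  // The counter value is the sum of all node counts.
  def value: Long = state.values.sum

  // Divergent histories are resolved by taking the maximum count per node.
  def merge(that: SimpleGCounter): SimpleGCounter =
    SimpleGCounter((state.keySet ++ that.state.keySet).map { node =>
      node -> math.max(state.getOrElse(node, 0L), that.state.getOrElse(node, 0L))
    }.toMap)
}

// Increments (p) and decrements (n) are tracked in two separate G-Counters.
final case class SimplePNCounter(
    p: SimpleGCounter = SimpleGCounter(),
    n: SimpleGCounter = SimpleGCounter()) {
  def increment(node: String, by: Long = 1L): SimplePNCounter = copy(p = p.increment(node, by))
  def decrement(node: String, by: Long = 1L): SimplePNCounter = copy(n = n.increment(node, by))
  def value: Long = p.value - n.value
  def merge(that: SimplePNCounter): SimplePNCounter =
    SimplePNCounter(p.merge(that.p), n.merge(that.n))
}

object CounterSketch extends App {
  val onA = SimplePNCounter().increment("nodeA", 3)
  val onB = SimplePNCounter().increment("nodeB", 2).decrement("nodeB", 1)

  val merged = onA.merge(onB)
  assert(merged == onB.merge(onA)) // merge order does not matter
  assert(merged.value == 4L)       // 3 + 2 - 1
  println(merged.value)
}
```

Keeping one entry per node is what makes the maximum a safe merge: a node's own count never decreases, so the larger of two observed counts is always the more recent one.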
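The results of CRDT operations can also be obtained by subscription. A user sends a Subscribe message to the replicator to be notified about changes to the data under a given Key[A]: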
/**
* Register a subscriber that will be notified with a [[Changed]] message
* when the value of the given `key` is changed. Current value is also
* sent as a [[Changed]] message to a new subscriber.
*
* Subscribers will be notified periodically with the configured `notify-subscribers-interval`,
* and it is also possible to send an explicit `FlushChanges` message to
* the `Replicator` to notify the subscribers immediately.
*
* The subscriber will automatically be unregistered if it is terminated.
*
* If the key is deleted the subscriber is notified with a [[Deleted]]
* message.
*/
final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage
/**
* Unregister a subscriber.
*
* @see [[Replicator.Subscribe]]
*/
final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage
/**
* The data value is retrieved with [[#get]] using the typed key.
*
* @see [[Replicator.Subscribe]]
*/
final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) extends ReplicatorMessage {
/**
* The data value, with correct type.
* Scala pattern matching cannot infer the type from the `key` parameter.
*/
def get[T <: ReplicatedData](key: Key[T]): T = {
require(key == this.key, "wrong key used, must use contained key")
data.asInstanceOf[T]
}
/**
* The data value. Use [[#get]] to get the fully typed value.
*/
def dataValue: A = data
}
final case class Deleted[A <: ReplicatedData](key: Key[A]) extends NoSerializationVerificationNeeded {
override def toString: String = s"Deleted [$key]"
}
After an operation completes, the replicator publishes Changed and Deleted messages for the affected Key[A] to its subscribers.
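A subscriber might look like the following minimal sketch. It assumes the classic actor API of Akka 2.5.x with the akka-distributed-data module on the classpath and a running cluster node; the actor name `CounterWatcher` and the key name `"counter1"` are hypothetical choices for illustration.

```scala
import akka.actor.{Actor, ActorLogging, Props}
import akka.cluster.ddata.{DistributedData, PNCounterKey}
import akka.cluster.ddata.Replicator.{Changed, Subscribe}

object CounterWatcher {
  // Hypothetical key; every node must use the same id to refer to the same data.
  val Counter1Key = PNCounterKey("counter1")
  def props: Props = Props(new CounterWatcher)
}

class CounterWatcher extends Actor with ActorLogging {
  import CounterWatcher._

  // Assumes the replicator is started through the DistributedData extension.
  private val replicator = DistributedData(context.system).replicator

  // Register this actor; the current value, if any, is also sent as Changed.
  override def preStart(): Unit =
    replicator ! Subscribe(Counter1Key, self)

  def receive: Receive = {
    case c @ Changed(Counter1Key) =>
      // get(key) returns the value with its precise type (PNCounter here).
      val current: BigInt = c.get(Counter1Key).value
      log.info("counter1 is now {}", current)
  }
}
```

As the Subscribe comment above states, the subscriber is unregistered automatically when it terminates, so no explicit Unsubscribe is needed in this sketch.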
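Distributed data is read and written by sending messages to the local replicator. These messages are Update, Get and Delete. Use Get to read data, or subscribe to the CRDT's Changed and Deleted notifications.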
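What gives a CRDT its replicated, conflict-free behavior appears to be the way the replicator handles the Update message. The Update message is constructed as follows: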
final case class Update[A <: ReplicatedData](key: Key[A], writeConsistency: WriteConsistency,request: Option[Any])(val modify: Option[A] ⇒ A)
extends Command[A] with NoSerializationVerificationNeeded {...}
def apply[A <: ReplicatedData](
key: Key[A], initial: A, writeConsistency: WriteConsistency,
request: Option[Any] = None)(modify: A ⇒ A): Update[A] =
Update(key, writeConsistency, request)(modifyWithInitial(initial, modify))
private def modifyWithInitial[A <: ReplicatedData](initial: A, modify: A ⇒ A): Option[A] ⇒ A = {
case Some(data) ⇒ modify(data)
case None ⇒ modify(initial)
}
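We can see that Update carries the data identifier Key[A] and a function modify: Option[A] => A. The replicator uses this modify function to transform the CRDT value of type A. The apply constructor additionally takes an initial value of type A, which is used the first time the data is referenced; this is evident from modifyWithInitial and the way apply calls it. Here is an example of using the Update message: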
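// Akka 2.5.x: the CRDT operators used below (+, increment) take the local node as an implicit Cluster
implicit val cluster = Cluster(system)
val timeout = 3.seconds.dilated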
val KeyA = GCounterKey("A")
val KeyB = ORSetKey[String]("B")
val KeyC = PNCounterMapKey[String]("C")
val KeyD = ORMultiMapKey[String, String]("D")
val KeyE = ORMapKey[String, GSet[String]]("E")
replicator ! Update(KeyA, GCounter(), WriteAll(timeout))(_ + 3)
replicator ! Update(KeyB, ORSet(), WriteAll(timeout))(_ + "a" + "b" + "c")
replicator ! Update(KeyC, PNCounterMap.empty[String], WriteAll(timeout)) { _ increment "x" increment "y" }
replicator ! Update(KeyD, ORMultiMap.empty[String, String], WriteAll(timeout)) { _ + ("a" → Set("A")) }
replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteAll(timeout)) { _ + ("a" → GSet.empty[String].add("A")) }
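How the initial value interacts with modify can be checked in isolation. The sketch below copies the shape of modifyWithInitial from the source quoted earlier, drops the ReplicatedData bound so it runs without Akka, and uses a single `Option` variable as a hypothetical stand-in for the replicator's local store; `UpdateSketch` is an illustrative name.

```scala
object UpdateSketch extends App {
  // Same shape as the helper quoted above, without the ReplicatedData bound.
  def modifyWithInitial[A](initial: A, modify: A => A): Option[A] => A = {
    case Some(data) => modify(data)
    case None       => modify(initial)
  }

  // Hypothetical stand-in for the value the replicator keeps under one key.
  var stored: Option[Set[String]] = None

  val addA = modifyWithInitial[Set[String]](Set.empty, _ + "a")
  val addB = modifyWithInitial[Set[String]](Set.empty, _ + "b")

  stored = Some(addA(stored)) // key absent: modify runs on the initial value
  stored = Some(addB(stored)) // key present: modify runs on the stored value

  assert(stored.contains(Set("a", "b")))
  println(stored)
}
```

Note that the initial value is only a starting point for the first update; once a value exists under the key, it is ignored.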
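Because CRDT reads and writes are done by sending messages, their results also come back as messages. The reply to a read carries the resulting data. The reply message types for reads and writes are: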
/*------------------UPDATE STATE MESSAGES -----------*/
final case class UpdateSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any])
extends UpdateResponse[A] with DeadLetterSuppression
sealed abstract class UpdateFailure[A <: ReplicatedData] extends UpdateResponse[A]
/**
* The direct replication of the [[Update]] could not be fulfill according to
* the given [[WriteConsistency consistency level]] and
* [[WriteConsistency#timeout timeout]].
*
* The `Update` was still performed locally and possibly replicated to some nodes.
* It will eventually be disseminated to other replicas, unless the local replica
* crashes before it has been able to communicate with other replicas.
*/
final case class UpdateTimeout[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends UpdateFailure[A]
/**
* If the `modify` function of the [[Update]] throws an exception the reply message
* will be this `ModifyFailure` message. The original exception is included as `cause`.
*/
final case class ModifyFailure[A <: ReplicatedData](key: Key[A], errorMessage: String, cause: Throwable, request: Option[Any])
extends UpdateFailure[A] {
override def toString: String = s"ModifyFailure [$key]: $errorMessage"
}
/**
* The local store or direct replication of the [[Update]] could not be fulfill according to
* the given [[WriteConsistency consistency level]] due to durable store errors. This is
* only used for entries that have been configured to be durable.
*
* The `Update` was still performed in memory locally and possibly replicated to some nodes,
* but it might not have been written to durable storage.
* It will eventually be disseminated to other replicas, unless the local replica
* crashes before it has been able to communicate with other replicas.
*/
final case class StoreFailure[A <: ReplicatedData](key: Key[A], request: Option[Any])
extends UpdateFailure[A] with DeleteResponse[A]
/* ---------------- GET MESSAGES --------*/
/**
* Reply from `Get`. The data value is retrieved with [[#get]] using the typed key.
*/
final case class GetSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any])(data: A)
extends GetResponse[A] with ReplicatorMessage {
/**
* The data value, with correct type.
* Scala pattern matching cannot infer the type from the `key` parameter.
*/
def get[T <: ReplicatedData](key: Key[T]): T = {
require(key == this.key, "wrong key used, must use contained key")
data.asInstanceOf[T]
}
/**
* The data value. Use [[#get]] to get the fully typed value.
*/
def dataValue: A = data
}
final case class NotFound[A <: ReplicatedData](key: Key[A], request: Option[Any])
extends GetResponse[A] with ReplicatorMessage
/*----------------DELETE MESSAGES ---------*/
final case class DeleteSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends DeleteResponse[A]
final case class ReplicationDeleteFailure[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends DeleteResponse[A]
final case class DataDeleted[A <: ReplicatedData](key: Key[A], request: Option[Any])
extends RuntimeException with NoStackTrace with DeleteResponse[A] {
override def toString: String = s"DataDeleted [$key]"
}
The read reply defines def dataValue: A to fetch the data, or the typed method get(Key[A]) to read the value for a specific key. Here are some read examples:
val replicator = DistributedData(system).replicator
val Counter1Key = PNCounterKey("counter1")
val Set1Key = GSetKey[String]("set1")
val Set2Key = ORSetKey[String]("set2")
val ActiveFlagKey = FlagKey("active")
replicator ! Get(Counter1Key, ReadLocal)
val readFrom3 = ReadFrom(n = 3, timeout = 1.second)
replicator ! Get(Set1Key, readFrom3)
val readMajority = ReadMajority(timeout = 5.seconds)
replicator ! Get(Set2Key, readMajority)
val readAll = ReadAll(timeout = 5.seconds)
replicator ! Get(ActiveFlagKey, readAll)
// the cases below belong in the receive block of the actor that sent the Get messages
case g @ GetSuccess(Counter1Key, req) ⇒
val value = g.get(Counter1Key).value
case NotFound(Counter1Key, req) ⇒ // key counter1 does not exist
...
case g @ GetSuccess(Set1Key, req) ⇒
val elements = g.get(Set1Key).elements
case GetFailure(Set1Key, req) ⇒
// read from 3 nodes failed within 1.second
case NotFound(Set1Key, req) ⇒ // key set1 does not exist
/*---- return get result to user (sender()) ----*/
case "get-count" ⇒
// incoming request to retrieve current value of the counter
replicator ! Get(Counter1Key, readTwo, r