菜鸟系列Fabric源码学习 — 区块同步
Fabric 1.4 源码分析 区块同步
本文主要从源码层面介绍fabric peer同步区块过程,peer同步区块主要有2个过程:
1)peer组织的leader与orderer同步区块
2)peer组织间peer同步区块。
1. peer leader和orderer同步区块
首先,orderer对外主要提供broadcast和deliver两个服务(参见“orderer服务介绍”)。我们知道peer和orderer同步区块肯定是由deliver服务实现的,但到底是peer从orderer拉取区块,还是orderer主动推送给peer呢?由于peer可以从配置块中获知orderer的信息,并且deliver是一个grpc服务,由此推断是peer从orderer拉取区块。既然是拉取区块,那么peer如何获取区块,获取区块的方式又是什么?
1.1 Orderer Deliver服务
首先,查看orderer deliver服务是怎么运行的,是如何同步区块的。
当deliver服务被调用时,转到Handle()方法处理
// Handle serves a single Deliver stream. In a loop it reads a seek-info
// envelope from the client with srv.Recv() and hands it to deliverBlocks,
// which streams the requested blocks back over srv.
// NOTE(review): this is an excerpt — the "..." lines stand for code elided
// from the original source (including where addr is defined and how errors
// and the returned status are handled).
func (h *Handler) Handle(ctx context.Context, srv *Server) error {
...
for {
logger.Debugf("Attempting to read seek info message from %s", addr)
// Receive the next envelope sent by the client.
envelope, err := srv.Recv()
...
// Deliver the blocks requested by this envelope.
status, err := h.deliverBlocks(ctx, srv, envelope)
...
}
}
其中,srv.Recv()接收envelope,再根据envelope中的信息分发block。
// deliverBlocks serves one seek request carried by envelope. It:
//   - validates the payload and channel header;
//   - looks up the channel's chain;
//   - evaluates access control;
//   - iterates the chain's blocks starting at seekInfo.Start, sending each
//     one with srv.SendBlockResponse until the block numbered stopNum has
//     been sent.
//
// The returned status is what the client should be told. A non-nil err is
// returned only when the context is cancelled or sending a block fails.
//
// NOTE(review): this is an excerpt of the upstream function. As shown:
//   - the errors from UnmarshalPayload, UnmarshalChannelHeader and
//     validateChannelHeader are assigned but never checked;
//   - chain is used without a nil check;
//   - seekInfo is never unmarshaled from payload.Data, so Start/Stop would
//     be nil here.
// Presumably the original source handles all of this in elided code — confirm.
// labels and erroredChan are also defined in elided code.
func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) (status cb.Status, err error) {
addr := util.ExtractRemoteAddress(ctx)
payload, err := utils.UnmarshalPayload(envelope.Payload)
if payload.Header == nil {
logger.Warningf("Malformed envelope received from %s with bad header", addr)
return cb.Status_BAD_REQUEST, nil
}
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
err = h.validateChannelHeader(ctx, chdr)
chain := h.ChainManager.GetChain(chdr.ChannelId)
// Record one completed request, labelled with whether the final status
// (the named result) was SUCCESS.
defer func() {
labels := append(labels, "success", strconv.FormatBool(status == cb.Status_SUCCESS))
h.Metrics.RequestsCompleted.With(labels...).Add(1)
}()
accessControl, err := NewSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, crypto.ExpiresAt)
if err != nil {
logger.Warningf("[channel: %s] failed to create access control object due to %s", chdr.ChannelId, err)
return cb.Status_BAD_REQUEST, nil
}
// Initial authorization check before any block is sent.
if err := accessControl.Evaluate(); err != nil {
logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
return cb.Status_FORBIDDEN, nil
}
seekInfo := &ab.SeekInfo{}
// Returns the iterator (cursor) and the number of the first block it yields.
cursor, number := chain.Reader().Iterator(seekInfo.Start)
defer cursor.Close()
// Resolve the stop position into a concrete block number.
var stopNum uint64
switch stop := seekInfo.Stop.Type.(type) {
case *ab.SeekPosition_Oldest:
// Stop at the start block, i.e. deliver exactly one block.
stopNum = number
case *ab.SeekPosition_Newest:
// Stop at the last block currently in the ledger.
// NOTE(review): if Height() could be 0 this unsigned subtraction would
// wrap; presumably an orderer chain always has a genesis block — confirm.
stopNum = chain.Reader().Height() - 1
case *ab.SeekPosition_Specified:
stopNum = stop.Specified.Number
if stopNum < number {
logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
return cb.Status_BAD_REQUEST, nil
}
}
for {
// With FAIL_IF_NOT_READY, report NOT_FOUND instead of waiting for a
// block beyond the current ledger height.
if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
if number > chain.Reader().Height()-1 {
return cb.Status_NOT_FOUND, nil
}
}
var block *cb.Block
// This status shadows the named result; the deferred metrics func sees
// whatever value is passed to the eventual return statement.
var status cb.Status
iterCh := make(chan struct{})
// Fetch the next block in a goroutine so the select below can abandon the
// wait on context cancellation or a consenter error.
go func() {
// Fetch the next block.
block, status = cursor.Next()
close(iterCh)
}()
select {
case <-ctx.Done():
logger.Debugf("Context canceled, aborting wait for next block")
return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
case <-erroredChan:
// TODO, today, the only user of the errorChan is the orderer consensus implementations. If the peer ever reports
// this error, we will need to update this error message, possibly finding a way to signal what error text to return.
logger.Warningf("Aborting deliver for request because the backing consensus implementation indicates an error")
return cb.Status_SERVICE_UNAVAILABLE, nil
case <-iterCh:
// Iterator has set the block and status vars
}
if status != cb.Status_SUCCESS {
logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
return status, nil
}
// increment block number to support FAIL_IF_NOT_READY deliver behavior
number++
// Re-check authorization before every block, so revoked access stops an
// in-progress stream.
if err := accessControl.Evaluate(); err != nil {
logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
return cb.Status_FORBIDDEN, nil
}
logger.Debugf("[channel: %s] Delivering block for (%p) for %s", chdr.ChannelId, seekInfo, addr)
// Send the block to the client.
if err := srv.SendBlockResponse(block); err != nil {
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
return cb.Status_INTERNAL_SERVER_ERROR, err
}
h.Metrics.BlocksSent.With(labels...).Add(1)
// Stop once the last block requested by the client has been sent.
if stopNum == block.Header.Number {
break
}
}
logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)
return cb.Status_SUCCESS, nil
}
deliverBlocks()的主要流程如下:
反序列化envelope.Payload
对payload.Header和ChannelHeader进行验证
根据通道ID获取对应的chain
进行访问控制相关的验证(policy、signature)
从payload.Data中解析出SeekInfo
新建一个迭代器cursor
根据Stop.Type确定stopNum
循环调用cursor.Next()获取下一个区块,并通过SendBlockResponse()发送该区块
判断是否已到达client请求的最后一个区块,是则跳出循环
// Chain encapsulates chain operations and data.
type Chain interface {
// Sequence returns the current config sequence number, can be used to detect config changes
Sequence() uint64
// PolicyManager returns the current policy manager as specified by the chain configuration
PolicyManager() policies.Manager
// Reader returns the chain Reader for the chain
Reader() blockledger.Reader
// Errored returns a channel which closes when the backing consenter has errored
Errored() <-chan struct{}
}
// SeekInfo describes a deliver request: which block to start from, which
// block to stop at, and what to do when a requested block is not available yet.
// NOTE(review): the protobuf struct tags and XXX_ fields suggest this is
// protoc-gen-go output — edit the .proto definition, not this struct.
type SeekInfo struct {
// Start is the position of the first block to deliver.
Start *SeekPosition `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"`
// Stop is the position of the last block to deliver.
Stop *SeekPosition `protobuf:"bytes,2,opt,name=stop,proto3" json:"stop,omitempty"`
// Behavior selects how a not-yet-available block is handled
// (e.g. FAIL_IF_NOT_READY or BLOCK_UNTIL_READY).
Behavior SeekInfo_SeekBehavior `protobuf:"varint,3,opt,name=behavior,proto3,enum=orderer.SeekInfo_SeekBehavior" json:"behavior,omitempty"`
// Protobuf runtime bookkeeping fields.
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
1.2 peer从orderer同步区块
这里主要解决1个问题:peer如何触发orderer deliver服务?即peer和orderer怎么同步区块的?
在介绍之前,可以先参阅“peer节点启动流程”一文。peer节点启动过程中会执行peer.Initialize()方法,对peer已加入的所有chain进行实例化,其中调用了createChain()接口创建链对象。createChain()方法中又调用了GossipService.InitializeChannel()方法,进而调用g.deliveryService[chainID].StartDeliverForChannel()方法获取区块。
// StartDeliverForChannel starts pulling blocks from the ordering service for
// the channel chainID. It creates a deliver client for the channel, registers
// a blocks provider built on that client, and launches the provider in a new
// goroutine via launchBlockProvider, which also receives finalizer.
//
// It returns an error, without starting anything, if the delivery service is
// stopping or if a blocks provider is already registered for chainID.
func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
	d.lock.Lock()
	defer d.lock.Unlock()

	if d.stopping {
		errMsg := fmt.Sprintf("Delivery service is stopping cannot join a new channel %s", chainID)
		// Log errMsg as an argument rather than as the format string so that
		// any '%' in chainID cannot be misinterpreted as a formatting verb.
		logger.Errorf("%s", errMsg)
		return errors.New(errMsg)
	}
	if _, exist := d.blockProviders[chainID]; exist {
		errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID)
		logger.Errorf("%s", errMsg)
		return errors.New(errMsg)
	}

	client := d.newClient(chainID, ledgerInfo)
	logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID)
	// Register the blocks provider while still holding d.lock, so a concurrent
	// call for the same channel sees it in the existence check above.
	d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
	// Run the provider asynchronously; this method returns immediately.
	go d.launchBlockProvider(chainID, finalizer)
	return nil
}
其中newClient()创建一个broadcastClient,传入参数为requester.RequestBlocks(ledgerInfoProvider)方法。很显然,peer是通过该方法获取区块的,那么该方法主要实现是什么?
// RequestBlocks asks the ordering service to begin delivering blocks for the
// requester's channel. If the local ledger is empty, delivery starts from the
// oldest block. Otherwise it starts at block number `height`, the committer's
// current ledger height. An error from reading the height or from issuing the
// seek request is returned to the caller.
func (b *blocksRequester) RequestBlocks(ledgerInfoProvider blocksprovider.LedgerInfo) error {
	height, err := ledgerInfoProvider.LedgerHeight()
	if err != nil {
		logger.Errorf("Can't get ledger height for channel %s from committer [%s]", b.chainID, err)
		return err
	}

	if height == 0 {
		// Nothing committed yet: request the chain from its very beginning.
		logger.Debugf("Starting deliver with oldest block for channel %s", b.chainID)
		return b.seekOldest()
	}

	// Resume right after what the committer already holds.
	logger.Debugf("Starting deliver with block [%d] for channel %s", height, b.chainID)
	return b.seekLatestFromCommitter(height)
}
RequestBlocks()根据账本高度调用seekLatestFromCommitter()或seekOldest()方法,其中用到的SeekInfo结构定义如下:
// SeekInfo describes a deliver request: which block to start from, which
// block to stop at, and what to do when a requested block is not available yet.
// NOTE(review): the protobuf struct tags and XXX_ fields suggest this is
// protoc-gen-go output — edit the .proto definition, not this struct.
type SeekInfo struct {
// Start is the position of the first block to deliver.
Start *SeekPosition `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"`
// Stop is the position of the last block to deliver.
Stop *SeekPosition `protobuf:"bytes,2,opt,name=stop,proto3" json:"stop,omitempty"`
// Behavior selects how a not-yet-available block is handled
// (e.g. FAIL_IF_NOT_READY or BLOCK_UNTIL_READY).
Behavior SeekInfo_SeekBehavior `protobuf:"varint,3,opt,name=behavior,proto3,enum=orderer.SeekInfo_SeekBehavior" json:"behavior,omitempty"`
// Protobuf runtime bookkeeping fields.
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
// seekOldest sends the orderer a signed DELIVER_SEEK_INFO envelope asking for
// every block from the oldest one up to block number math.MaxUint64. It uses
// BLOCK_UNTIL_READY, so blocks that do not exist yet are waited for rather
// than reported as missing. It returns any error from building or sending
// the envelope.
func (b *blocksRequester) seekOldest() error {
	from := &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}}
	until := &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}}
	request := &orderer.SeekInfo{
		Start:    from,
		Stop:     until,
		Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
	}

	// TODO: epoch and msgVersion may need to be obtained properly; for now they
	// follow the usage in orderer/configupdate/configupdate.go (both zero).
	var (
		version int32
		epoch   uint64
	)
	certHash := b.getTLSCertHash()

	envelope, err := utils.CreateSignedEnvelopeWithTLSBinding(common.HeaderType_DELIVER_SEEK_INFO, b.chainID, localmsp.NewSigner(), request, version, epoch, certHash)
	if err != nil {
		return err
	}
	// Hand the seek request to the orderer; blocks then arrive on the stream.
	return b.client.Send(envelope)
}
从RequestBlocks()调用的这2个方法可知,SeekInfo中指定的结束区块号(stopNum)都为math.MaxUint64,因此该请求会持续获取区块,直到区块号达到该最大值(可以看作从现在到未来的所有区块)。
由上文可知,broadcastClient已经实例化,并且会通过broadcastClient.onConnect向orderer发送获取区块的envelope。实例化完成后调用launchBlockProvider(),其中会调用pb.DeliverBlocks()方法开始接收区块。
// broadcastClient is the client the peer uses to obtain blocks from the
// ordering service for one channel. Its onConnect callback is what sends the
// block-request (seek) envelope to the orderer once a connection is set up.
// NOTE(review): the per-field notes below are inferred from names and types
// only — confirm against the methods of this type.
type broadcastClient struct {
// Presumably a stop indicator set when the client is closed — TODO confirm.
stopFlag int32
// Presumably closed to signal shutdown to waiting goroutines — TODO confirm.
stopChan chan struct{}
// Creates the underlying deliver client for a connection (assumed from type name).
createClient clientFactory
// Decides whether and when to retry after a failure (assumed from type name).
shouldRetry retryPolicy
// Callback invoked after connecting; in this flow it sends the seek request.
onConnect broadcastSetup
// Produces connections to orderer endpoints (assumed from type name).
prod comm.ConnectionProducer
// Presumably guards conn and endpoint — TODO confirm.
mutex sync.Mutex
blocksDeliverer blocksprovider.BlocksDeliverer
// Current connection to an orderer, if any (assumed).
conn *connection
// Presumably the address of the orderer currently connected to.
endpoint string
}
// DeliverBlocks used to pull out blocks from the ordering service to
// distributed them across peers
func (b *blocksProviderImpl) DeliverBlocks() {
errorStatusCounter := 0
statusCounter := 0
defer b.client.Close()
for !b.isDone() {
// 接收orderer分发的区块
msg, err := b.client.Recv()
if err != nil {
logger.Warningf("[%s] Receive error: %s", b.chainID, err.Error())
return
}
switch t := msg.Type.(type) {
case *orderer.DeliverResponse_Status:
if t.Status == common.Status_SUCCESS {
logger.Warningf("[%s] ERROR! Received success for a seek that should never complete", b.chainID)
return
}
if t.Status == common.Status_BAD_REQUEST || t.Status == common.Status_FORBIDDEN {
logger.Errorf("[%s] Got error %v", b.chainID, t)
errorStatusCounter++
if errorStatusCounter > b.wrongStatusThreshold {
logger.Criticalf("[%s] Wrong statuses threshold passed, stopping block provider", b.chainID)
return
}
} else {
errorStatusCounter = 0
logger.Warningf("[%s] Got error %v", b.chainID, t)
}
maxDelay := float64(maxRetryDelay)
currDelay := float64(time.Duration(math.Pow(2, float64(statusCounter))) * 100 * time.Millisecond)
time.Sleep(time.Duration(math.Min(maxDelay, currDelay)))
if currDelay < maxDelay {
statusCounter++
}
if t.Status == common.Status_BAD_REQUEST {
b.client.Disconnect(false)
} else {
b.client.Disconnect(true)
}
continue
case *orderer.DeliverResponse_Block:
errorStatusCounter = 0
statusCounter = 0
blockNum := t.Block.Header.Number
marshaledBlock, err := proto.Marshal(t.Block)
if err != nil {
logger.Errorf("[%s] Error serializing block with sequence number %d, due to %s", b.chainID, blockNum, err)
continue
}
if err := b.mcs.VerifyBlock(gossipcommon.ChainID(b.chainID), blockNum, marshaledBlock); err != nil {
logger.Errorf("[%s] Error verifying block with sequnce number %d, due to %s", b.chainID, blockNum, err)
continue
}
numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
// Create payload with a block received
payload := createPayload(blockNum, marshaledBlock)
// Use payload to create gossip message
gossipMsg := createGossipMsg(b.chainID, payload)
logger.Debugf("[%s] Adding payload to local buffer, blockNum = [%d]", b.chainID,