Fabric 1.4 源码分析 背书节点和链码容器交互 本文档主要介绍背书节点和链码容器交互流程,在Endorser背书节点章节中,无论是deploy、upgrade或者调用链码,最后都会调用ChaincodeSupport.LaunchInit()/Launch()以及ChaincodeSupport.execute()方法。其中Launch()方法启动链码容器,execute()方法调用链码。 1. 准备 ChaincodeSupport.Launch()首先进行判断,根据peer侧该版本链码的Handler是否存在,存在则表示已运行。若不存在,则调用lscc链码方法cs.Lifecycle.ChaincodeContainerInfo()获取启动链码所需的数据ChaincodeContainerInfo。再调用cs.Launcher.Launch()方法启动链码。再判断是否注册了handler。 func (cs *ChaincodeSupport) Launch(chainID, chaincodeName, chaincodeVersion string, qe ledger.QueryExecutor) (*Handler, error) { cname := chaincodeName + ":" + chaincodeVersion if h := cs.HandlerRegistry.Handler(cname); h != nil { return h, nil } ccci, err := cs.Lifecycle.ChaincodeContainerInfo(chaincodeName, qe) if err != nil { // TODO: There has to be a better way to do this... if cs.UserRunsCC { chaincodeLogger.Error( "You are attempting to perform an action other than Deploy on Chaincode that is not ready and you are in developer mode. Did you forget to Deploy your chaincode?", ) } return nil, errors.Wrapf(err, "[channel %s] failed to get chaincode container info for %s", chainID, cname) } if err := cs.Launcher.Launch(ccci); err != nil { return nil, errors.Wrapf(err, "[channel %s] could not launch chaincode %s", chainID, cname) } h := cs.HandlerRegistry.Handler(cname) if h == nil { return nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", chainID, cname) } return h, nil } type ChaincodeContainerInfo struct { Name string Version string Path string Type string CodePackage []byte // ContainerType is not a great name, but 'DOCKER' and 'SYSTEM' are the valid types ContainerType string } Launch()主要实现方法在core/chaincode/runtime_launcher.go Launch()方法。在该方法中,会调用r.Runtime.Start(ccci, codePackage)启动链码,在该方法中,首先会调用c.LaunchConfig(cname, ccci.Type)生成创建链码所需的参数LaunchConfig(链码类型go/java/nodejs,以及TLS配置),然后构造启动链码容器请求StartContainerReq。接着调用c.Processor.Process(ccci.ContainerType, scr)正式启动链码容器。操作完成后,通过Launch()里面的select—case语句阻塞获取结果,并结束程序运行。 func (r *RuntimeLauncher) Launch(ccci *ccprovider.ChaincodeContainerInfo) error { ... if !alreadyStarted { ... go func() { if err := r.Runtime.Start(ccci, codePackage); err != nil { startFailCh <- errors.WithMessage(err, "error starting container") return } exitCode, err := r.Runtime.Wait(ccci) if err != nil { launchState.Notify(errors.Wrap(err, "failed to wait on container exit")) } launchState.Notify(errors.Errorf("container exited with %d", exitCode)) }() } var err error select { case <-launchState.Done(): err = errors.WithMessage(launchState.Err(), "chaincode registration failed") case err = <-startFailCh: launchState.Notify(err) r.Metrics.LaunchFailures.With("chaincode", cname).Add(1) case <-timeoutCh: err = errors.Errorf("timeout expired while starting chaincode %s for transaction", cname) launchState.Notify(err) r.Metrics.LaunchTimeouts.With("chaincode", cname).Add(1) } ... return err } 经上面可知,在启动链码容器时会调用c.Processor.Process()方法,其中会调用req.Do(v)。存在3个实现,分别是StartContainerReq、WaitContainerReq、StopContainerReq。启动时是调用StartContainerReq。 func (si StartContainerReq) Do(v VM) error { return v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder) } 2. 启动系统链码 启动系统链码(进程模式)的话,则v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)的实现是在core/container/inproccontroller/inproccontroller.go start()方法。 func (vm *InprocVM) Start(ccid ccintf.CCID, args []string, env []string, filesToUpload map[string][]byte, builder container.Builder) error { path := ccid.GetName() // name=Name-Version // 获取已注册的inprocContainer模版 ipctemplate := vm.registry.getType(path) ... instName := vm.GetVMName(ccid) // 构建chaincode实例ipc ipc, err := vm.getInstance(ipctemplate, instName, args, env) // 判断链码是否运行 if ipc.running { return fmt.Errorf(fmt.Sprintf("chaincode running %s", path)) } ipc.running = true go func() { defer func() { if r := recover(); r != nil { inprocLogger.Criticalf("caught panic from chaincode %s", instName) } }() // 进程模式运行链码 ipc.launchInProc(instName, args, env) }() return nil } 在start()方法方法中,首先会获取ccid的name,然后根据name获取已注册的系统链码模版ipctemplate,根据模版及args、env等参数构建系统链码实例ipc,然后再判断是否运行了系统链码,如果没有运行,则开启协程调用launchInProc()方法进程模式启动系统链码。 在launchInProc()中开启了2个协程,协程一主要执行shimStartInProc()方法,协程二主要执行HandleChaincodeStream()方法。并且新建了2个通道,便于peer侧和链码侧通信。 func (ipc *inprocContainer) launchInProc(id string, args []string, env []string) error { if ipc.ChaincodeSupport == nil { inprocLogger.Panicf("Chaincode support is nil, most likely you forgot to set it immediately after calling inproccontroller.NewRegsitry()") } // 建立peer侧接收链码侧发送通道 peerRcvCCSend := make(chan *pb.ChaincodeMessage) // 建立链码侧接收peer侧发送通道 ccRcvPeerSend := make(chan *pb.ChaincodeMessage) var err error // 传递链码侧Handler对象运行状态的通道 ccchan := make(chan struct{}, 1) // 传递peer侧Handler对象运行状态的通道 ccsupportchan := make(chan struct{}, 1) shimStartInProc := _shimStartInProc // shadow to avoid race in test go func() { defer close(ccchan) inprocLogger.Debugf("chaincode started for %s", id) if args == nil { args = ipc.args } if env == nil { env = ipc.env } // 启动系统链码 err := shimStartInProc(env, args, ipc.chaincode, ccRcvPeerSend, peerRcvCCSend) if err != nil { err = fmt.Errorf("chaincode-support ended with err: %s", err) _inprocLoggerErrorf("%s", err) } inprocLogger.Debugf("chaincode ended for %s with err: %s", id, err) }() // shadow function to avoid data race inprocLoggerErrorf := _inprocLoggerErrorf go func() { defer close(ccsupportchan) inprocStream := newInProcStream(peerRcvCCSend, ccRcvPeerSend) inprocLogger.Debugf("chaincode-support started for %s", id) // 启动peer侧Handler处理句柄,创建消息循环,处理链码侧发送的消息 err := ipc.ChaincodeSupport.HandleChaincodeStream(inprocStream) if err != nil { err = fmt.Errorf("chaincode ended with err: %s", err) inprocLoggerErrorf("%s", err) } inprocLogger.Debugf("chaincode-support ended for %s with err: %s", id, err) }() // 阻塞等待消息处理 select { // 链码侧退出,关闭peer侧接收链码侧发送通道 case <-ccchan: close(peerRcvCCSend) inprocLogger.Debugf("chaincode %s quit", id) // peer侧chaincode support退出 case <-ccsupportchan: close(ccRcvPeerSend) inprocLogger.Debugf("chaincode support %s quit", id) case <-ipc.stopChan: close(ccRcvPeerSend) close(peerRcvCCSend) inprocLogger.Debugf("chaincode %s stopped", id) } return err } 链码侧: shimStartInProc()方法本质上是执行StartInProc()方法,首先遍历环境变量,获取CORE_CHAINCODE_ID_NAME,在执行newInProcStream()创建通信流,本质上只是将链码侧和peer侧发送接收的两个通道绑定。再执行chatWithPeer()方法与peer侧交互。chatWithPeer()首先调用newChaincodeHandler()创建链码侧Handler,然后发送第一个注册消息,然后开启消息循环进行处理。 // Register on the stream chaincodeLogger.Debugf("Registering.. sending %s", pb.ChaincodeMessage_REGISTER) if err = handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload}); err != nil { return errors.WithMessage(err, "error sending chaincode REGISTER") } peer侧: 该协程中,首先newInProcStream()创建通信流此处和链码侧刚刚相反。再调用HandleChaincodeStream()方法,首先创建peer侧Handle,再调用handler.ProcessStream(stream)对通信流进行处理(里面也有个循环)。 具体交互流程后续介绍。 3. 启动应用链码 当启动应用链码(docker容器模式)时,Start()接口实现为core/container/dockercontroller/dockercontroller.go Start()方法。 在Start()方法中,首先调用GetVMNameForDocker方法生成镜像名networkId-peerid-name-version-Hash(networkId-peerid-name-version),在调用GetVMName()方法生成容器名(networkId-peerid-name-version)。在调用getClientFnc()获取docker客户端,判断当前是否运行链码容器,运行则停止当前运行的容器。接着调用createContainer()创建容器,如果报不存在镜像,则构建镜像,再创建链码容器。如果需要配置TLS,则调用UploadToContainer()方法提交TLS证书文件。再调用StartContainer()正式启动链码容器。 当链码容器启动后,会执行shim.start()方法。首先会获取通信流与peer侧通信。再调用chatWithPeer()方法。此处介绍获取通信流方法。 func userChaincodeStreamGetter(name string) (PeerChaincodeStream, error) { flag.StringVar(&peerAddress, "peer.address", "", "peer address") ... // Establish connection with validating peer // 与peer建立连接 clientConn, err := newPeerClientConnection() ... // 创建链码支持服务客户端 chaincodeSupportClient := pb.NewChaincodeSupportClient(clientConn) ... // Establish stream with validating peer // 调用Register()接口获取通信流 stream, err := chaincodeSupportClient.Register(context.Background()) return stream, nil } 当执行chaincodeSupportClient.Register()方法时peer侧会执行HandleChaincodeStream()方法。 func (cs *ChaincodeSupport) Register(stream pb.ChaincodeSupport_RegisterServer) error { return cs.HandleChaincodeStream(stream) } 4. 背书节点和链码交互 4.1 准备 在构建系统链码和应用链码流程中,peer侧执行HandleChaincodeStream()方法,链码侧执行chatWithPeer()方法,并通过通信流来进行交互。其中,两个方法中对消息处理的方法为handleMessage() 链码侧 switch handler.state { case ready: err = handler.handleReady(msg, errc) case established: err = handler.handleEstablished(msg, errc) case created: err = handler.handleCreated(msg, errc) default: err = errors.Errorf("[%s] Chaincode handler cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type, len(msg.Payload), handler.state) } peer侧 switch h.state { case Created: return h.handleMessageCreatedState(msg) case Ready: return h.handleMessageReadyState(msg) default: return errors.Errorf("handle message: invalid state %s for transaction %s", h.state, msg.Txid) } 接下来按照消息流程介绍 链码侧发送REGISTER消息 首先进行各项基本配置,然后建立起与Peer节点的gRPC连接。 创建Handler,并更改Handler状态为“Created”。 发送REGISTER消息到peer节点。 等待peer节点返回的信息 peer侧接收REGISTER消息 此时peer侧Handler状态为“Created”,调用handleMessageCreatedState()里面的HandleRegister()方法。 peer侧注册Handler,并发送REGISTERED消息给链码侧 更新peer侧Handler状态为“Established” 并且会调用notifyRegistry()方法,发送READY消息给链码侧,并更新状态为“Ready” 链码侧接收消息 当链码侧接收REGISTERED消息,更新状态为Handler状态为“Established” 当链码侧接收READY消息,更新状态为Handler状态为“Ready” 至此,链码容器与peer节点已完成连接准备操作。 4.2 执行链码 主要实现是Execute()方法。在背书节点介绍中,存在两种消息类型:ChaincodeMessage_TRANSACTION/ChaincodeMessage_INIT。分别对应调用链码和实例化链码/升级链码操作。此时链码侧和peer侧Handler都处于Ready状态。在该交互流程中,本质上是peer侧发送消息给链码侧通过调用链码的Init()/Invoke()方法完成,然后将消息返回给链码侧。 4.2.1 实例化链码/升级链码操作 则peer侧发送的消息类型为ChaincodeMessage_INIT。在ChaincodeSupport.execute()中会调用handler.execute()方法。 func (h *Handler) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, msg *pb.ChaincodeMessage, timeout time.Duration) (*pb.ChaincodeMessage, error) { txParams.CollectionStore = h.getCollectionStore(msg.ChannelId) txParams.IsInitTransaction = (msg.Type == pb.ChaincodeMessage_INIT) // 创建交易上下文 txctx, err := h.TXContexts.Create(txParams) if err != nil { return nil, err } // 删除交易上下文 defer h.TXContexts.Delete(msg.ChannelId, msg.Txid) if err := h.setChaincodeProposal(txParams.SignedProp, txParams.Proposal, msg); err != nil { return nil, err } // 异步发送消息 h.serialSendAsync(msg) var ccresp *pb.ChaincodeMessage // 等待链码侧响应 select { case ccresp = <-txctx.ResponseNotifier: // response is sent to user or calling chaincode. ChaincodeMessage_ERROR // are typically treated as error case <-time.After(timeout): err = errors.New("timeout expired while executing transaction") ccName := cccid.Name + ":" + cccid.Version h.Metrics.ExecuteTimeouts.With( "chaincode", ccName, ).Add(1) } return ccresp, err } 当链码侧接收到ChaincodeMessage_INIT类型消息时会调用handler.handleInit(msg, errc)方法。 case pb.ChaincodeMessage_INIT: chaincodeLogger.Debugf("[%s] Received %s, initializing chaincode", shorttxid(msg.Txid), msg.Type) // Call the chaincode's Run function to initialize handler.handleInit(msg, errc) return nil // handleInit handles request to initialize chaincode. func (handler *Handler) handleInit(msg *pb.ChaincodeMessage, errc chan error) { go func() { var nextStateMsg *pb.ChaincodeMessage defer func() { // 协程结束时执行 handler.triggerNextState(nextStateMsg, errc) }() ... // Get the function and args from Payload // 获取方法和参数 input := &pb.ChaincodeInput{} unmarshalErr := proto.Unmarshal(msg.Payload, input) // Call chaincode's Run // Create the ChaincodeStub which the chaincode can use to callback stub := new(ChaincodeStub) err := stub.init(handler, msg.ChannelId, msg.Txid, input, msg.Proposal) // 执行链码的Init方法 res := handler.cc.Init(stub) // Send COMPLETED message to chaincode support and change state nextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelId} chaincodeLogger.Debugf("[%s] Init succeeded. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_COMPLETED) }() } 在handleInit(msg, errc)方法中,会反序列化msg.Payload为链码的输入,其中包含Args。然后调用链码的Init()方法,执行链码初始化流程。并将返回结果、链码事件、交易id以及通道id封装成ChaincodeMessage_COMPLETED类型的ChaincodeMessage发送给peer侧(triggerNextState()方法调