又开始新的阅读了,这次看的是Peer节点加入通道的过程。其实每次看源码都会有好多没有看懂的地方,不过相信只要坚持下去,保持记录,还是有很多收获的。
对于Peer节点加入通道这一过程,从用户角度来说也只是简单执行一行命令:peer channel join -b mychannel.block
就完成了某一节点加入通道的过程。而从Fabric网络内部来讲,却是做了很多工作,接下来看一下具体的流程:
整个流程的切入点和客户端创建通道的流程相同在fabric/peer/main.go
文件中的main()
方法,通过执行以上命令调用到peer/channel/channel.go
中的Cmd()
方法,然后是peer/channel/join.go
文件中的joinCmd()
方法,131行的join()
,最后就到了88行的executeJoin()
方法,接下来就看一下该方法: spec, err := getJoinCCSpec()if err != nil { return err}
首先就是获取需要加入的通道的具体信息,在67行:
func getJoinCCSpec() (*pb.ChaincodeSpec, error) { #判断指定路径下是否有创世区块,创世区块的创建流程可以看之前那篇解析客户端创建通道的文章 if genesisBlockPath == common.UndefinedParamValue { return nil, errors.New("Must supply genesis block file") } #读取创世区块中的内容,就是通道的一些基本信息 gb, err := ioutil.ReadFile(genesisBlockPath) if err != nil { return nil, GBFileNotFoundErr(err.Error()) } #构造一个ChaincodeSpec结构体,第一个参数为JoinChain,指定操作为加入通道,第二个参数为创世区块的信息 input := &pb.ChaincodeInput{Args: [][]byte{[]byte(cscc.JoinChain), gb}} spec := &pb.ChaincodeSpec{ Type: pb.ChaincodeSpec_Type(pb.ChaincodeSpec_Type_value["GOLANG"]), ChaincodeId: &pb.ChaincodeID{Name: "cscc"}, Input: input, }===================ChaincodeSpec=======================type ChaincodeSpec struct { Type ChaincodeSpec_Type `protobuf:"varint,1,opt,name=type,proto3,enum=protos.ChaincodeSpec_Type" json:"type,omitempty"` ChaincodeId *ChaincodeID `protobuf:"bytes,2,opt,name=chaincode_id,json=chaincodeId,proto3" json:"chaincode_id,omitempty"` Input *ChaincodeInput `protobuf:"bytes,3,opt,name=input,proto3" json:"input,omitempty"` Timeout int32 `protobuf:"varint,4,opt,name=timeout,proto3" json:"timeout,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"`}===================ChaincodeSpec======================= #最后返回该结构体 return spec, nil}
executeJoin()
方法继续往下看:
invocation := &pb.ChaincodeInvocationSpec{ChaincodeSpec: spec}
根据之前创建的结构体再封装一个结构体ChaincodeInvocationSpec
:
type ChaincodeInvocationSpec struct { ChaincodeSpec *ChaincodeSpec `protobuf:"bytes,1,opt,name=chaincode_spec,json=chaincodeSpec,proto3" json:"chaincode_spec,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"`}
然后获取一个创建者的身份,用于之后的提案的创建与签名:
creator, err := cf.Signer.Serialize()if err != nil { return fmt.Errorf("Error serializing identity for %s: %s", cf.Signer.GetIdentifier(), err)}
接下来就是Proposal的创建了:
var prop *pb.Proposalprop, _, err = putils.CreateProposalFromCIS(pcommon.HeaderType_CONFIG, "", invocation, creator)if err != nil { return fmt.Errorf("Error creating proposal for join %s", err)}
具体还要看一下CreateProposalFromCIS()
方法,该方法在core/protos/proputils.go
文件第466行,继而调用了237行的CreateChaincodeProposal()
方法,看名字应该可以理解个大概信息,创建一个链码提案。然后是243行的CreateChaincodeProposalWithTransient()
方法:
func CreateChaincodeProposalWithTransient(typ common.HeaderType, chainID string, cis *peer.ChaincodeInvocationSpec, creator []byte, transientMap map[string][]byte) (*peer.Proposal, string, error) { #首先就是生成一个随机数 nonce, err := crypto.GetRandomNonce() if err != nil { return nil, "", err } #计算出一个TxID,具体是根据HASH算法生成的 txid, err := ComputeTxID(nonce, creator) if err != nil { return nil, "", err } #然后调用了这个方法,将之前生成的数据传入进去 return CreateChaincodeProposalWithTxIDNonceAndTransient(txid, typ, chainID, cis, nonce, creator, transientMap)}
CreateChaincodeProposalWithTxIDNonceAndTransient()
方法在第282行:
txid
由之前的方法生成,typ
最初的方法传入进来,值为HeaderType_CONFIG
,chainID为空字符串,cis
也是最初的方法传入进来,值为ChaincodeInvocationSpec
结构体,nonce
由之前的方法生成,creator
也是最初的方法传入进来,transientMap
为空,在之前的CreateChaincodeProposal()
方法中可以看到。然后我们看一下方法中的具体流程: func CreateChaincodeProposalWithTxIDNonceAndTransient(txid string, typ common.HeaderType, chainID string, cis *peer.ChaincodeInvocationSpec, nonce, creator []byte, transientMap map[string][]byte) (*peer.Proposal, string, error) { #首先是构造一个ChaincodeHeaderExtension结构体 ccHdrExt := &peer.ChaincodeHeaderExtension{ChaincodeId: cis.ChaincodeSpec.ChaincodeId}=========================ChaincodeHeaderExtension===================== type ChaincodeHeaderExtension struct { PayloadVisibility []byte `protobuf:"bytes,1,opt,name=payload_visibility,json=payloadVisibility,proto3" json:"payload_visibility,omitempty"` // The ID of the chaincode to target. ChaincodeId *ChaincodeID `protobuf:"bytes,2,opt,name=chaincode_id,json=chaincodeId,proto3" json:"chaincode_id,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }=========================ChaincodeHeaderExtension===================== #将该结构体序列化 ccHdrExtBytes, err := proto.Marshal(ccHdrExt) if err != nil { return nil, "", errors.Wrap(err, "error marshaling ChaincodeHeaderExtension") } #将ChaincodeInvocationSpec结构体序列化 cisBytes, err := proto.Marshal(cis) if err != nil { return nil, "", errors.Wrap(err, "error marshaling ChaincodeInvocationSpec") } #又是一个结构体 ccPropPayload := &peer.ChaincodeProposalPayload{Input: cisBytes, TransientMap: transientMap}============================ChaincodeProposalPayload=====================type ChaincodeProposalPayload struct { Input []byte `protobuf:"bytes,1,opt,name=input,proto3" json:"input,omitempty"` TransientMap map[string][]byte `protobuf:"bytes,2,rep,name=TransientMap,proto3" json:"TransientMap,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"`}============================ChaincodeProposalPayload===================== #将该结构体序列化 ccPropPayloadBytes, err := proto.Marshal(ccPropPayload) if err != nil { return nil, "", errors.Wrap(err, "error marshaling ChaincodeProposalPayload") } var epoch uint64 #创建一个时间戳 timestamp := util.CreateUtcTimestamp() #构造Header结构体,包含两部分ChannelHeader和SignatureHeader hdr := &common.Header{ ChannelHeader: MarshalOrPanic( &common.ChannelHeader{ Type: int32(typ), TxId: txid, Timestamp: timestamp, ChannelId: chainID, Extension: ccHdrExtBytes, Epoch: epoch, }, ), SignatureHeader: MarshalOrPanic( &common.SignatureHeader{ Nonce: nonce, Creator: creator, }, ), } #序列化 hdrBytes, err := proto.Marshal(hdr) if err != nil { return nil, "", err } #最后构造成一个Proposal prop := &peer.Proposal{ Header: hdrBytes, Payload: ccPropPayloadBytes, } #返回Proposal,这里一直返回到最外面的方法 return prop, txid, nil}
让我们回到executeJoin()
方法,继续往下看:
#刚刚这行代码返回了创建的Proposal prop, _, err = putils.CreateProposalFromCIS(pcommon.HeaderType_CONFIG, "", invocation, creator) if err != nil { return fmt.Errorf("Error creating proposal for join %s", err) } #定义一个被签名的Proposal var signedProp *pb.SignedProposal #这个方法就是对创建的Proposal进行签名了,具体的就不再看了,继续往下 signedProp, err = putils.GetSignedProposal(prop, cf.Signer) if err != nil { return fmt.Errorf("Error creating signed proposal %s", err) } #定义了个提案响应 var proposalResp *pb.ProposalResponse #重要的方法,由Peer节点对刚刚创建的提案进行处理,处理完成后返回提案响应,之前有篇文章对这个方法进行了讲解,在文章最后贴出了地址,这里就不再说明了 proposalResp, err = cf.EndorserClient.ProcessProposal(context.Background(), signedProp) #后面的比较简单了,就是根据返回的响应消息进行处理,就不再说明了 if err != nil { return ProposalFailedErr(err.Error()) } if proposalResp == nil { return ProposalFailedErr("nil proposal response") } if proposalResp.Response.Status != 0 && proposalResp.Response.Status != 200 { return ProposalFailedErr(fmt.Sprintf("bad proposal response %d: %s", proposalResp.Response.Status, proposalResp.Response.Message)) } logger.Info("Successfully submitted proposal to join channel") return nil}
到这里Peer节点加入通道的操作就已经结束了,我们总结一下之前所做的工作:
- 首先就是由
peer channel join -b mychannel.block
这条命令触发,经过多次调用最后到executeJoin()
方法。 - 首先获取
mychannel.block
文件中的信息,封闭为ChaincodeSpec
结构体。 - 然后再封装为
ChaincodeInvocationSpec
结构体。 - 获取一个用于发起提案与对提案进行签名操作的
creator
。 - 生成nonce与TxID,进而封装为
ChaincodeHeaderExtension
,ChaincodeProposalPayload
,Header
,Proposal
结构体。 - 对生成的
Proposal
结构体进行签名操作,由Peer节点进行处理,处理完成后返回响应消息。
对于Peer节点进行消息处理的方法ProcessProposal
在这篇文章中:
这里给出一个类图好了,之前有太多的结构体,关系有点复杂:
该图片来源:最后给出