剖析nsq消息队列(二) 去中心化源码解析
nsqd
启动后连接nsqlookupd
,连接成功后,要发送一个魔法标识nsq.MagicV1
,这个标识有啥魔法么,当然不是,他只是用于标明,客户端和服务端双方使用的信息通信版本,不能的版本有不同的处理方式,为了后期做新的消息处理版本方便吧。nsqlookupd
的代码块
func (p *tcpServer) Handle(clientConn net.Conn) { // ... buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) // ... protocolMagic := string(buf) // ... var prot protocol.Protocol switch protocolMagic { case " V1": prot = &LookupProtocolV1{ctx: p.ctx} default: // ... return } err = prot.IOLoop(clientConn) //... }
这个时候的nsqd
已经和nsqlookupd
建立好了连接,但是这时,仅仅说明他俩连接成功。nsqlookupd
也并没有把这个连接加到可用的nsqd
列表里。
建立连接完成后,nsqd
会发送IDENTIFY
命令,这个命令里包含了nsq的基本信息nsqd
的代码
ci := make(map[string]interface{}) ci["version"] = version.Binary ci["tcp_port"] = n.RealTCPAddr().Port ci["http_port"] = n.RealHTTPAddr().Port ci["hostname"] = hostname ci["broadcast_address"] = n.getOpts().BroadcastAddress cmd, err := nsq.Identify(ci) if err != nil { lp.Close() return } resp, err := lp.Command(cmd)
包含了nsqd
提供的tcp
和http
端口,主机名,版本等等,发送给nsqlookupd
,nsqlookupd
收到IDENTIFY
命令后,解析信息然后加到nsqd
的可用列表里nsqlookupd
的代码块
func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string<