剖析nsq消息队列(四) 消息的负载处理
如果不考虑负载情况,把随机的把消息发送到某一个客服端去处理消息,如果机器的性能不同,可能发生的情况就是某一个或几个客户端处理速度慢,但还有大量新的消息需要处理,其他的客户端处于空闲状态。理想的状态是,找到当前相对空闲的客户端去处理消息。
nsq
的处理方式是客户端主动向nsqd
报告自已的可处理消息数量(也就是RDY
命令)。nsqd
根据每个连接的客户端的可处理消息的状态来随机把消息发送到可用的客户端,来进行消息处理
如下图所示:
客户端更新RDY
从第一篇帖子的例子中我们就有配置consumer的config
config := nsq.NewConfig() config.MaxInFlight = 1000 config.MaxBackoffDuration = 5 * time.Second config.DialTimeout = 10 * time.Second
MaxInFlight
来设置最大的处理中的消息数量,会根据这个变量计算在是否更新RDY
初始化的时候 客户端会向连接的nsqd服务端来发送updateRDY来设置最大处理数,
func (r *Consumer) maybeUpdateRDY(conn *Conn) { inBackoff := r.inBackoff() inBackoffTimeout := r.inBackoffTimeout() if inBackoff || inBackoffTimeout { r.log(LogLevelDebug, "(%s) skip sending RDY inBackoff:%v || inBackoffTimeout:%v", conn, inBackoff, inBackoffTimeout) return } remain := conn.RDY() lastRdyCount := conn.LastRDY() count := r.perConnMaxInFlight() // refill when at 1, or at 25%, or if connections have changed and we're imbalanced if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) { r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)", conn, count, remain, lastRdyCount) r.updateRDY(conn, count) } else { r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)", conn, count, remain, lastRdyCount) } }
当剩余的可用处理数量remain
小于等于1,或者小于最后一次设置的可用数量lastRdyCount
的1/4时,或者可用连接平均的maxInFlight大于0并且小于remain
时,则更新RDY
状态
当有多个nsqd
时,会把最大的消息进行平均计算:
// perConnMaxInFlight calculates the per-connection max-in-flight count. // // This may change dynamically based on the number of connections to nsqd the Consumer // is responsible for. func (r *Consumer) perConnMaxInFlight() int64 { b := float64(r.getMaxInFlight()) s := b / float64(len(r.conns())) return int64(math.Min(math.Max(1, s), b)) }
当有消息从nsqd
发送过来后也会调用maybeUpdateRDY
方法,计算是否需要发送RDY
命令