> batches;
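It is a ConcurrentMap whose key is a TopicPartition, identifying one partition of one topic, and whose value is a double-ended queue (Deque) of ProducerBatch objects waiting for the Sender thread to send them to the broker. Here is a diagram: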
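To make the shape of batches concrete, here is a minimal sketch that uses only the JDK. It is hypothetical in several ways. TopicPartitionKey stands in for Kafka's TopicPartition. String stands in for ProducerBatch. The map is a ConcurrentHashMap, which may not be the ConcurrentMap implementation Kafka itself uses. getOrCreateDeque follows the putIfAbsent idea: if two threads race to create the queue for the same new partition, both end up with the same Deque.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition: a value-based map key.
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TopicPartitionKey)) return false;
        TopicPartitionKey other = (TopicPartitionKey) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override public int hashCode() { return Objects.hash(topic, partition); }

    @Override public String toString() { return topic + "-" + partition; }
}

// Hypothetical model of the batches map: one Deque of pending batches per partition.
// The map is thread-safe; each Deque is NOT, so callers must synchronize on it.
class BatchesModel {
    final ConcurrentMap<TopicPartitionKey, Deque<String>> batches = new ConcurrentHashMap<>();

    // putIfAbsent-style get-or-create: whoever loses the race adopts the winner's Deque.
    Deque<String> getOrCreateDeque(TopicPartitionKey tp) {
        Deque<String> d = batches.get(tp);
        if (d != null) return d;
        d = new ArrayDeque<>();
        Deque<String> previous = batches.putIfAbsent(tp, d);
        return previous == null ? d : previous;
    }
}
```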
Next, let's look at the source to see how a record is added to the buffer queues. The method to focus on is org.apache.kafka.clients.producer.internals.RecordAccumulator#append.
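The comments are detailed, but one point deserves some thought: why is the memory allocation not inside the synchronized block? At first glance this looks wasteful. Because the allocation happens outside the lock, the second synchronized block has to call tryAppend again. Another thread may have created a RecordBatch in the meantime, in which case the memory just allocated was requested for nothing. But consider what would go wrong if the allocation were moved into the synchronized block.
When there isn't enough free memory, the allocating thread blocks (for up to maxTimeToBlock ms). Doing that inside the synchronized block would hold the Deque's lock for the entire wait. No other thread could then operate on that Deque safely, including other producer threads appending to the same partition and the Sender thread draining it. That would be far worse than an occasional redundant allocation.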
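The pattern can be reproduced in a small, self-contained model. Everything here is hypothetical and simplified. SimplePool uses a Semaphore to count free bytes instead of Kafka's BufferPool. SimpleBatch is a bare ByteBuffer rather than a ProducerBatch. What the model does mirror is the order of operations in append():

1. Try to append under the lock.
2. Allocate, possibly blocking, without holding the lock.
3. Re-check under the lock.
4. Return the buffer in finally if another thread won the race.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical memory pool: a Semaphore counts free bytes (Kafka's BufferPool is more elaborate).
class SimplePool {
    private final Semaphore freeBytes;

    SimplePool(int totalBytes) { this.freeBytes = new Semaphore(totalBytes); }

    // May block for up to maxWaitMs; callers invoke this WITHOUT holding a deque lock.
    ByteBuffer allocate(int size, long maxWaitMs) throws InterruptedException {
        if (!freeBytes.tryAcquire(size, maxWaitMs, TimeUnit.MILLISECONDS))
            throw new IllegalStateException("Timed out waiting for " + size + " bytes");
        return ByteBuffer.allocate(size);
    }

    void deallocate(ByteBuffer buffer) { freeBytes.release(buffer.capacity()); }

    int available() { return freeBytes.availablePermits(); }
}

// Hypothetical batch: a fixed-size buffer that records are copied into.
class SimpleBatch {
    private final ByteBuffer buffer;

    SimpleBatch(ByteBuffer buffer) { this.buffer = buffer; }

    boolean tryAppend(byte[] record) {
        if (buffer.remaining() < record.length) return false;
        buffer.put(record);
        return true;
    }
}

class SimpleAccumulator {
    final Deque<SimpleBatch> dq = new ArrayDeque<>(); // not thread-safe: guarded by synchronized (dq)
    private final SimplePool pool;
    private final int batchSize;

    SimpleAccumulator(SimplePool pool, int batchSize) {
        this.pool = pool;
        this.batchSize = batchSize;
    }

    // Returns true if a new batch had to be created for this record.
    boolean append(byte[] record, long maxWaitMs) throws InterruptedException {
        synchronized (dq) { // 1. fast path under the lock
            SimpleBatch last = dq.peekLast();
            if (last != null && last.tryAppend(record)) return false;
        }
        // 2. possibly slow: wait for memory while holding no lock
        ByteBuffer buffer = pool.allocate(Math.max(batchSize, record.length), maxWaitMs);
        try {
            synchronized (dq) { // 3. re-check: another thread may have added a batch meanwhile
                SimpleBatch last = dq.peekLast();
                if (last != null && last.tryAppend(record)) return false; // surplus buffer freed below
                SimpleBatch batch = new SimpleBatch(buffer);
                batch.tryAppend(record);
                dq.addLast(batch);
                buffer = null; // ownership moved to the batch: do not free it
                return true;
            }
        } finally {
            if (buffer != null) pool.deallocate(buffer); // 4. give back the unused allocation
        }
    }
}
```

The cost of this design is an occasional surplus allocation that is immediately handed back. The benefit is that a thread waiting for memory never holds a Deque lock.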
/**
 * Add a record to the accumulator, return the append result
 *
 * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
 *
 * @param tp The topic/partition to which this record is being sent
 * @param timestamp The timestamp of the record
 * @param key The key for the record
 * @param value The value for the record
 * @param headers the Headers for the record
 * @param callback The user-supplied callback to execute when the request is complete
 * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
 */
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // check if we have an in-progress batch
        // Essentially a putIfAbsent operation; not analyzed further here
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        // batches is thread-safe, but the Deque is not, so lock it
        // Try to append to an in-progress batch
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // we don't have an in-progress record batch try to allocate a new batch
        // Create a new ProducerBatch
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        // Buffer size: at least batchSize, larger if this single record needs more
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        // May block for up to maxTimeToBlock ms if not enough memory is free
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");

            // Try again: the allocation above is outside the synchronized block, so another thread
            // may have created a RecordBatch meanwhile; if so, finally returns the allocated memory
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Even the authors hope this is rare: if several threads allocate memory, the extras just get returned
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }

            // Create the ProducerBatch
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

            dq.addLast(batch);
            // incomplete keeps a Set of the batches that have not completed yet
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;

            // Return the append result
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        // Return any memory that ended up unused
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
Here is tryAppend() as well. The code comments cover everything:
/**
 * Try to append to a ProducerBatch.
 *
 * If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
 * resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written
 * and memory records built) in one of the following cases (whichever comes first): right before send,
 * if it is expired, or when the producer is closed.
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
    // Get the most recently added ProducerBatch
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            // The result holds the future, a "batch is full" flag and a "new batch created" flag
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    // Return null if the Deque has no ProducerBatch, or the last batch has no room for this record
    return null;
}
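The two flags in the result can be explored with a stripped-down model of tryAppend(). MiniBatch and MiniAppendResult are hypothetical stand-ins for ProducerBatch and RecordAppendResult. The future is dropped, and a batch holds a fixed number of records instead of bytes.

The sketch shows why deque.size() > 1 also counts as "full". A new batch is only added once the previous last batch had no room, so any batch ahead of the last one can no longer accept records and is ready to be sent. In the Kafka versions I'm aware of, KafkaProducer uses batchIsFull and newBatchCreated to decide whether to wake up the Sender thread. Treat that as context, not as something this sketch verifies.

```java
import java.util.Deque;

// Hypothetical stand-in for ProducerBatch: holds up to `capacity` records.
class MiniBatch {
    private final int capacity;
    private int records;
    private boolean closedForAppends;

    MiniBatch(int capacity) { this.capacity = capacity; }

    // Returns false when the record does not fit (the real tryAppend returns a null future).
    boolean tryAppend() {
        if (closedForAppends || records >= capacity) return false;
        records++;
        return true;
    }

    boolean isFull() { return records >= capacity; }
    void closeForRecordAppends() { closedForAppends = true; }
    boolean isClosedForAppends() { return closedForAppends; }
}

// Hypothetical stand-in for RecordAppendResult (the future is omitted).
final class MiniAppendResult {
    final boolean batchIsFull;
    final boolean newBatchCreated;

    MiniAppendResult(boolean batchIsFull, boolean newBatchCreated) {
        this.batchIsFull = batchIsFull;
        this.newBatchCreated = newBatchCreated;
    }
}

final class MiniAccumulator {
    // Same control flow as tryAppend() above, minus keys, values and callbacks.
    static MiniAppendResult tryAppend(Deque<MiniBatch> deque) {
        MiniBatch last = deque.peekLast();
        if (last != null) {
            if (last.tryAppend())
                // "Full" if this batch is full OR an older batch is queued ahead of it.
                return new MiniAppendResult(deque.size() > 1 || last.isFull(), false);
            // No room: stop accepting appends on this batch.
            last.closeForRecordAppends();
        }
        return null; // caller has to allocate a new batch
    }
}
```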
The diagram below illustrates the code above:
2.2 Sender
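The most important method in Sender is run(), and the core of run() is org.apache.kafka.clients.producer.internals.Sender#sendProducerData.
Read the comment on pollTimeout carefully. pollTimeout is the longest the subsequent poll may block, and the poll returns earlier as soon as at least one channel is ready for the events registered on it. A pollTimeout of 0 means data is ready, so the poll does not block and sending starts right away.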
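As a rough model of that rule, the helper below derives pollTimeout from the values that sendProducerData computes. It is a hypothetical sketch, based on my reading of the Kafka 1.x/2.0 Sender. There, pollTimeout appears to start as Math.min(result.nextReadyCheckDelayMs, notReadyTimeout) and is forced to 0 when readyNodes is non-empty. Node IDs are plain Strings here, and the exact rule may differ in other versions.

```java
import java.util.Set;

final class PollTimeoutSketch {
    // Hypothetical helper. Parameters:
    //   readyNodes: nodes left after removing those that client.ready() rejected
    //   nextReadyCheckDelayMs: ReadyCheckResult.nextReadyCheckDelayMs (e.g. time left on a linger)
    //   notReadyTimeout: smallest connectionDelay among the removed nodes
    static long pollTimeout(Set<String> readyNodes, long nextReadyCheckDelayMs, long notReadyTimeout) {
        if (!readyNodes.isEmpty())
            return 0L; // something can be sent now: poll must not block
        // Otherwise block until the earliest moment something might become sendable.
        return Math.min(nextReadyCheckDelayMs, notReadyTimeout);
    }
}
```

Returning 0 only when a ready node exists avoids busy-looping on nodes that have data but no usable connection. Those nodes instead contribute their connectionDelay to the timeout.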
private long sendProducerData(long now) {
    // Get the current cluster information
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    // The three fields of the returned ReadyCheckResult:
    // 1. Set<Node> readyNodes: nodes that are ready to be sent to
    // 2. long nextReadyCheckDelayMs: delay until the next readiness check
    // 3. Set<String> unknownLeaderTopics: topics whose leader cannot be found
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // if there are any partitions whose leaders are not known yet, force metadata update
    // (i.e. if the leader of some topics is unknown, refresh the metadata)
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // Drop the nodes that cannot be sent to right now
    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // Collect the batches to send
    // create produce requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);

    // Preserve message ordering
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // Expired batches
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
    boolean needsTransactionStateReset = false;
    // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
    // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
    // we need to reset the producer id here.
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException());
        if (transactionManager != null && expiredBatch.inRetry()) {
            needsTransactionStateReset = true;
        }
        this.sensors.recordErrors(expiredBatch.topicParti