关于高并发下kafka producer send异步发送耗时问题的分析

 最近开发网关服务的过程当中,需要用到kafka转发消息与保存日志,在进行压测的过程中由于是多线程并发操作kafka producer 进行异步send,发现send耗时有时会达到几十毫秒的阻塞,很大程度上上影响了并发的性能,而在后续的测试中发现单线程发送反而比多线程发送效率高出几倍。所以就对kafka API send 的源码进行了一下跟踪和分析,在此总结记录一下。

首先看springboot下 kafka producer 的使用

在config中进行配置,向IOC容器中注入DefaultKafkaProducerFactory生产者工厂的实例

复制代码
    @Bean     public ProducerFactory<Object, Object> producerFactory() {         return new DefaultKafkaProducerFactory<>(producerConfigs());     }
复制代码

创建producer

this.producer = producerFactory.createProducer();

大家都知道springboot下IOC容器管理的实例默认都是单例模式;而DefaultKafkaProducerFactory本身也是一个单例工厂

复制代码
    @Override     public Producer<K, V> createProducer() {         if (this.transactionIdPrefix != null) {             return createTransactionalProducer();         }         if (this.producer == null) {             synchronized (this) {                 if (this.producer == null) {                     this.producer = new CloseSafeProducer<K, V>(createKafkaProducer());                 }             }         }         return this.producer;     }
复制代码

我们创建的producer也是个单例。

接下来就是具体的发送,用过kafka的小伙伴都知道producer.send是个异步操作,会返回一个Future<RecordMetadata> 类型的结果。那么为什么单线程和多线程send效率会较大的差距呢,我们进入KafkaProducer内部看下producer.send的具体源码实现来找下答案

复制代码
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {         TopicPartition tp = null;         try {             //保证主题的元数据可用            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);             long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);             Cluster cluster = clusterAndWaitTime.cluster;             byte[] serializedKey;             try {                 //序列化key                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());             } catch (ClassCastException cce) {                 throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +                        " specified in key.serializer", cce);             }             byte[] serializedValue;             try {                 //序列化Value                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());             } catch (ClassCastException cce) {                 throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +                        " specified in value.serializer", cce);             }             //计算出具体的partition             int partition = partition(record, serializedKey, serializedValue, cluster);             tp = new TopicPartition(record.topic(), partition);              setReadOnly(record.headers());             Header[] headers = record.headers().toArray();              int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),                     compressionType, serializedKey, serializedValue, headers);             
                    
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信