【RocketMQ Source Code Study】- 5. Message Storage Mechanism
Preface
Interviewer: Do you know how RocketMQ stores messages?
Me: Er... hold on a second, let me go read this article. (runs away)
This part covers quite a lot, so dear readers, please bring your own tea. Comments are welcome!
RocketMQ's storage design is what guarantees its high availability and high performance: it relies on disk storage to support massive message accumulation. A single Kafka broker sees a noticeable performance drop once the topic count reaches 100+, whereas RocketMQ keeps its performance high even with many topics.
The rest of this article covers the storage structure, the storage flow, and the storage optimization techniques.
It is based on RocketMQ 4.5.2.
Storage architecture
Outgoing messages are appended sequentially to the commitlog; all topics and queues share this single log.
Once a message is in the commitlog, and because consumption happens per topic, the broker asynchronously builds the consumeQueue (logical queue) and the index (index file). Each consumeQueue entry stores the message's commitlogOffset / messageSize / tagHashCode, which is enough to locate the message entity in the commitlog. Every Message Queue of every Topic has its own ConsumeQueue file. The index file (Index) provides message lookup, mainly for troubleshooting and statistics.
A consumer reads the message offset from the consumeQueue and can then fetch the message itself quickly; a sketch of this read path follows the list of benefits below.
Benefits
The CommitLog is written sequentially, which greatly improves write throughput and hence accumulation capacity.
Reads are random, but the OS page cache lets the broker read from disk in batches and keep the data cached in memory, which speeds up subsequent reads.
In practice most of the ConsumeQueue can be held entirely in memory, so operations on this intermediate structure are very fast, effectively memory-speed reads.
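To make that read path concrete, here is a minimal sketch, assuming the raw bytes of a ConsumeQueue file and the CommitLog are already in ByteBuffers; the class and method names are made up for illustration (the real read path goes through DefaultMessageStore#getMessage), and file boundaries are ignored.

import java.nio.ByteBuffer;

// Conceptual read path: logical offset -> 20-byte ConsumeQueue entry -> CommitLog message.
public class ReadPathSketch {
    static final int CQ_ENTRY_SIZE = 20; // commitLogOffset(8) + size(4) + tagHashCode(8)

    static ByteBuffer readMessage(ByteBuffer consumeQueueFile, ByteBuffer commitLogFile, long logicalOffset) {
        // 1. locate the fixed-length entry in the ConsumeQueue
        consumeQueueFile.position((int) (logicalOffset * CQ_ENTRY_SIZE));
        long commitLogOffset = consumeQueueFile.getLong(); // where the message starts in the CommitLog
        int size = consumeQueueFile.getInt();              // how many bytes the message unit occupies
        long tagHashCode = consumeQueueFile.getLong();     // used for server-side tag filtering

        // 2. slice the full message unit out of the CommitLog
        ByteBuffer msg = commitLogFile.duplicate();
        msg.position((int) commitLogOffset);
        msg.limit((int) commitLogOffset + size);
        return msg.slice();
    }
}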
Message file storage structure
The stored files fall into three groups:
commitlog: stores the message entities
consumequeue: stores message offsets, organized by Topic and queue
index: stores indexes by key, tag, time, etc.
commitlog (physical queue)
File path: ${user.home}/store/commitlog/${fileName}
commitlog characteristics:
Holds the messages of every topic on this broker
1 GB per file by default
Each file is named after its starting offset; when a file is full a new one is created. This naming scheme makes it possible to jump from a message's physical offset straight to the physical file that contains it (see the sketch below).
A message storage unit is variable-length
Written sequentially, read randomly
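A minimal sketch of that offset-to-file mapping, assuming the default 1 GB file size; CommitLogLocator is a made-up name for illustration (inside RocketMQ the equivalent lookup is done by MappedFileQueue#findMappedFileByOffset).

// Files are named after their 20-digit, zero-padded starting offset, so locating a message's
// file from its physical offset is pure arithmetic.
public class CommitLogLocator {
    static final long FILE_SIZE = 1024L * 1024 * 1024; // default commitlog file size: 1 GB

    // Name of the file that contains the given physical offset.
    static String fileNameFor(long physicalOffset) {
        long fileStartOffset = physicalOffset - (physicalOffset % FILE_SIZE);
        return String.format("%020d", fileStartOffset);
    }

    // Position of the message inside that file.
    static long positionInFile(long physicalOffset) {
        return physicalOffset % FILE_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(fileNameFor(0L));                 // 00000000000000000000
        System.out.println(fileNameFor(1_073_741_850L));     // 00000000001073741824
        System.out.println(positionInFile(1_073_741_850L));  // 26
    }
}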
Message unit layout
The table below lists what each (variable-length) message unit stores, including the physical offset, the consumeQueue offset, the message body, and so on.
No. | Field | Description
1 | totalSize (4 bytes) | total length of this message unit
2 | magicCode (4) | fixed magic number daa320a7, used to recognize the start of a message
3 | bodyCRC (4) | CRC of the body, verified when the broker restarts and recovers
4 | queueId (4) | id of the consumeQueue the message belongs to
5 | flag (4) | not interpreted by RocketMQ, stored and passed through as-is
6 | queueOffset (8) | offset of the message in its consumeQueue
7 | physicalOffset (8) | offset of the message in the commitlog
8 | sysFlag (4) | transaction flag: NOT_TYPE / PREPARED_TYPE / COMMIT_TYPE / ROLLBACK_TYPE
9 | bornTimestamp (8) | timestamp on the producer side
10 | bornHost (8) | producer address (address:port)
11 | storeTimestamp (8) | time the message was stored on the broker
12 | storeHostAddress (8) | address of the broker that stored the message (address:port)
13 | reconsumeTimes (4) | how many times the message has been re-consumed (retried)
14 | preparedTransactionOffset (8) | physical offset of the prepared transaction message
15 | bodyLength (4) | length of the body, at most 4 MB
16 | body (bodyLength bytes) | message body
17 | topicLength (1) | topic length, at most 255 bytes
18 | topic (topicLength bytes) | topic name
19 | propertiesLength (2) | length of the message properties, at most 65535 bytes
20 | properties (propertiesLength bytes) | message properties
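To make the layout above concrete, here is a decoding sketch that walks a ByteBuffer field by field in the same order as the table; CommitLogEntrySketch is a made-up name, and the real decoder in RocketMQ is MessageDecoder#decode.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Reads one message unit from a buffer positioned at its start, following the field table.
public class CommitLogEntrySketch {
    static void decode(ByteBuffer buf) {
        int totalSize = buf.getInt();          // 1. total length of this message unit
        int magicCode = buf.getInt();          // 2. magic code (daa320a7 for a normal message)
        int bodyCRC = buf.getInt();            // 3. CRC of the body, checked on recover
        int queueId = buf.getInt();            // 4. target consumeQueue id
        int flag = buf.getInt();               // 5. passed through untouched
        long queueOffset = buf.getLong();      // 6. logical offset in the consumeQueue
        long physicalOffset = buf.getLong();   // 7. offset in the commitlog
        int sysFlag = buf.getInt();            // 8. transaction flags etc.
        long bornTimestamp = buf.getLong();    // 9. producer-side timestamp
        buf.getLong();                         // 10. bornHost (address:port packed into 8 bytes)
        long storeTimestamp = buf.getLong();   // 11. broker-side store timestamp
        buf.getLong();                         // 12. store host address (address:port)
        int reconsumeTimes = buf.getInt();     // 13. retry count
        long preparedTransactionOffset = buf.getLong(); // 14. prepared transaction offset
        byte[] body = new byte[buf.getInt()];  // 15/16. bodyLength + body
        buf.get(body);
        byte[] topic = new byte[buf.get()];    // 17/18. topicLength (1 byte) + topic
        buf.get(topic);
        byte[] props = new byte[buf.getShort()]; // 19/20. propertiesLength (2 bytes) + properties
        buf.get(props);
        System.out.println("topic=" + new String(topic, StandardCharsets.UTF_8)
                + ", queueId=" + queueId + ", bodyLength=" + body.length);
    }
}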
consumequeue file (logical queue)
File path: ${user.home}/store/consumequeue/${topic}/${queueId}/${fileName}
consumequeue characteristics:
Stores each message's commitLogOffset, size and tagHashCode, organized by topic and queueId
Files are named after their starting offset
Each storage unit is a fixed 20 bytes
Written sequentially and read sequentially
Each ConsumeQueue file is about 5.72 MB
Every MessageQueue of every Topic has its own ConsumeQueue file
This structure is the consumer-side logical queue. Why split one topic into many queues? Because it works better in cluster mode: multiple consumers can consume in parallel, each from its own queues, without contending for a lock.
Storage unit layout
No. | Field | Description
1 | offset (8) | offset of the message in the commitlog
2 | size (4) | size of the message in the commitlog
3 | tagHashCode (8) | hash code of the message tag
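Because every entry is exactly 20 bytes, a logical queue offset maps to a file and position by simple arithmetic. Here is a small worked example, assuming the default of 300,000 entries per ConsumeQueue file (which is where the ~5.72 MB figure above comes from: 300,000 × 20 = 6,000,000 bytes); ConsumeQueueLocator is a made-up name.

// Logical offset -> (file, position) for the fixed 20-byte ConsumeQueue entries.
public class ConsumeQueueLocator {
    static final int ENTRY_SIZE = 20;                     // offset(8) + size(4) + tagHashCode(8)
    static final long FILE_SIZE = 300_000L * ENTRY_SIZE;  // 6,000,000 bytes ≈ 5.72 MB

    static String locate(long logicalOffset) {
        long globalBytePos = logicalOffset * ENTRY_SIZE;
        long fileStartOffset = globalBytePos - (globalBytePos % FILE_SIZE); // also the file name
        long positionInFile = globalBytePos % FILE_SIZE;
        return String.format("file %020d, position %d", fileStartOffset, positionInFile);
    }

    public static void main(String[] args) {
        System.out.println(locate(0));        // file 00000000000000000000, position 0
        System.out.println(locate(300_001));  // file 00000000000006000000, position 20
    }
}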
index file
File path: ${user.home}/store/index/${fileName}
index file characteristics:
Files are named after the timestamp at which they were created
Each index entry is a fixed 20 bytes
The index file (Index) provides message lookup by key, mainly for troubleshooting and statistics
Index entry layout
No. | Field | Description
1 | keyHash (4) | hash of the index key (the key is built from the topic and the message key, e.g. topic#key)
2 | phyOffset (8) | real physical offset of the message in the commitLog
3 | timeOffset (4) | time offset of the message relative to the first message indexed in this file
4 | slotValue (4) | index position of the next entry on the same hash-slot chain (entries whose keys hash to the same slot form a linked list)
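Here is a sketch of how those fields are used for a lookup, assuming the default index file geometry from MessageStoreConfig (a 40-byte header, then 5,000,000 hash slots of 4 bytes each, then up to 20,000,000 of the 20-byte entries above); IndexFileSketch is a made-up name, while the real logic lives in IndexFile and IndexService.

// Locate the hash slot for a key and the byte position of an index entry; following the
// slotValue chain from the slot yields all entries whose keys collide on that slot.
public class IndexFileSketch {
    static final int HEADER_SIZE = 40;   // index header
    static final int SLOT_SIZE = 4;      // each hash slot stores the latest entry's index
    static final int ENTRY_SIZE = 20;    // keyHash(4) + phyOffset(8) + timeOffset(4) + slotValue(4)
    static final int SLOT_NUM = 5_000_000;

    // Byte position of the hash slot for a key such as "topic#uniqKey".
    static long slotPosition(String key) {
        int keyHash = Math.abs(key.hashCode()); // non-negative hash, like IndexFile does
        int slot = keyHash % SLOT_NUM;
        return HEADER_SIZE + (long) slot * SLOT_SIZE;
    }

    // Byte position of the n-th index entry (n comes from a slot or a previous entry's slotValue).
    static long entryPosition(int entryIndex) {
        return HEADER_SIZE + (long) SLOT_NUM * SLOT_SIZE + (long) entryIndex * ENTRY_SIZE;
    }
}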
Message storage flow
RocketMQ file storage model layers
From top to bottom the layers are:
Business layer
QueryMessageProcessor
PullMessageProcessor
SendMessageProcessor
DefaultMessageStore
Storage logic layer
IndexService
ConsumeQueue
CommitLog
IndexFile
MappedFileQueue
Disk I/O layer
MappedFile
MappedByteBuffer
Disk (the physical files)
CommitLog write flow
1. DefaultMessageStore#putMessage
RocketMQ's storage core class is DefaultMessageStore, and the entry point is its putMessage method.
// DefaultMessageStore#putMessage
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // return immediately if the store has been shut down [code omitted]
    // return immediately if this broker is a slave [code omitted]
    // return if runningFlags is not writable; if writable, reset printTimes to 0 [code omitted]
    // return if the topic name is longer than 127 bytes [code omitted]
    // return if the msg properties exceed the short maximum of 32767 bytes [code omitted]

    if (this.isOSPageCacheBusy()) { // is the OS page cache too busy to accept writes?
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
    PutMessageResult result = this.commitLog.putMessage(msg); // $2 the core write, see the code below

    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }
    // record the commitlog write time; keep the maximum seen so far
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

    if (null == result || !result.isOk()) {
        // count failed commitlog writes
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}
2. $2 CommitLog#putMessage writes the message into the CommitLog file
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); // $1
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // $2
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // $3
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config // $4
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) { // $5
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback); // $6
switch (result.getStatus()) { // $7
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
handleDiskFlush(result, putMessageResult, msg); // $8
handleHA(result, putMessageResult, msg); // $9
return putMessageResult;
}
$1 Get the transaction type of the message
$2 For non-transaction (NOT_TYPE) and transaction COMMIT messages with a delay level set, rewrite the topic and queueId to the schedule topic and its delay-level queue, and back up the real topic and real queueId in the message properties
$3 Get the latest mappedFile; it may be null
$4 Lock the mappedFile write (a spin lock by default)
$5 If the mappedFile is null or full, create a new one; a newly created mappedFile starts at offset 0
$6 Append the message to the mappedFile; detailed below
$7 Handle the append result:
PUT_OK: break out, the write succeeded
END_OF_FILE: the remaining space in the file is not enough, so create a new mappedFile and write the message again
message too large / properties too large / unknown error: return the corresponding failure
$8 Trigger the disk flush (see the sketch below)
$9 Trigger master-slave replication (HA)
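A conceptual sketch of what the $8 step decides, not RocketMQ's actual flush service classes: with synchronous flush the sending thread waits (bounded, 5 s by default) for the data to reach disk before the PutMessageResult is returned, while asynchronous flush returns immediately and leaves flushing to a background thread. Which mode applies is controlled by the broker's flushDiskType configuration (SYNC_FLUSH / ASYNC_FLUSH).

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Illustrates the sync-vs-async flush decision only; the real broker uses dedicated flush
// service threads inside CommitLog rather than a CompletableFuture.
public class DiskFlushSketch {
    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    // Returns true if the write can be acknowledged (flushed, or flushing was delegated).
    static boolean handleDiskFlush(FlushDiskType type, CompletableFuture<Boolean> flushDone) {
        if (type == FlushDiskType.SYNC_FLUSH) {
            try {
                // block the producer request until the flush completes, with a bounded wait
                return flushDone.get(5, TimeUnit.SECONDS);
            } catch (Exception e) {
                return false; // maps to a FLUSH_DISK_TIMEOUT style failure
            }
        }
        // ASYNC_FLUSH: wake the background flush thread and acknowledge right away
        return true;
    }
}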
3. $6 Appending the message in the mappedFile
mappedFile.appendMessage delegates to this.appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
int currentPos = this.wrotePosition.get(); // $1
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); // $2
byteBuffer.position(currentPos);
AppendMessageResult result;
if (messageExt instanceof MessageExtBrokerInner) { // $3
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); // $4
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes()); // $5
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
$1 Get the current write position
$2 Slice the write buffer (writeBuffer if one is set, otherwise the mappedByteBuffer — see the mmap sketch after this list) and move it to the file's current write position
$3 Distinguish a single message from a batch message
$4 Write the message synchronously; fileSize - currentPos is the free space left in this file
$5 After writing, advance the file's write position by the number of bytes written
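Underneath MappedFile and MappedByteBuffer sits plain Java NIO memory mapping. A minimal, self-contained sketch of that technique (the file name and size here are illustrative, not RocketMQ's defaults):

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

// mmap a file and append bytes to it: appendMessage ultimately boils down to writes like
// this against a MappedByteBuffer, with the OS page cache persisting them later.
public class MmapAppendSketch {
    public static void main(String[] args) throws Exception {
        long fileSize = 1024L * 1024; // 1 MB demo file; RocketMQ maps 1 GB per commitlog file
        try (RandomAccessFile raf = new RandomAccessFile("demo-commitlog", "rw");
             FileChannel channel = raf.getChannel()) {
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            byte[] msg = "hello".getBytes(StandardCharsets.UTF_8);
            mapped.position(0);   // the wrotePosition, in MappedFile terms
            mapped.put(msg);      // write into the page cache through the mapping
            mapped.force();       // explicit flush to disk, as the flush services do
        }
    }
}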
4. $4 Writing the message synchronously
The code lives in DefaultAppendMessageCallback, an inner class of CommitLog
// CommitLog$DefaultAppendMessageCallback#doAppend
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
    long wroteOffset = fileFromOffset + byteBuffer.position(); // $1
    this.resetByteBuffer(hostHolder, 8); // $2
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
    // Record ConsumeQueue information
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    Long queueOffset = CommitLog.this.topicQueueTable.get(key); // $3
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }
    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consumer queue
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: // $4
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }
    // Serialize message // $5
    final byte[] propertiesData =
        msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
    if (propertiesLength > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
    }
    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;
    // ... (remainder of doAppend omitted)