RocketMQ 消息索引流程

讲述 RocketMQ 消息索引服务

一、消息查询方式

对于 Producer 发送到 Broker 服务器的消息,RocketMQ 支持多种方式来方便地查询消息:

  • 根据查询消息

如下所示,在构建消息的时候,指定了这条消息的为 “OrderID001”:

1
2
3
4
5
Message msg =
new Message("TopicTest",
"TagA",
"OrderID001", // Keys
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

那么,当这条消息发送成功后,我们可以使用 queryMsgByKey 命令查询到这条消息的详细信息:

1
2
3
4
5
6
7
8
9
MQAdminStartup.main(new String[] {
"queryMsgByKey",
"-n",
"localhost:9876",
"-t",
"TopicTest",
"-k",
"OrderID001"
});
  • 根据(偏移量) ID查询消息

消息在发送成功之后,其返回的 SendResult 类中包含了这条消息的唯一偏移量 ID (注意此处指的是 offsetMsgId):

sendresult_offset_msg_id

用户可以使用 queryMsgById 命令查询这条消息的详细信息:

1
2
3
4
5
6
7
MQAdminStartup.main(new String[] {
"queryMsgById",
"-n",
"localhost:9876",
"-i",
"0A6C73D900002A9F0000000000004010"
});
  • 根据唯一键查询消息

消息在发送成功之后,其返回的 SendResult 类中包含了这条消息的唯一 ID:

sendresult_msg_id

用户可以使用 queryMsgByUniqueKey 命令查询这条消息的详细信息:

1
2
3
4
5
6
7
8
9
MQAdminStartup.main(new String[] {
"queryMsgByUniqueKey",
"-n",
"localhost:9876",
"-i",
"0A6C73D939B318B4AAC20CBA5D920000",
"-t",
"TopicTest"
});
  • 根据消息队列偏移量查询消息

消息发送成功之后的 SendResult 中还包含了消息队列的其它信息,如消息队列 ID、消息队列偏移量等信息:

1
2
3
4
5
6
7
SendResult [sendStatus=SEND_OK,
msgId=0A6C73D93EC518B4AAC20CC4ACD90000,
offsetMsgId=0A6C73D900002A9F000000000000484E,
messageQueue=MessageQueue [topic=TopicTest,
brokerName=zk-pc,
queueId=3],
queueOffset=24]

根据这些信息,使用 queryMsgByOffset 命令也可以查询到这条消息的详细信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
MQAdminStartup.main(new String[] {
"queryMsgByOffset",
"-n",
"localhost:9876",
"-t",
"TopicTest",
"-b",
"zk-pc",
"-i",
"3",
"-o",
"24"
});

二、(偏移量) ID 查询

二 (1)、生成 ID

(偏移量) ID 是在消息发送到 Broker 服务器存储的时候生成的,其包含如下几个字段:

  • Broker 服务器 IP 地址
  • Broker 服务器端口号
  • 消息文件 CommitLog 写偏移量

msgid_generated_in_broker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CommitLog {

class DefaultAppendMessageCallback implements AppendMessageCallback {

public AppendMessageResult doAppend(final long fileFromOffset, /** 其它参数 **/) {
String msgId = MessageDecoder
.createMessageId(this.msgIdMemory,
msgInner.getStoreHostBytes(hostHolder),
wroteOffset);
// ...
}

}

}

二 (2)、使用 ID 查询

Admin 端查询的时候,首先对 msgId 进行解析,取出 Broker 服务器的 IP 、端口号和消息偏移量:

1
2
3
4
5
6
7
8
9
10
11
12
public class MessageDecoder {

public static MessageId decodeMessageId(final String msgId)
throws UnknownHostException {
byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8));
byte[] port = UtilAll.string2bytes(msgId.substring(8, 16));
// offset
byte[] data = UtilAll.string2bytes(msgId.substring(16, 32));
// ...
}

}

获取到偏移量之后,Admin 会对 Broker 服务器发送一个 VIEW_MESSAGE_BY_ID 的请求命令,Broker 服务器在收到请求后,会依据偏移量定位到 CommitLog 文件中的相应位置,然后取出消息,返回给 Admin 端:

1
2
3
4
5
6
7
8
9
10
11
12
public class DefaultMessageStore implements MessageStore {

@Override
public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
SelectMappedBufferResult sbr = this.commitLog
.getMessage(commitLogOffset, 4);
// 1 TOTALSIZE
int size = sbr.getByteBuffer().getInt();
return this.commitLog.getMessage(commitLogOffset, size);
}

}

query_message_by_msg_offset_id

三、消息队列偏移量查询

根据队列偏移量查询是最简单的一种查询方式,Admin 会启动一个 PullConsumer ,然后利用用户传递给 Admin 的队列 ID、队列偏移量等信息,从服务器拉取一条消息过来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class QueryMsgByOffsetSubCommand implements SubCommand {

@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
// 根据参数构建 MessageQueue
MessageQueue mq = new MessageQueue();
mq.setTopic(topic);
mq.setBrokerName(brokerName);
mq.setQueueId(Integer.parseInt(queueId));

// 从 Broker 服务器拉取消息
PullResult pullResult = defaultMQPullConsumer.pull(mq, "*", Long.parseLong(offset), 1);
}

}

query_msg_by_message_queue_offset

四、消息索引服务

在继续讲解剩下两种查询方式之前,我们必须先介绍以下 Broker 端的消息索引服务

在之前提到过,每当一条消息发送过来之后,其会封装为一个 DispatchRequest 来下发给各个转发服务,而 CommitLogDispatcherBuildIndex 构建索引服务便是其中之一:

1
2
3
4
5
6
7
8
9
10
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

@Override
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}

}

四 (1)、索引文件结构

消息的索引信息是存放在磁盘上的,文件以时间戳命名的,默认存放在 $HOME/store/index 目录下。由下图来看,一个索引文件的结构被分成了三部分:

  • 前 40 个字节存放固定的索引头信息,包含了存放在这个索引文件中的消息的最小/大存储时间、最小/大偏移量等状况
  • 中间一段存储了 500 万个哈希槽位,每个槽内部存储的是索引文件的地址 (索引槽)
  • 最后一段存储了 2000 万个索引内容信息,是实际的索引信息存储的地方。每一个槽位存储了这条消息的键哈希值、存储偏移量、存储时间戳与下一个索引槽地址

indexfile_structure

RocketMQ 在内存中还维护了一个索引文件列表,对于每一个索引文件,前一个文件的最大存储时间是下一个文件的最小存储时间,前一个文件的最大偏移量是下一个文件的最大偏移量。每一个索引文件都索引了在某个时间段内、某个偏移量段内的所有消息,当文件满了,就会用前一个文件的最大偏移量和最大存储时间作为起始值,创建下一个索引文件:

indexfile_list

四 (2)、添加消息

当有新的消息过来后,构建索引服务会取出这条消息的,然后对字符串 “话题#键” 构建索引。构建索引的步骤如下:

  • 找出哈希槽:生成字符串哈希码,取余落到 500W 个槽位之一,并取出其中的值,默认为 0
  • 找出索引槽:IndexHeader 维护了 indexCount,实际存储的索引槽就是直接依次顺延添加的
  • 存储索引内容:找到索引槽后,放入键哈希值、存储偏移量、存储时间戳与下一个索引槽地址。下一个索引槽地址就是第一步哈希槽中取出的值,0 代表这个槽位是第一次被索引,而不为 0 代表这个槽位之前的索引槽地址。由此,通过索引槽地址可以将相同哈希槽的消息串联起来,像单链表那样。
  • 更新哈希槽:更新原有哈希槽中存储的值

我们以实际例子来说明。假设我们需要依次为键的哈希值为 “{16,29,29,8,16,16}” 这几条消息构建索引,我们在这个地方忽略了索引信息中存储的存储时间和偏移量字段,只是存储键哈希和下一索引槽信息,那么:

  • 放入 16:将 “16|0” 存储在第 1 个索引槽中,并更新哈希槽为 16 的值为 1,即哈希槽为 16 的第一个索引块的地址为 1
  • 放入 29:将 “29|0” 存储在第 2 个索引槽中,并更新哈希槽为 29 的值为 2,即哈希槽为 29 的第一个索引块的地址为 2
  • 放入 29:取出哈希槽为 29 中的值 2,然后将 “29|2” 存储在第 3 个索引槽中,并更新哈希槽为 29 的值为 3,即哈希槽为 29 的第一个索引块的地址为 3。而在找到索引块为 3 的索引信息后,又能取出上一个索引块的地址 2,构成链表为: “[29]->3->2”
  • 放入 8:将 “8|0” 存储在第 4 个索引槽中,并更新哈希槽为 8 的值为 4,即哈希槽为 8 的第一个索引块的地址为 4
  • 放入 16:取出哈希槽为 16 中的值 1,然后将 “16|1” 存储在第 5 个索引槽中,并更新哈希槽为 16 的值为 5。构成链表为: “[16]->5->1”
  • 放入 16:取出哈希槽为 16 中的值 5,然后将 “16|5” 存储在第 6 个索引槽中,并更新哈希槽为 16 的值为 6。构成链表为: “[16]->6->5->1”

整个过程如下图所示:

put_key_example

四 (3)、查询消息

当需要根据来查询消息的时候,其会按照倒序回溯整个索引文件列表,对于每一个在时间上能够匹配用户传入的 beginend 时间戳参数的索引文件,会一一进行消息查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class IndexService {

public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
// 倒序
for (int i = this.indexFileList.size(); i > 0; i--) {
// 位于时间段内
if (f.isTimeMatched(begin, end)) {
// 消息查询
}
}
}

}

而具体到每一个索引文件,其查询匹配消息的过程如下所示:

  • 确定哈希槽:根据键生成哈希值,定位到哈希槽
  • 定位索引槽:哈希槽中的值存储的就是链表的第一个索引槽地址
  • 遍历索引槽:沿着索引槽地址,依次取出下一个索引槽地址,即沿着链表遍历,直至遇见下一个索引槽地址为非法地址 0 停止
  • 收集偏移量:在遇到匹配的消息之后,会将相应的物理偏移量放到列表中,最后根据物理偏移量,从 CommitLog 文件中取出消息
1
2
3
4
5
6
7
8
9
10
11
12
13
public class DefaultMessageStore implements MessageStore {

@Override
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {

for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
long offset = queryOffsetResult.getPhyOffsets().get(m);
// 根据偏移量从 CommitLog 文件中取出消息
}

}

}

以查询哈希值 16 的消息为例,图示如下:

query_message_hash_16_example

五、唯一键查询消息

五 (1)、构建键

消息的唯一键是在客户端发送消息前构建的:

1
2
3
4
5
6
7
8
public class DefaultMQProducerImpl implements MQProducerInner {
private SendResult sendKernelImpl(final Message msg, /** 其它参数 **/) throws XXXException {
// ...
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
}
}

创建唯一 ID 的算法:

1
2
3
4
5
6
7
8
9
10
public class MessageClientIDSetter {

public static String createUniqID() {
StringBuilder sb = new StringBuilder(LEN * 2);
sb.append(FIX_STRING);
sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
return sb.toString();
}

}

唯一键是根据客户端的进程 ID、IP 地址、ClassLoader 哈希码、时间戳、计数器这几个值来生成的一个唯一的键,然后作为这条消息的附属属性发送到 Broker 服务器的:

1
2
3
4
5
6
7
8
9
public class MessageClientIDSetter {

public static void setUniqID(final Message msg) {
if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
}
}

}

五 (2)、索引键

当服务器收到客户端发送过来的消息之后,索引服务便会取出客户端生成的 uniqKey 并为之建立索引,放入到索引文件中:

1
2
3
4
5
6
7
8
9
10
11
public class IndexService {

public void buildIndex(DispatchRequest req) {
// ...
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
}
// ...
}

}

五 (3)、使用键查询

客户端在生成消息唯一键的时候,在 ByteBuffer 的第 11 位到第 14 位放置的是当前的时间当月第一天的时间的毫秒差:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class MessageClientIDSetter {

private static byte[] createUniqIDBuffer() {
long current = System.currentTimeMillis();
if (current >= nextStartTime) {
setStartTime(current);
}

// 时间差 [当前时间 - 这个月 1 号的时间]
// putInt 占据的是第 11 位到第 14 位
buffer.putInt((int) (System.currentTimeMillis() - startTime));
}

private synchronized static void setStartTime(long millis) {
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(millis);
cal.set(Calendar.DAY_OF_MONTH, 1);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
// 开始时间设置为这个月的 1 号
startTime = cal.getTimeInMillis();
// ...
}

}

我们知道消息索引服务的查询需要用户传入 beginend 这连个时间值,以进行这段时间内的匹配。所以 RocketMQ 为了加速消息的查询,于是在 Admin 端对特定 ID 进行查询的时候,首先取出了这段时间差值,然后与当月时间进行相加得到 begin 时间值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class MessageClientIDSetter {

public static Date getNearlyTimeFromID(String msgID) {
ByteBuffer buf = ByteBuffer.allocate(8);
byte[] bytes = UtilAll.string2bytes(msgID);
buf.put((byte) 0);
buf.put((byte) 0);
buf.put((byte) 0);
buf.put((byte) 0);
// 取出第 11 位到 14 位
buf.put(bytes, 10, 4);

buf.position(0);
// 得到时间差值
long spanMS = buf.getLong();

Calendar cal = Calendar.getInstance();
long now = cal.getTimeInMillis();
cal.set(Calendar.DAY_OF_MONTH, 1);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
long monStartTime = cal.getTimeInMillis();
if (monStartTime + spanMS >= now) {
cal.add(Calendar.MONTH, -1);
monStartTime = cal.getTimeInMillis();
}
// 设置为这个月(或者上个月) + 时间差值
cal.setTimeInMillis(monStartTime + spanMS);
return cal.getTime();
}

}

由于发送消息的客户端和查询消息的 Admin 端可能不在一台服务器上,而且从函数的命名 getNearlyTimeFromID 与上述实现来看,Admin 端的时间戳得到的是一个近似起始值,它尽可能地加速用户的查询。而且太旧的消息(超过一个月的消息)是查询不到的。

begin 时间戳确定以后,Admin 便会将其它必要的信息如话题Key等信息封装到 QUERY_MESSAGE 的包中,然后向 Broker 服务器传递这个请求,来进行消息的查询。Broker 服务器在获取到这个查询消息的请求后,便会根据 Key 从索引文件中查询符合的消息,最终返回到 Admin 端。

六、键查询消息

六 (1)、构建键

我们提到过,在发送消息的时候,可以填充一个 keys 的值,这个值将会作为消息的一个属性被发送到 Broker 服务器上:

1
2
3
4
5
6
7
public class Message implements Serializable {

public void setKeys(String keys) {
this.putProperty(MessageConst.PROPERTY_KEYS, keys);
}

}

六 (2)、索引键

当服务器收到客户端发送过来的消息之后,索引服务便会取出这条消息的 keys 并将其用空格进行分割,分割后的每一个字符串都会作为一个单独的,创建索引,放入到索引文件中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class IndexService {

public void buildIndex(DispatchRequest req) {
// ...
if (keys != null && keys.length() > 0) {
// 使用空格进行分割
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
}
}
}
}

}

由此我们也可以得知,keys 键的设置通过使用空格分割字符串,一条消息可以指定多个键。

六 (3)、使用键查询

keys 键查询的方式也是通过将参数封装为 QUERY_MESSAGE 请求包中去请求服务器返回相应的信息。由于键本身不能和时间戳相关联,因此 begin 值设置的是 0,这是和第五节的不同之处:

1
2
3
4
5
6
7
8
9
10
public class QueryMsgByKeySubCommand implements SubCommand {

private void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key)
throws MQClientException, InterruptedException {
// begin: 0
// end: Long.MAX_VALUE
QueryResult queryResult = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE);
}

}

推荐文章