RocketMQ 消息过滤流程

讲述 RocketMQ 消息过滤流程

一、消息过滤类型

Producer 在发送消息的时候可以指定消息的标签类型,还可以为每一个消息添加一个或者多个额外的属性:

1
2
3
4
// 指定标签
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 添加属性 a
msg.putUserProperty("a", 5);

根据标签和属性的不同,RocketMQ 客户端在消费消息的时候有三种消息过滤类型:

(1) 标签匹配:

1
consumer.subscribe("TopicTest", "TagA | TagB | TagC");

(2) SQL 匹配:

1
2
3
consumer.subscribe("TopicTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 3)"));

(3) 自定义匹配:

客户端实现 MessageFilter 类,自定义过滤逻辑:

1
2
3
4
5
6
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile());

String filterCode = MixAll.file2String(classFile);
consumer.subscribe("TopicTest", "org.apache.rocketmq.example.filter.MessageFilterImpl",
filterCode);

对于 MessageFilter 类实现 match 方法即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MessageFilterImpl implements MessageFilter {

@Override
public boolean match(MessageExt msg, FilterContext context) {
String property = msg.getProperty("SequenceId");
if (property != null) {
int id = Integer.parseInt(property);
if (((id % 10) == 0) &&
(id > 100)) {
return true;
}
}

return false;
}

}

下面我们一一讲解各自背后的机制与实现原理。

二、标签匹配

当为消息指定消息标签类型的时候,实际上所指定的标签例如 TagA 是作为一个属性放入到了这条消息中的:

1
2
3
4
5
6
7
public class Message implements Serializable {

public void setTags(String tags) {
this.putProperty(MessageConst.PROPERTY_TAGS, tags);
}

}

当这条消息到达 Broker 服务器端后,用户设置的标签会计算为标签码,默认的计算方式采用的标签字符串的 hashCode() 作为计算结果的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CommitLog {

public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer,
final boolean checkCRC,
final boolean readBody) {
// ...
String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
if (tags != null && tags.length() > 0) {
tagsCode = MessageExtBrokerInner
.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
}
// ...
}

}

当计算出来标签码之后,这条消息的标签码会被存放至消费队列文件中,用来与消费者客户端消费队列的标签码进行匹配。消费者客户端订阅消费话题的时候,会指定想要匹配的标签类型:

1
consumer.subscribe("TopicTest", "TagA | TagB | TagC");

这段代码在内部实现中利用 FilterAPI 构建了一个 SubscriptionData 对象:

1
2
3
4
5
6
7
8
9
10
11
public class DefaultMQPushConsumerImpl implements MQConsumerInner {

public void subscribe(String topic, String subExpression) throws MQClientException {
SubscriptionData subscriptionData = FilterAPI
.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic,
subExpression);
// ...
}

}

当用户未指定标签或者指定为星号标签的时候,则代表用户接受所有标签的消息。如果用户指定了一个或者多个标签,那么会将每一个标签取其 hashCode() 放入到 codeSet 中。SubscriptionData 还有一个 expressionType 字段,在使用标签匹配的时候,其不会设置这个这个字段的值,因此其保留为 null。在这些信息设置好以后,当客户端发送心跳包的时候,会将这些话题的注册信息一并上传至 Broker 服务器端,方便在 Broker 端进行匹配。

1
2
3
4
5
6
7
8
9
10
public class SubscriptionData implements Comparable<SubscriptionData> {

public final static String SUB_ALL = "*";

private Set<String> tagsSet = new HashSet<String>();
private Set<Integer> codeSet = new HashSet<Integer>();

private String expressionType;

}

当 Broker 端服务器在取消息的时候,每取出来一条消息,都会执行两道过滤机制:

  • ConsumeQueue 文件匹配
  • CommitLog 文件匹配

任一检查没有通过后,绝不会放行这条消息给客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class DefaultMessageStore implements MessageStore {

public GetMessageResult getMessage(final String group, /** 其他参数 **/) {

for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {

// ConsumeQueue 文件匹配
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}

continue;
}

// CommitLog 文件匹配
if (messageFilter != null
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}

}

}

}

消息过滤器的默认实现是 ExpressionMessageFilter ,消息过滤的默认实现策略就是看这个话题的标签码集合中是否包括当前这条消息的标签码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ExpressionMessageFilter implements MessageFilter {

@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
// ...
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {

if (tagsCode == null) {
return true;
}

if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}

return subscriptionData.getCodeSet().contains(tagsCode.intValue());
}

// ...
return true;
}

@Override
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
return true;
}

// ...
}

}

下图是一幅标签匹配的简要流程图:

tag_match

三、SQL 匹配

在发送消息的时候,可以为每一条消息附带一个或者多个属性值,SQL 匹配指的就是依据这些属性值TAG 标签 是否满足一定的 SQL 语句条件,来过滤消息。用户如果想要开启 SQL 匹配,那么需要在 Broker 启动的时候,启用如下几个配置信息:

1
2
3
4
brokerConfig.setEnablePropertyFilter(true);
brokerConfig.setEnableCalcFilterBitMap(true);

messageStoreConfig.setEnableConsumeQueueExt(true);

三 (1)、注册过滤信息

我们在消费者如何接受消息一文中提到过,消费者启动之后,会通过心跳包定时给 Broker 服务器汇报自己的信息。而 Broker 服务器在收到消费者的心跳包之后,会产生一个注册事件,如下所示:

1
2
3
4
5
6
7
8
9
10
public class ConsumerManager {

public boolean registerConsumer(final String group,
/** 其他参数 **/) {
// ...
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
// ...
}

}

DefaultConsumerIdsChangeListener 是默认的消费者列表注册事件通知器的实现类,其在收到注册事件以后,会将用户在消费者端订阅的话题信息注册到 ConsumerFilterManager 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {

@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
switch (event) {

case REGISTER:
Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
break;

// ...
}
}
}

consumer_filter_manager

ConsumerFilterData 中包含了消费者客户端注册的 SQL 表达式,由上图我们可以看到对于每一个话题所对应的 FilterDataMapByTopic ,可以注册多个 SQL 表达式。但是这里需要注意的是,这多个 SQL 表达式是按照组来做区分的,就是说一个组只能有一个 SQL 表达式,客户端如果在一个组中注册了多个不同的 SQL 表达式,那么后注册的会覆盖掉前注册的。因此,如果想要对同一个组使用不同的 SQL 语句来过滤自己想要的信息,这些不同的 SQL 语句必须划分到不同的组里面才可行。

sql_in_consumer_group

三 (2)、生成 BloomFilterData

布隆过滤器 (BloomFilter) 是一种空间效率很高的数据结构,其可以用来判断某个元素是否可能存在于某个集合中。当判断结果返回 true 的时候,表示可能存在,当返回 false 的时候,表示这个元素一定不存在于这个集合中。

它的原理是当一个元素被加入集合时,通过 k 个 Hash 函数将这个元素映射成一个长度为 m 位数组(Bit array)中的 k 个点,把它们置为 1。检索时,我们只要看看这些点是不是都是 1 就(大约)知道集合中有没有它了:

  • 如果这些点有任何一个 0,则被检索元素一定不在。
  • 如果都是 1, 则被检索元素很可能在。

如下是一个采用位数组长度为 m=18 以及哈希函数个数为 k=3 实现的布隆过滤器,”x,y,z” 每一个字母都需要经过 3 次哈希函数的计算,然后映射到 3 个不同的槽中。由于字母 “w” 在经过 3 次哈希函数计算后,其中一次产生的哈希值并未命中已有的槽,因此可以确定的是 “w” 肯定不存在于这个集合中。

1298px-Bloom_filter

在 RocketMQ 的实现中,其有四个最关键的值:

1
2
3
4
5
6
7
8
9
10
11
12
public class BloomFilter {

// 最大错误率
private int f;
// 可能插入 n 个元素
private int n;
// k 个哈希函数
private int k;
// 数组总共 m 位
private int m;

}

RocketMQ 实现的布隆过滤器是根据错误率 f可能插入的元素数量 n计算出来的 km,在默认配置情况下,即如下 n = 32f = 20,计算出来需要 k = 3 个哈希函数和 m = 112 位的数组。

1
2
3
4
5
6
7
8
public class BrokerConfig {

// Expect num of consumers will use filter.
private int expectConsumerNumUseFilter = 32;
// Error rate of bloom filter, 1~100.
private int maxErrorRateOfBloomFilter = 20;

}

我们这里大致了解以下布隆过滤器的一个基本想法即可,具体算法比较复杂,也不在讨论范畴以内。当客户端注册过滤信息的时候,其会根据 “组#话题” 这个字符串计算出相应的位映射数据,也即这个字符串经过布隆过滤器中的若干个哈希函数得到的几个不同的哈希值:

1
2
3
4
5
6
7
8
9
10
public class ConsumerFilterManager extends ConfigManager {

public boolean register(final String topic, /** 其它参数 **/) {
// ...
BloomFilterData bloomFilterData =
bloomFilter.generate(consumerGroup + "#" + topic);
// ...
}

}

ConsumerFilterManager 中的话题过滤信息数据,每隔 10 秒进行一次磁盘持久化:

1
2
3
4
5
6
7
8
9
10
11
12
public class BrokerController {

public boolean initialize() throws CloneNotSupportedException {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
BrokerController.this.consumerFilterManager.persist();
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
}

}

磁盘文件 consumerFilter.json 中保存的数据信息如下示例:

consumer_filter_manager_persist

上述大致流程图如下所示:

heartbeat_consumer_filter_flow

三 (3)、编译 SQL 语句

JavaCC (Java Compiler Compiler) 是一个能生成语法和词法分析器的生成程序,它通过阅读一个自定义的语法标准文件 (通常以 jj 为后缀名) ,然后就能生成能够解析该语法的扫描器和解析器的代码。

new-javacc-logo

通过执行 javacc SelectorParser.jj 命令以后,其会生成如下七个 Java 文件,用以解析 SQL 语法:

javacc_selector_parser

过滤器工厂 FilterFactory 在初次使用的时候,会注册一个 SqlFilter 类,这个类能够将消费者端指定的 SQL 语句编译解析为 Expression 表达式对象,方便后续消息的快速匹配与过滤。

1
2
3
4
5
6
7
8
public class SqlFilter implements FilterSpi {

@Override
public Expression compile(final String expr) throws MQFilterException {
return SelectorParser.parse(expr);
}

}

三 (4)、计算位映射

当 Broker 服务器接收到新的消息到来之后,一直在后台运行的 ReputMessageService 会负责将这条消息封装为一个 DispatchRequest 分发请求,这个请求会传递给提前构建好的分发请求链。在 DefaultMessageStore 的构造函数中,我们看到依次添加了构建消费队列构建索引的分发请求服务:

1
2
3
4
5
6
7
8
9
10
11
public class DefaultMessageStore implements MessageStore {

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, /** 其它参数 **/) throws IOException {

this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

}

}

而在 Broker 初始化的时候,我们看到其又添加了计算位映射的分发请求服务,并且将此分发服务放在链表的第一个位置:

1
2
3
4
5
6
7
8
public class BrokerController {

public boolean initialize() throws CloneNotSupportedException {
this.messageStore.getDispatcherList()
.addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
}

}

由此,在每次收到新的消息之后,分发请求的需要经过如下三个分发请求服务进行处理:

dispatch_request_flow

我们在这部分只介绍计算位映射的服务类实现。如下,dispatch 方法用来分发请求里面的消息,对于这每一条消息,首先根据话题取得所有的消费过滤数据。这每一条数据代表的就是一条 SQL 过滤语句信息。我们在这个地方,需要一一遍历这些过滤信息,从而完成计算位服务的需求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class CommitLogDispatcherCalcBitMap implements CommitLogDispatcher {

@Override
public void dispatch(DispatchRequest request) {
Collection<ConsumerFilterData> filterDatas = consumerFilterManager.get(request.getTopic());
Iterator<ConsumerFilterData> iterator = filterDatas.iterator();

while (iterator.hasNext()) {
ConsumerFilterData filterData = iterator.next();
// ...
}
}

}

在拿到 ConsumerFilterData 信息之后,其会根据这条信息内的 SQL 语句编译后的表达式来对这条消息进行检查匹配 (evaluate),看这条消息是否满足 SQL 语句所设置的条件。如果满足,那么会将先前在客户端注册阶段计算好的 BloomFilterData 中的映射位信息设置到 filterBitMap 中,即将相应的位数组 BitsArray 中的相应位设置为 1 。在验证完所有的 SQL 语句之后,会将这些所有的字节数组放置到 request 请求之中,以便交由下一个请求分发服务进行使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public void dispatch(DispatchRequest request) {
BitsArray filterBitMap = BitsArray.create(this.consumerFilterManager.getBloomFilter().getM());

while (iterator.hasNext()) {
ConsumerFilterData filterData = iterator.next();

MessageEvaluationContext context = new MessageEvaluationContext(request.getPropertiesMap());
Object ret = filterData.getCompiledExpression().evaluate(context);

// eval true
if (ret != null && ret instanceof Boolean && (Boolean) ret) {
consumerFilterManager
.getBloomFilter()
.hashTo(filterData.getBloomFilterData(),
filterBitMap);
}
}

request.setBitMap(filterBitMap.bytes());
}

三 (5)、存储位映射

MessageStore 在开启扩展消费队列的配置之后,每一个消费队列在创建的时候,都会额外创建一个扩展消费队列。每一个扩展消费队列文件的大小默认为 48MB:

1
2
3
4
5
6
7
8
9
10
public class ConsumeQueue {

public ConsumeQueue(final String topic, /** 其它参数 **/) {
// ...
if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
this.consumeQueueExt = new ConsumeQueueExt(topic, /** 其它参数 **/);
}
}

}

在计算位映射一节中,计算好位字节数组之后,我们这里需要通过第二个分发请求服务 CommitLogDispatcherBuildConsumeQueue 来存储这些字节信息。通过如下代码,我们知道它将请求中的位映射信息、消息存储时间、标签码这三条信息封装为 ConsumeQueueExt.CqExtUnit ,然后放入到扩展消费队列文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ConsumeQueue {

public void putMessagePositionInfoWrapper(DispatchRequest request) {

long tagsCode = request.getTagsCode();
if (isExtWriteEnable()) {
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
cqExtUnit.setFilterBitMap(request.getBitMap());
cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
cqExtUnit.setTagsCode(request.getTagsCode());

long extAddr = this.consumeQueueExt.put(cqExtUnit);
if (isExtAddr(extAddr)) {
tagsCode = extAddr;
}
}

}

}

我们注意到在上述代码中,put 函数返回的是一个 long 类型的扩展地址,当这个数值满足 isExtAddr 要求后,其会将当前的标签码设置为刚才返回的扩展地址。那么这是为什么呢?

我们首先来看 ConsumeQueueExt 文件在存放数据成功后是如何返回信息的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ConsumeQueueExt {

public static final long MAX_ADDR = Integer.MIN_VALUE - 1L;

public long put(final CqExtUnit cqExtUnit) {
if (mappedFile.appendMessage(cqExtUnit.write(this.tempContainer), 0, size)) {
return decorate(wrotePosition + mappedFile.getFileFromOffset());
}

return 1;
}

public long decorate(final long offset) {
if (!isExtAddr(offset)) {
return offset + Long.MIN_VALUE;
}
return offset;
}

public static boolean isExtAddr(final long address) {
return address <= MAX_ADDR;
}

}

MAX_ADDR 是一个很小很小的值,为 -2147483649, 即写入位置如果不小于这个值,那么我们就认定为它不是扩展地址。需要将修正后的 写入偏移量 + Long.MIN_VALUE 确定为扩展地址。当读取信息的时候,其先读取 ConsumeQueue 文件中的最后的 Hash 标签码值,如果其通过 isExtAddr() 函数返回的是 true,那么我们就可以使用这个地址,再通过一个叫做 unDecorate() 函数将其修正为正确的 ConsumeQueueExt 文件的写入地址,从而接着读取想要的信息:

1
2
3
4
5
6
public long unDecorate(final long address) {
if (isExtAddr(address)) {
return address - Long.MIN_VALUE;
}
return address;
}

这个地方,我们发现 ConsumeQueue 中的最后一个 long 型数值,可能存储的是标签 Hash 码,也可能存储的是扩展消费队列的写入地址,所以需要通过 isExtAddr() 来分情况判断。

下图为 ConsumeQueue 文件和 ConsumeQueueExt 文件中存取信息的不同:

ext_addr_vs_tags_code

三 (6)、消息过滤

在上小节我们提到了有关扩展消费队列地址标签 Hash 码存储的不同,所以当在取消息的时候,先得从消费队列文件中取出 tagsCode,然后检查是否是扩展消费队列地址,如果是,那么就需要从扩展消费队列文件中读取正确的标签 Hash 码,如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class DefaultMessageStore implements MessageStore {

public GetMessageResult getMessage(final String group, /** 其它参数 **/) {
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

boolean extRet = false, isTagsCodeLegal = true;
if (consumeQueue.isExtAddr(tagsCode)) {
extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
if (extRet) {
tagsCode = cqExtUnit.getTagsCode();
} else {
isTagsCodeLegal = false;
}
}

}
}

}

当获取到这条消息在扩展消费队列文件中存取的信息后,就会和标签匹配一节所讲述的一致,会进行两道过滤机制。我们先来看第一道 ConsumeQueue 文件匹配:

1
2
3
4
5
6
7
8
9
10
11
public class ExpressionMessageFilter implements MessageFilter {

@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
byte[] filterBitMap = cqExtUnit.getFilterBitMap();
BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
BitsArray bitsArray = BitsArray.create(filterBitMap);
return bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);
}

}

ExpressionMessageFilter 依据 CqExtUnit 中存储的位数组重新创建了比特数组 bitsArray,这个数组信息中已经存储了不同 SQL 表达式是否匹配这条消息的结果。isHit() 函数会一一检查 BloomFilterData 中存储的位信息是否映射在 BitsArray 中。只要有任何一位没有映射,那么就可以立刻判断出这条消息肯定不符合 SQL 语句的条件。

bloomfilter_ishit

因为布隆过滤器有一定的错误率,其只能精确的判断消息是否一定不在集合中,返回成功的只能确定为消息可能在集合中。因此通过布隆过滤器检查后还需要经过第二道过滤机制,即 SQL 编译后的表达式亲自验证是否匹配:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ExpressionMessageFilter implements MessageFilter {

@Override
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
Object ret = realFilterData.getCompiledExpression().evaluate(context);

if (ret == null || !(ret instanceof Boolean)) {
return false;
}

return (Boolean) ret;
}

}

通过在验证 SQL 表达式是否满足之前,提前验证是否命中布隆过滤器,可以有效的避免许多不必要的验证:

sql_filter

四、自定义匹配

消息的自定义匹配需要开启过滤服务器上传过滤类过滤服务器委托过滤消息等步骤,下面我们一一进行说明。

四 (1)、过滤服务器

在启动 Broker 服务器的时候,如果指定了下面一行设置:

1
brokerConfig.setFilterServerNums(int filterServerNums);

即将过滤服务器的数量设定为大于 0,那么 Broker 服务器在启动的时候,将会启动 filterServerNums过滤服务器。过滤服务器是通过调用 shell 命令的方式,启用独立进程进行启动的。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class FilterServerManager {

public void createFilterServer() {
int more =
this.brokerController.getBrokerConfig().getFilterServerNums() -
this.filterServerTable.size();
String cmd = this.buildStartCommand();
for (int i = 0; i < more; i++) {
FilterServerUtil.callShell(cmd, log);
}
}

}

过滤服务器在初始化的时候,会启动定时器每隔 10 秒注册一次到 Broker 服务器:

1
2
3
4
5
6
7
8
9
10
11
12
public class FiltersrvController {

public boolean initialize() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
FiltersrvController.this.registerFilterServerToBroker();
}
}, 3, 10, TimeUnit.SECONDS);
}

}

Broker 服务器在收到来自过滤服务器的注册信息之后,会把过滤服务器的地址信息、注册时间等放到过滤服务器表中:

1
2
3
4
5
6
public class FilterServerManager {

private final ConcurrentMap<Channel, FilterServerInfo> filterServerTable =
new ConcurrentHashMap<Channel, FilterServerInfo>(16);

}

同样,Broker 服务器也需要定时将过滤服务器地址信息同步给所有 Namesrv 命名服务器,上述整个流程如下图所示:

filtersrv_startup_and_register

四 (2)、过滤类

当消费者通过使用自定义匹配过滤消息的时候,这个时候会将存储订阅信息的 SubscriptionData 中的 filterClassSource 设置为 true,以表征这个客户端需要过滤类来进行消息的匹配和过滤。

消费者客户端在启动过程中,还会定时地上传本地的过滤类源码到过滤服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MQClientInstance {

private void startScheduledTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
}

public void sendHeartbeatToAllBrokerWithLock() {
// ...
this.uploadFilterClassSource();
}

}

其中过滤服务器的地址列表是在从 Namesrv 服务器获取话题路由信息的时候取得的,话题路由信息不光存储了消息队列数据,还存储了各个 Broker 所关联的过滤服务器列表:

1
2
3
4
public class TopicRouteData extends RemotingSerializable {
// ...
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

当过滤服务器接收到来自消费者客户端的源码之后,其会首先首先生成一个键为 话题@组 的字符串来查阅过滤类信息是否已经存在于内存里面的 filterClassTable 表中且文件通过 CRC 校验。如果没有存在或校验失败,那么就需要先编译并加载这个类:

1
2
3
4
5
6
7
8
9
public class DynaCode {

public void compileAndLoadClass() throws Exception {
String[] sourceFiles = this.uploadSrcFile();
this.compile(sourceFiles);
this.loadClass(this.loadClass.keySet());
}

}

默认情况下,编译后的类存放于 $HOME/rocketmq_filter_class/$PID 目录下,类的源文件和类的字节码文件名也会相应的加上当前时间戳来确定:

filter_java_and_class_file_name

上述流程图如下:

upload_filter_class_to_filtersrv

四 (3)、过滤消息

当消费者客户端启用自定义匹配过滤消息后,发往服务器的数据中也包含了过滤标志位,这样每次拉取消息的服务器也由原来的 Broker 服务器变更为 Filtersrv 过滤服务器,其中过滤服务器地址的选择是随机确定的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class PullAPIWrapper {

public PullResult pullKernelImpl(final MessageQueue mq, /** 其它参数 **/) throws Exception {
// ...
if (findBrokerResult != null) {

if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
// 从过滤服务器拉取消息
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}

// ...
}
}

}

过滤服务器在启动的时候,内部还启动了一个 PullConsumer 客户端,用以从 Broker 服务器拉取消息:

1
2
3
4
5
6
7
8
9
10
11
public class FiltersrvController {

private final DefaultMQPullConsumer defaultMQPullConsumer =
new DefaultMQPullConsumer(MixAll.FILTERSRV_CONSUMER_GROUP);

public void start() throws Exception {
this.defaultMQPullConsumer.start();
// ...
}

}

当过滤服务器收到真正的消费者发来的消费消息的请求之后,其会委托内部的 PullConsumer 使用包含在请求体内的偏移量去 Broker 服务器拉取所有消息,此时这些消息是完全没有过滤的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class DefaultRequestProcessor implements NettyRequestProcessor {

private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx,
final RemotingCommand request) throws Exception {

MessageQueue mq = new MessageQueue();

mq.setTopic(requestHeader.getTopic());
mq.setQueueId(requestHeader.getQueueId());
mq.setBrokerName(this.filtersrvController.getBrokerName());

// 设置偏移量和最大数量
long offset = requestHeader.getQueueOffset();
int maxNums = requestHeader.getMaxMsgNums();

// 委托内部消费者从 Broker 服务器拉取消息
pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);

}

}

过滤服务器从 Broker 服务器获取到完整的消息列表之后,会遍历消息列表,然后使用过滤类一一进行匹配,最终将匹配成功的消息列表返回给客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class DefaultRequestProcessor implements NettyRequestProcessor {

private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx,
final RemotingCommand request) throws Exception {
final PullCallback pullCallback = new PullCallback() {

@Override
public void onSuccess(PullResult pullResult) {
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> msgListOK = new ArrayList<MessageExt>();
for (MessageExt msg : pullResult.getMsgFoundList()) {
// 使用过滤类过滤消息
boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);
if (match) {
msgListOK.add(msg);
}
}
break;
// ...
}

}

};

// ...
}

}

上述流程如下图所示:

filter_message_by_filtersrv

五、参考

推荐文章