RocketMQ 主备同步

介绍 RocketMQ 的主备同步机制 !

(1) 简介

RocketMQ 通过 Master-Slave 主备机制,来实现整个系统的高可用,具体表现在:

  • Master 磁盘坏掉,Slave 依然保存了一份
  • Master 宕机,不影响消费者继续消费

(2) 搭建环境

我们在一台机器上搭建一个 Master 一个 Slave 的环境:

master_slave_sync_architecture

为了能够将 Master 和 Slave 搭建在同一台计算机上,我们除了需要将 Broker 的角色设置为 SLAVE ,还需要为其指定单独的 brokerIdstorePathRootDirstorePathCommitLog

1
2
3
4
5
6
7
8
9
10
11
12
# SLAVE 角色
messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);

# 一个机器如果要启动多个 Broker,那么每个 Broker 的 store 根目录必须不同
messageStoreConfig.setStorePathRootDir(storePathRootDir);
# 一个机器如果要启动多个 Broker,那么每个 Broker 的 storePathCommitLog 根目录必须不同
messageStoreConfig.setStorePathCommitLog(storePathCommitLog);
# 设置 Slave 的 Master HA 地址
messageStoreConfig.setHaMasterAddress("localhost:10912");

# SLAVE 角色的 brokerId 必须大于 0
brokerConfig.setBrokerId(1);

注意 Slave 和 Master 的 brokerName 必须一致,即它们必须处于同一个 BrokerData 数据结构里面。实际上在做了如上的修改之后, Slave 和 Master 依旧不能同时运行在同一台机器上,因为 Slave 本身也可以称为 Master,接受来自其他 Slave 的请求,因此当运行 Slave 的时候,需要将 HAService 里面的启动 AcceptSocketService 运行的相关方法注释掉

(3) 建立连接

当一个 Broker 在启动的时候,会调用 HAServicestart() 方法:

1
2
3
4
5
6
7
8
9
10
11
public class HAService {

public void start() throws Exception {
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();

this.groupTransferService.start();
this.haClient.start();
}

}

AcceptSocketService 服务的功能是 Master 等待接受来自其它客户端 Slave 的连接,当成功建立连接后,会将这条连接 HAConnection 放入到 connectionList 连接列表里面。而 HAClient 服务的功能是 Slave 主动发起同其它 Master 的连接。

master-slave-build-connections

(4) 数据传输

当启动 HAService 之后,一旦 Master 发现和 Slave 不同步,那么Master 会自动开始同步消息到 Slave,无需其它的触发机制。

master-slave-sync-data

(5) 消息异步传输

如果 Master Broker 的角色是 ASYNC_MASTER,那么消息等待从 Master 同步到 Slave 的方式是异步传输的方式。这意味当一条消息发送到 Master Broker 的时候,Master Broker 在存储完这条消息到本地之后,并不会等待消息同步到 Slave Broker 才返回。这种方式会缩短发送消息的响应时间。

(6) 消息同步传输

如果 Master Broker 的角色是 SYNC_MASTER,那么消息等待从 Master 同步到 Slave 的方式是同步传输的方式。除此之外,进入同步方式还得满足另外两个条件:

  • 消息体的 PROPERTY_WAIT_STORE_MSG_OK 属性值为 true,即这条消息允许等待
  • Slave 相比 Master 落下的同步进度不能超过 256MB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CommitLog {

public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();

// 消息是否允许等待同步
if (messageExt.isWaitStoreMsgOK()) {

// Slave 是否没有落下 Master 太多
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
// 等待同步完成
// ...
}

// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}

}

}

其中 isSlaveOK 方法就是用来检测 Slave 和 Master 落下的同步进度是否太大的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class HAService {

public boolean isSlaveOK(final long masterPutWhere) {
boolean result = this.connectionCount.get() > 0;

result =
result

&& ((masterPutWhere - this.push2SlaveMaxOffset.get()) <
this.defaultMessageStore
.getMessageStoreConfig()
.getHaSlaveFallbehindMax()); // 默认 256 * 1024 * 1024 = 256 MB

return result;
}

}

如果上面两个条件不满足的话,那么 Master 便不会再等待消息同步到 Slave 之后再返回,能尽早返回便尽早返回了。

消息等待是否同步到 Slave 是借助 CountDownLatch 来实现的。当消息需要等待的时候,便会构建一个 GroupCommitRequest ,每个请求在其内部都维护了一个 CountDownLatch ,然后通过调用 await(timeout) 方法来等待消息同步到 Slave 之后,或者超时之后自动返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static class GroupCommitRequest {

private final CountDownLatch countDownLatch = new CountDownLatch(1);

public void wakeupCustomer(final boolean flushOK) {
this.flushOK = flushOK;
this.countDownLatch.countDown();
}

public boolean waitForFlush(long timeout) {
try {
this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
return this.flushOK;
} catch (InterruptedException e) {
log.error("Interrupted", e);
return false;
}
}

}

我们再重点来看几个循环体和唤醒点:

  • GroupTransferService 服务的是否处理请求循环体唤醒点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class GroupTransferService extends ServiceThread {

public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
// ...
// 放入请求,唤醒
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}

public void run() {
// 循环体
while (!this.isStopped()) {
try {
// putRequest 会提前唤醒这句话
this.waitForRunning(10);
this.doWaitTransfer();
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}

}

}
  • HAConnection是否进行消息传输循环体唤醒点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class WriteSocketService extends ServiceThread {

@Override
public void run() {
// 循环体
while (!this.isStopped()) {
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
// 传输(写入)消息
} else {
// 等待 100 毫秒或者提前被唤醒
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
}
}

}

public class CommitLog {

public void handleHA(AppendMessageResult result,
PutMessageResult putMessageResult,
MessageExt messageExt) {
GroupCommitRequest request =
new GroupCommitRequest(result.getWroteOffset() +
result.getWroteBytes());
service.putRequest(request);
// 提前唤醒 WriteSocketService
service.getWaitNotifyObject().wakeupAll();
}

}
  • Slave 汇报进度唤醒 GroupTransferService, 等待同步完成唤醒 GroupCommitRequestCountDownLatch:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class ReadSocketService extends ServiceThread {

private boolean processReadEvent() {
// 唤醒 GroupTransferService
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}

}

class GroupTransferService extends ServiceThread {

// 被唤醒
public void notifyTransferSome() {
this.notifyTransferObject.wakeup();
}

private void doWaitTransfer() {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();

// 5 次重试
for (int i = 0; !transferOK && i < 5; i++) {
// 等待被唤醒或者超时
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}

// 唤醒 GroupCommitRequest 的 CountDownLatch
req.wakeupCustomer(transferOK);
}
}

}

public static class GroupCommitRequest {

// 被唤醒
public void wakeupCustomer(final boolean flushOK) {
this.flushOK = flushOK;
this.countDownLatch.countDown();
}

}

下图是上图一个完整的消息唤醒链:

wait-notify-sync

(7) 主备消费

当消费者在消费的时候,如果 Master 突然宕机,那么消费者会自动切换到 Slave 机器上继续进行消费。

(8) 消费建议

RocketMQ 提供了自动从 Slave 读取老数据的功能。这个功能主要由 slaveReadEnable 这个参数控制。默认是关的(slaveReadEnable = false)。推荐把它打开,主从都要开。这个参数打开之后,在客户端消费数据时,会判断,当前读取消息的物理偏移量跟最新的位置的差值,是不是超过了内存容量的一个百分比(accessMessageInMemoryMaxRatio = 40 by default)。如果超过了,就会告诉客户端去备机上消费数据。如果采用异步主从,也就是 brokerRole 等于 ASYNC_AMSTER 的时候,你的备机 IO 打爆,其实影响不太大。但是如果你采用同步主从,那还是有影响。所以这个时候,最好挂两个备机。因为 RocketMQ 的主从同步复制,只要一个备机响应了确认写入就可以了,一台 IO 打爆,问题不大。

(9) 异常处理

Q: Master(Slave) 读取来自 Slave(Master) 的消息异常 (IOExceptionread() 返回 -1 等) 的时候怎么处理?
A: 打印日志 + 关闭这条连接

Q: Master(Slave) 长时间没有收到来自 Slave(Master) 的进度汇报怎么处理?
A: 每次读取之后更新 lastReadTimestamp 或者 lastWriteTimestamp,一旦发现在 haHousekeepingInterval 间隔内 (默认 20秒) 这个时间戳都没有改变的话,关闭这条连接

Q: Slave 检测到来自 Master 汇报的本次传输偏移量和本地的传输偏移量不同时怎么处理?
A: 打印日志 + 关闭这条连接

Q: Master 如何知道 Slave 是否真正的存储了刚才发送过去的消息?
A: Slave 存储完毕之后,通过向 Master 汇报进度来完成。相当于 TCP 的 ACK 机制。

Q: Master 宕掉
A: 无论 Maser 是主动关闭 Mater,还是 Master 因为异常而退出,Slave 都会每隔 5 秒重连一次 Master

推荐文章