当前位置:数据分析 > 说起RocketMQ主从复制

说起RocketMQ主从复制

  • 发布:2023-10-02 02:22

说到主从复制,我们可能第一时间想到的是MySQL的主从复制。

MySQL主从复制是MySQL高可用机制之一。数据可以从数据库服务器主节点复制到一个或多个从节点。

这篇文章我们讲的是RocketMQ的主从复制。希望您看完后能明白主从复制的本质。

1 同步和异步

RocketMQ的集群模式中,Broker分为Master和Slave。 1个Master可以对应多个Slave,但1个Slave只能对应1个Master。

每个Broker与Name Server集群中的所有节点建立持久连接,并定期向所有Name Server注册Topic信息。

Master节点负责接收客户端的写请求并将消息持久化到磁盘。 Slave节点负责从Master节点复制消息数据并保持与Master节点的同步。

  • 同步复制

生产者发送消息后,Master接收消息存储请求,将消息数据同步到Slave,然后将存储结果返回给生产者。同步复制模式下,发送消息会有一定的延迟,系统吞吐量也会降低。

  • 异步复制

生产者发送消息后,Master接收消息存储请求,存储消息,并将存储结果直接返回给生产者。然后Master和Slave以异步方式同步数据。这种复制模式延迟较小,可以实现较高的吞吐量。

如果Master出现故障,部分数据可能无法写入Slave,未同步的数据可能会丢失。

复制过程分为两部分:元数据复制消息数据复制

  • 主从服务器同步主题、消费进度、延迟消费进度、消费配置数据
  • 主从服务器同步消息数据

2 元数据复制

Slave Broker定时任务会每10秒同步一次元数据,包括主题消费进度延迟消费进度消费者配置

同步主题时,Slave Broker 向 Master Broker 发送 RPC 请求。返回数据后,先添加到本地缓存,然后持久化到本地。

3 消息数据复制

下图是Master和Slave消息数据同步的流程图。

1。 Master启动后监听指定端口;

Master启动后会创建AcceptSocketService服务,用于创建客户端到服务器端的TCP链接。

RocketMQ 抽象出链接对象HAConnection。 HAConnection会启动两个线程,分别用于读服务和写服务:

  • 读服务:处理Slave发送的请求
  • 写服务:用于向Slave传输数据
8 {IMG_8: Ahr0CHM6LY9PBWCYMDIZLMNUYMXVZ3MUY2JSB2CVMJQ4NZE2OS8ymDizmDIZMJQ4NZE2OS0YMDIZMDE5XMTK2OC0XNIXNTA3L nbuzw ==/}

2。 Slave启动后,尝试连接Master并建立TCP连接;

HAClient是客户端Slave的核心类,负责与Master建立连接和数据交互。

客户端启动后,首先尝试连接Master,查询当前消息存储中的最大物理偏移量,并将其存储在变量currentReportedOffset中。

3。 Slave判断拉取间隔是否大于5秒,然后将拉取的消息偏移量上报给Master;

上报进度的数据格式是Long型Offset,8个字节,非常简洁。

发送到Socket缓冲区后,修改上次写入时间lastWriteTimestamp。

4。 Master解析请求偏移量,从消息文件中取出该偏移量之后的所有消息;

当Slave向Master上报数据时,触发SelectionKey.OP_READ事件,Master将请求交给ReadSocketService服务处理:

当Slave Broker通过了自己commitlog的maxPhyOffset时,Master会立即中断www.sychzs.cn(1000) ,执行processReadEvent方法。

processReadEvent方法的核心逻辑是设置Slave当前的进度偏移量,然后通知复制线程当前的复制进度。

写入服务 WriteSocketService 从消息文件中检索此偏移量之后的所有消息,并将消息数据发送到 Slave。

5。 Slave接收数据并将消息数据追加到消息文件commitlog中。

首先调用HAClient类中的dispatchReadRequest方法解析出消息数据;

然后将消息数据追加到本地消息存储中。

4 实现同步

从数据复制流程图中我们发现数据复制本身就是异步执行,但是同步是如何实现的呢?

Master Broker收到写入消息的请求后,调用Commitlog的aysncPutMessage方法写入消息。

1 {IMG_18:Ahr0Chm6ly9pbwcymdizlmnuymxvz3Muy29tl2jsb2cvmjq4NZE2OS8ymDIZMJQ4NZE2OS0YMDIZMDE5NTQXNDE52MTGWNJE0OTKU d2vica ==/}

这段代码中,当commitLog执行完appendMessage后,需要执行两个任务:Flash任务同步复制

但这两个任务不是同步执行的,而是异步的。 使用 CompletableFuture,一个异步工件

当HAConnection读服务收到Slave的进度反馈,发现消息数据复制成功,则唤醒future。

1 {IMG_19:Ahr0Chm6ly9pbwcymdizlmnuymxvz3Muy29tl2jsb2cvmjq4NZE2OS8ymDIZMJQ4NZE2OS0YMDIZMDE5NTQXTM1NTM1NJCUD 2vica ==/}

最后,Broker组装响应命令,并将响应命令返回给客户端。

5 总结

1。主从复制包括元数据复制和消息数据复制两部分;

2。元数据复制

​ Slave Broker定时任务每10秒向Master Broker发送一次RPC请求,将元数据同步到缓存,然后持久化到磁盘;

3。消息数据复制

  • Master开始监听指定端口
  • Slave 启动 HaClient 服务并与 Master 创建 TCP 链接
  • Slave向Master报告存储进度
  • Master收到进度后,从消息文件中取出offset后的所有消息,传输给Slave
  • Slave 接收到数据后,将消息数据追加到本地消息存储中。

4。实现同步

​ commitLog执行完appendMessage后,需要执行两个任务:刷新任务同步复制。这里使用了异步工件CompletableFuture。

​ 当HAConnection读服务收到Slave的进度反馈,发现消息数据复制成功,则唤醒future。最后,Broker组装响应命令并将响应命令返回给客户端。


如果我的文章对您有帮助,请帮忙点赞、阅读、转发。您的支持将激励我产出更高质量的文章。非常感谢!

相关文章

热门推荐