说到主从复制,我们可能第一时间想到的是MySQL的主从复制。
MySQL主从复制是MySQL高可用机制之一。数据可以从数据库服务器主节点复制到一个或多个从节点。
这篇文章我们讲的是RocketMQ的主从复制。希望您看完后能明白主从复制的本质。
RocketMQ的集群模式中,Broker分为Master和Slave。 1个Master可以对应多个Slave,但1个Slave只能对应1个Master。
每个Broker与Name Server集群中的所有节点建立持久连接,并定期向所有Name Server注册Topic信息。
Master节点负责接收客户端的写请求并将消息持久化到磁盘。 Slave节点负责从Master节点复制消息数据并保持与Master节点的同步。
生产者发送消息后,Master接收消息存储请求,将消息数据同步到Slave,然后将存储结果返回给生产者。同步复制模式下,发送消息会有一定的延迟,系统吞吐量也会降低。
生产者发送消息后,Master接收消息存储请求,存储消息,并将存储结果直接返回给生产者。然后Master和Slave以异步方式同步数据。这种复制模式延迟较小,可以实现较高的吞吐量。
如果Master出现故障,部分数据可能无法写入Slave,未同步的数据可能会丢失。
复制过程分为两部分:元数据复制和消息数据复制。
Slave Broker定时任务会每10秒同步一次元数据,包括主题、消费进度、延迟消费进度、消费者配置 。
同步主题时,Slave Broker 向 Master Broker 发送 RPC 请求。返回数据后,先添加到本地缓存,然后持久化到本地。
下图是Master和Slave消息数据同步的流程图。
1。 Master启动后监听指定端口;
Master启动后会创建AcceptSocketService服务,用于创建客户端到服务器端的TCP链接。
RocketMQ 抽象出链接对象HAConnection。 HAConnection会启动两个线程,分别用于读服务和写服务:
2。 Slave启动后,尝试连接Master并建立TCP连接;
HAClient是客户端Slave的核心类,负责与Master建立连接和数据交互。
客户端启动后,首先尝试连接Master,查询当前消息存储中的最大物理偏移量,并将其存储在变量currentReportedOffset中。
3。 Slave判断拉取间隔是否大于5秒,然后将拉取的消息偏移量上报给Master;
上报进度的数据格式是Long型Offset,8个字节,非常简洁。
发送到Socket缓冲区后,修改上次写入时间lastWriteTimestamp。
4。 Master解析请求偏移量,从消息文件中取出该偏移量之后的所有消息;
当Slave向Master上报数据时,触发SelectionKey.OP_READ事件,Master将请求交给ReadSocketService服务处理:
当Slave Broker通过了自己commitlog的maxPhyOffset时,Master会立即中断www.sychzs.cn(1000)
,执行processReadEvent
方法。
processReadEvent方法的核心逻辑是设置Slave当前的进度偏移量,然后通知复制线程当前的复制进度。
写入服务 WriteSocketService 从消息文件中检索此偏移量之后的所有消息,并将消息数据发送到 Slave。
5。 Slave接收数据并将消息数据追加到消息文件commitlog中。
首先调用HAClient类中的dispatchReadRequest方法解析出消息数据;
然后将消息数据追加到本地消息存储中。
从数据复制流程图中我们发现数据复制本身就是异步执行,但是同步是如何实现的呢?
Master Broker收到写入消息的请求后,调用Commitlog的aysncPutMessage方法写入消息。
1 {IMG_18:Ahr0Chm6ly9pbwcymdizlmnuymxvz3Muy29tl2jsb2cvmjq4NZE2OS8ymDIZMJQ4NZE2OS0YMDIZMDE5NTQXNDE52MTGWNJE0OTKU d2vica ==/}这段代码中,当commitLog执行完appendMessage后,需要执行两个任务:Flash任务和同步复制。
但这两个任务不是同步执行的,而是异步的。 使用 CompletableFuture,一个异步工件。
当HAConnection读服务收到Slave的进度反馈,发现消息数据复制成功,则唤醒future。
1 {IMG_19:Ahr0Chm6ly9pbwcymdizlmnuymxvz3Muy29tl2jsb2cvmjq4NZE2OS8ymDIZMJQ4NZE2OS0YMDIZMDE5NTQXTM1NTM1NJCUD 2vica ==/}最后,Broker组装响应命令,并将响应命令返回给客户端。
1。主从复制包括元数据复制和消息数据复制两部分;
2。元数据复制
Slave Broker定时任务每10秒向Master Broker发送一次RPC请求,将元数据同步到缓存,然后持久化到磁盘;
3。消息数据复制
4。实现同步
commitLog执行完appendMessage后,需要执行两个任务:刷新任务和同步复制。这里使用了异步工件CompletableFuture。
当HAConnection读服务收到Slave的进度反馈,发现消息数据复制成功,则唤醒future。最后,Broker组装响应命令并将响应命令返回给客户端。
如果我的文章对您有帮助,请帮忙点赞、阅读、转发。您的支持将激励我产出更高质量的文章。非常感谢!