rocketmq RocketMq 基本扫盲
引言
本篇是RocketMq扫盲,并不会讲到各个组件实现的细节内容,这里从整体全局的角度看关于RocketMq的整体设计。
理论知识略显枯燥乏味,可以大致了解一些基本概念之后,直接上手源代码以及参考官方文档了解各个组件的细节和设计思路,Rocket各个子组件相对比较独立,可以拆分单一子组件一一攻破。
一、基本介绍
一句话介绍RocketMQ
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等主流消息队列。
关键词
- 纯JAVA编写
- 分布式队列模型。
- 支持事务消息、顺序消息、批量消息、消息回溯等等。
- 开源,为 Apach 基金会 顶级项目之一。
二、RocketMQ 常见问题
这些常见问题贯穿整个消息队列的学习,不同的消息队列有不同的解决方案,需要进行横向和纵向对比。
- Broker是如何进行分片存储的?
- Broker的内部消息如何实现主从同步?
- RocketMq如何确保MessageId唯一性?
- 百万消息堆积的低延迟如何做到的?
- RocketMQ 如何保证消息不丢失?
- 如何保持消息的顺序消费?
- 如何防止消息重复消费问题?
三、什么是消息队列
In computer science, message queues and mailboxes are software-engineering
components typically used for inter-process communication (IPC), or for inter-thread
communication within the same process. They use a queue for messaging – the passing of
control or of content. Group communication systems provide similar kinds of functionality.
在计算机科学类领域,消息队列和邮箱都是软件工程组件,通常用于计算机内部同一进程或者同一进程的线程内通信(IPC),它们通过队列来传输控制信息或者内容,群组通信提供类似功能。
下面是来自 《数据密集型应用系统设计》 第四章的部分介绍内容。
消息队列最早是由一些商用收费软件控制,后续随着出现各种开源软件kafka、activeMQ、HornetQ、RabbitMQ等流行,消息队列逐渐走进互联网的视野。
和RPC异步通信对比,消息队列有下面几个特点:
- 消息队列可以充当缓冲照顾双方的处理能力。
- 避免发送方需要知道接收方IP和地址的问题。
- 支持一个消息发给多个接收方。
- 逻辑上的发送方和接收方分离。
简单来说消息队列就是使用队列来通信的组件,在最初的定义里面,它仅仅用用于双端通信,但是并没有特殊的规定和要求。
但是到了现在,消息队列更多指消息中间件,而消息中间件并不只有消息队列,同时消息队列本身的职责也在逐渐复杂和变化。
3.1 为什么需要消息队列
很简单,因为传统的WEB架构对于几百的用户量基本绰绰有余,但是一旦上万、上千万、上亿的时候,就必须要借助消息队列来帮助系统解耦,消息队列是重要的高并发和高可用组件。
你也可以简单的理解消息队列就是抗"揍"的。
3.1.1 异步处理(异步)
异步这个概念其实很早就出现了,异步处理通常用于实现需要快速响应同时需要处理重要业务的情况,比如购物之后用户想早点看到付款结果,这时候就内部系统可能需要调用商户服务,用户服务,订单服务,积分服务、结算服务配合。
这些业务会形成形成一条很长的链路,不可能等到业务全部完成才对客户响应,客户仅仅想要看到的是自己的购物是否成功。
消息队列异步处理的主要作用是减少业务请求请求阻塞,提高业务处理能力的同时做到快速的响应,业务系统可以更加专注业务本身,同时利用队列完成不同业务的“转接”处理。
3.1.2 服务解耦(解耦)
实现服务解耦指的是请求和接收方可以是两个完全独立的系统,通常消息队列是作为不同服务之间信息传递的桥梁。
3.1.3 流量控制(削锋)
传统的WEB 服务实际上抗压能力比较弱,因为这些服务不仅需要接受参数,还需要处理大量的业务,这期间需要操作大量的表和数据,速度自然就会慢下来。消息队列剥离开这一层,通过负载均衡的手段进行流量控制,同时提供多种消费模式进行抗压,这些都是传统WEB服务所不具备的特性。
哪怕SQL写的非常完美,数据处理终究需要时间,流量激增的情况下消息队列是一种很重要的保命组件。
注意消息队列本身也是双刃剑,虽然加一层可以解决软件架构难题,但是也造成系统的复杂性,所以需要权衡业务和技术的变化进行选择,而不是为了高性能单调的引入。
3.2 消息队列缺点
多了一个队列,自然也就多了更多有可能发生的问题,消息队列也是双刃剑,存在下面一些隐患:
- 重复消费:比如某些扣款业务或者结算业务,重复消费某一个消息是十分危险的。
- 顺序消费:有些场景必须要顺序消费,比如扣款之后需要将钱转移到中间账户,此时需要将未完成的交易金额进行冻结,如果这个冻结出现在付款之前,而付款出现异常,那么等于说是用户平白无故被冻了一笔钱,也是无法接受的,所以消费顺序非常重要。
- 分布式事务:单体服务通常只需要用框架的注解即可完成事务一致性,而消息队列一般用在微服务的分布式环境当中,如果一个系统的消息推送到另一个系统,需要保证分布式事务一致,则会引入更多的复杂度。
- 消息堆积:如果消费者无法及时的处理消息,此时就会出现消息堆积,消息堆积带来是消费者的长时间无法响应,以及业务阻塞。
四、RocketMq 概览
4.1 主要特点
4.2 队列模型选择
队列模型的主要目的是决定消费者的消费逻辑和数据结构设计。在不同的中间件中不同的队列模型会直接影响数据结构和框架设计。
队列模型选择分为传统队列模型和发布/订阅模型,以现在的主流中间件使用来看,发布/订阅模型更胜一筹。
4.2.1 传统队列模型
传统队列模型是遵循生产者消费者模式,这种模式就像是我们日常生活中的抢票,谁抢到就是谁的,所以有可能一个人抢到很多张票,有的完全没有票的情况。
这里直接偷了图拿来介绍了,队列可以存储多个生产者消息,但是消费者存在竞争,同一时间每一条消息只能是一个消费者消费。
4.2.2 发布/订阅模型
在设计模式的领域的发布订阅的模式换个说法,其实就是指观察者模式,在消息队列当中是十分重要的队列模型设计。
发布订阅模型它可以解决一个消息被多个消费者消费的问题,主要的处理流程把消息封装到Topic当中,所有订阅了这个Topic的消费者才可以参与消费。
发布订阅模型在生活中十分常见,比如QQ、微信的群聊。发布订阅模型支持同一条消息被多个消费者消费的广播模式,也支持负载均衡的消费模式。
4.2.3 主流消息队列选择
目前主流消息队列主要选择如下:
kafka
和rocketmq
:使用发布订阅模式,设计模式也叫“观察者模式”。
RabbitMq
和ActiveMq
:使用了传统队列模型,或者说抢车票。
4.3 主题模型
RocketMq的主体模型是发布订阅模型,那么发布订阅模型使用的是“观察者模式”,观察者模式过去个人写过一篇文章:
考古链接:# 浅谈设计模式 - 观察者模式(四)
这里简单回顾一下文中的案例,观察者模型就类似于我们去平台买基金,一支基金可能会有多个关注者,一个关注者也可以关注多支基金变动。
当股市开盘的时候,基金的变动可以被所有人看到直到当天股市闭市场,这时候要么“天台蹦迪”,要么“今晚开酒”。
4.4 消息模型
消息模型可能要比前面几个稍微复杂一点,我们需要理清消息、队列、主题 这三个主体之间的关系。
三者的关系大致是一个嵌套关系,消息往队列进行推送,而一个主题下可以有多个队列接受消息,这里画一个图来进行理解:
在RocketMq的发布订阅模型中,我们所说的消息是不能凭空存在的,它必须存储在某个Queue当中,Queue和日常我们理解的队列没有区别,一条消息就是Queue里面的一个成员,Queue本身也不是单独存在,而是存放于一个Topic当中,也就是我们所说的主题。
为什么要设计这么麻烦,生产者构建消息发给消费者不行么?
答案是确实可以,不过会功能过于羸弱了,可以想象一下这样一来的消息发送就类似短信轰炸,消费者不知道自己要什么消息,只知道会有生产者发给它,生产者也不知道要发给哪个消费者消费,所以它的消息会一股脑全给消费者,消费者还需要额外编写一大堆逻辑从中挑取需要的消息。
如果是一对一传话,这种传信倒是很方便,因为你知道我要啥,我也知道你发的啥。
但是一旦超过多个对象互相传信,这样的会造成严重的高耦合和结构混乱,也不符合消息队列高性能,简单架构的特点。
此外,一个主题下使用多个队列可以提高并发性能,最直观的比喻就是核酸队伍排队的时候,四个通道的总是要比一个通道的效率高非常多。
是不是感觉画面来了。
实际的业务部署中,消费者和生产者会组成集群,集群以消费者组和生产者组的方式存在。
生产者组和消费者组的具体概念我们分别放到组件进行介绍,这里只要明白他们是生产者和消费者构建的集群即可。
消费位点
消费位点的功能是对于消费进度进行跟踪,因为消费者队列中的内容有可能被多个消费者消费,所以必须要知道每个消费者的消费进度,同时消费位点也可以使得被消费的内容不会重复消费。
4.2 基础运转流程
主要的运转流程如下:
-
NameServer 启动,Broker进行服务注册,NameServer需要注册所有的Broker。
-
服务发现,连接所有的生产者和消费者,并且定时进行心跳包发送。
-
生产者发送消息之前从NameServer获取Broker的注册列表,根据负载均衡算法选出其中一台Broker进行消息推送。
-
Namever 和Broker 保持长连接,同时每30S进行一次心跳检测,如果检测到超过 120S 没有响应(心跳检测机制),从路由表将其删除。
-
消费者订阅某个主题前,需要先从NameSever 查找对应的Broker列表(或者某个集群),从Broker当中订阅消息进行消费,同时由Broker指定订阅规则。
4.3 相关术语
4.4.1 消息
消息是数据传输的载体,也是生产和消费者消费的最小单位,消息必须要属于一个主题,每个消息拥有唯一的Message ID,可以携带具有业务标识的Key,系统提供了Message ID 以及Key进行查询的功能。
4.4.2 标签
标签类似子菜单,是对于消息的进一步划分,主要是用于用户区分消息的业务标识,比如同一类标签的消息放到一起进行消费,比如有一个订单的Topic,可以划分出订单交易成功Tag消息、订单失败Tag消息、订单手续费Tag消息等,也就是说同一个Topic可以划分出不同的Tag的消息,做出更细化的消息控制。
总之,标签可以简单看作是消息的“Category”。
4.4.3 主题
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
主题可以单纯的看作一个队列,生产者所生产的消息是无法发送的,需要推送到主题这个队列当中才能通过Broker发送给消费者进行消费。
五、RocketMq 设计理念
5.1 集群部署
可以发现主要的模块都可以进行集群部署,所以我们可以对于 Producer、Consumer、Broker、NameServer 四个模块进行集群处理,用一张图表示。
这里同样偷了张图介绍基本的架构模式:
注意这里基本符合一个简单的实际业务部署形式,每个组件构建集群的同时,Broker 集群还设置了Master-Slave主从节点实现高可用。
简单介绍
这里介绍各个角色的职责。
Producer:
消息生产者,可以集群部署。推送消息首先需要和NameServer任意一台进行长连接,根据路由算法找到Topic实际存储在哪一台Broker Master上面,然后再和Broker进行长连接发消息,生产者支持同步和异步的方式发送消息信息。
Consumer:
消息消费者,也可以集群部署。同样需要和NameServer进行长连接,然后通过路由算法找到Broker,找到对应的Master或者Slave,然后和Broker建立长连接并,消费者支持集群消费和广播消费。根据消费模型,还可以进行顺序消费和并发消费。
Broker:
消息的实际存储媒介,需要连接NameServer以提供路由信息,Producer在发送消息之前需要通过NameServer找到Broker推送消息,Consumer同样需要按此操作找到Broker,然后才能够消费消息。
NameSever:
轻量级无状态设计,主要负责为整个消息队列提供路由服务以及Broker管理,相互之间可以构建集群,但是NameSever并不直接通信,而是独立部署服务。NameSever 是为了替代Zookeeper强一致性的的存在。
各个组件的细节这里拆分到01d - RocketMq 基础架构详解里面进行解释。
集群作用
如果把这些角色组成集群,那么各个集群又可以划分下面的功能:
Consumer:
支持Push和Pull两种消费模式,支持集群消费和广播消费。
Producer:
支持以多种负载均衡的模式向Broker发送消息。
NameSever:
需要独立服务部署,但是不妨碍节点互相通信构成集群。NameSever可以看作是轻量级的单体服务,主要的功能如下:
- 管理Broker集群,定期发送心跳包检测Broker是否存在。
- 为生产者和消费者以及Broker进行路由管理。
Broker:
负责Topic和Queue的消息存储,支持推和拉两种模式。提供上亿级别的消息顺序消息堆积。此外提供可视化管理平台,这些都是特有功能。
Broker 相比其他模块要复杂不少,主要分为下面的内容:
- 远程处理模块。Broker入口,处理客户端信息。
- 客户端管理,管理客户端功能,维护消费者和主题订阅。
- HA 服务,提供主从 Broker 间数据同步。
- 索引服务,使用健作为索引构建索引。
- 存储服务,提供在物理硬盘上存储和查询消息的简单 API。
5.2 事务消息
下面是官方原话:
事务消息是指应用本地事务和发送消息操作可以被定义到全局事务中。类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
大家不要被这句话骗了,这句话的意思是事务消息不提供分布式事务的完美解决方案,我提供类似的事务功能,但是只能保证最终数据一致性,只是针对消息发送和数据落库的保证,注意RocketMq通常的做法是先执行本地事务后发送消息。
但是Rocket事务消息存在比较多的缺陷,比如只支持单事务,再比如模板代码迁移麻烦等情况,也和我们想象的“Spring事务”不太一样,这一点后续文章会详细介绍。
5.3 定时消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的Topic。
目前版本的RocketMq并没有实现精确的时间控制,目前支持的延迟级别如下:
默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义 messageDelayLevel 。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
5.4 消息过滤
消息过滤指的是可以在某些条件下对于Topic的消息进行过滤,也叫做条件消费。
消息过滤是通过Tag实现的,基于消息属性过滤需要SQL92表达式完成。此外消息过滤可以发生在消费者和Broker两处:
- Broker消息过滤:如果是Broker过滤,则从源头过滤出消费者需要的消息。
- 消费者过滤:更像是权限控制,只将满足条件的消息放行,但是不阻碍其他消息进入。
5.5 高效IO
因为Rocket会对于消息进行持久化,追求高吞吐量同时需要保证高效IO。高效 IO靠的是下面几个组件:
RocketMq使用了文件组的概念,为了实现磁盘对齐,保证每一个文件大小固定,同时引入了内存映射机制辅助。所有的消息都是基于顺序写,同时为了高效查询与高效消费,还引入了索引文件的概念。
高效IO的另一个体现是在日志中使用追加写入代替修改和删除,这个设计个人猜想是借鉴了Lsm-Tree 的设计思路。具体可以看看《数据密集型型系统设计》LSM-Tree VS BTree
5.6 不完美设计
所谓不完美设计的关键点:Rocket只保证消息必须至少消费一次,但是不保证重复消费不会出现,也就是说设计上本身就是存在重复消费的。所以要避免重复消费,需要客户端自己通过各种方式规避重复消费问题。
5.7 去ZK
RocketMq早期和Kafka一样依赖Zookeeper实现强一致性,但是随着版本迭代,很快开发出轻量级的NameSever替代Zookeeper,摒弃了路由信息的强一致性,转而实现了在分钟级别下面的强一致性(30S心跳检测),并且实现了数据的最终一致性特征性。
简化设计和去中间件带来的是运维难度的减少,和架构的实现简洁。
5.8 消息模型(Message Model)
- 主要分成三部分:Producer、Consumer、Broker。对应功能是生产者、消费者、存储消息。
- Broker 实际部署对应一台服务器,Broker可以存储多个Topic消息。
- 每个Topic也可以分片存储在不同的Broker。
- Message Queue 用于存储消息物理地址。
- 每个Topic 的消息地址存储在多个 Message Queue 当中。
- ConsumerGroup 由多个实例构成。
Rocket提供了多种消费模型,并发消费和顺序消费。
5.81 并发消费
对于一个队列的消息,每一个消费者都会创建一个线程池对于消息进行多线程处理,所以有可能偏移量大的消息会比偏移量小的消息先进行消费。
这里用进度条的例子做一个不是很恰当的比喻,我们加载视频的时候,没有看到的内容会事先缓存,有时候就会往后的位置要比往前的位置更早加载的情况。
5.8.2 顺序消费
RocketMq 虽然可以做到顺序消费,但是需要注意它所保证的是同一个分区的顺序消费,如果要保证整个全局的顺序消费一致,需要把所有的消息发往一个分区,因为每一个分区可以看作是FIFO队列。
实际使用是全局指定同一个Topic,然后为业务分类不同的标签Tag,也就是实现了全局消费一致的问题,但是这种一致性通常在业务量不大的情况下可以这么玩。
某些场景下必须要进行顺序消费,比如Mysql Bin文件恢复的场景,这时候消息需要按照y严格顺序进行消费,但是代价是集群变单机,并且一个节点不可用导致整个节点不可用。
所以RocketMQ 在实现中提供了基于FIFO队列的顺序消费模型,虽然每一个消费者依然会创建一个多线程,但是队列会通过加锁的方式实现同步消费。
注意在消费模型中并发消费总共会进行16次重试,并且每一次重试的时间会逐渐拉长。而顺序消费在消费某一个消息的时候,如果消费失败,会一直进行重复消费。显然顺序消费在使用的时候需要进行异常区分,比如区分是业务异常,系统异常,如果不是业务异常,则重试多少次都是没有意义的。这时候需要提供额外的报警机制。
5.8.3 消费进度
消费者进行消费的时候,采用的是类似进度条的偏移点位标记方式,类似我们视频看到一半的时候,下次打开回到上一次打开的位置,继续完成后续的操作。
在RocketMq当中实现消费进度的前提是消费组,消费点位存储在消费组中。
单体消费者不需要消费进度的加持,拿到直接消费即可。
如果是集群模式,消息存储在Broker,消费进度存储在${ROCKETMQ_HOME}/store/config/consumerOffset.json
。这个路径通过BrokerHelper工具帮助类进行管理。
而如果是广播模式,消费进度文件存储在用户的主目录,默认文件路径名:${USER_HOME}/.rocketmq_offsets
。
官方也有相关说明,注意在文件前面的点号:
默认情况下,offsets.json 文件在 /home/{user}/.rocketmq_offsets 中
5.9 重要升级
5.9.1 多副本(4.5.0)
客户端在4.5.0当中有一个重要升级,多副本概念,所谓的多副本概念指的是。
升级说明:Release Notes - Apache RocketMQ - Version 4.5.0 - Apache RocketMQ
ISSUE:[ ISSUE-1046 ]- Support multiple replicas for RocketMQ.
GIthub原始链接:Add store with dledger by dongeforever · Pull Request #1046 · apache/rocketmq (github.com)
简要说明:
实现基于 raft 的代理,指的是一个复制组(Master-Slave)可以演变为给予Raft 协议的复制组,复制组内的使用Raft协议保持节点数据强一致性,主要用于金融对于数据强一致性的场景。
5.9.2 自动主从切换(5.0)
RocketMq 5.0 支持了自动的主从切换,详细可以阅读Deployment.md
(RocketMQ 5.0 自动主从切换)部分进行了解。
主要增加支持自动主从切换的Controller组件,Controller组件可以独立部署也可以内嵌在NameServer中。
- 设计思路:
docs/cn/controller/design.md
- 自动主从切换快速开始:
docs/cn/controller/quick_start.md
- 部署和升级:
docs/cn/controller/deploy.md
5.10 负载均衡
Rocket负载均衡的方案有比较多的方式,但是常用的就两种,这里举一个参考案例进行介绍。
以集群模式的部署为例,消费者是如何分配消息的?
假设某个Topic有16个队列,此时有3个消费者的消费组来负责分配这些队列,把队列按照 0 - 15 进行排列,组成 q0 - q15,消费者则用 c0 - c2 表示。
首先需要明确一点:同一个消费者同一时间可以被安排到多个队列,但是同一时间只能是一个消费者消费某一条消息。
通俗理解就是进地铁站的时候闸机有很多个,但是最终只能进到一个闸机中刷卡进站或者出站。
RocketMq有很多队列分配算法,最终常用的下面两个:
- AllocateMessageQueueAveragely:平均分配
- AllocateMessageQueueAveragelyByCircle:轮流平均分配
如果是 AllocateMessageQueueAveragely 算法,则q0-q15按照下面的方式划分:
- c0:q1 - q5、q16
- c1:q7 - q10
- c2:q11 - q15
这种分配也就是简单的除法,如果碰到余数,则接着桉顺序进行分配,比如这里的q16分到了c0。
根据轮流分配 算法AllocateMessageQueueAveragelyByCircle 进行平均分类则更像是把斗地主轮流摸牌一样,也就是c0=>q1,c1=>q2,c2=>q3,c0=>q4,c1=>q5这样的方式进行处理。
六、小结
Rocket基本扫盲篇,主要介绍了下面几点内容:
- 什么是消息队列,消息队列的定义以及优劣对比。
- 什么是RocketMQ,RocketMq的基础概念,以及基础术语介绍。
- RocketMQ的一些重要设计理念,以及部分重要升级内容。
- RcoketMq消息模型,并发和顺序消费,以及消费进度的解释。
- 一些FAQ和常见的消息队列问题。
以上为个人对于RocketMq的扫盲篇整理,文章无法做到面面俱到,更多内容可以参考官方文档了解更多细节。
N、FAQ
1. 如何理解Rocketmq中Topic、Queue以及偏移量呢?
Topic 是一个逻辑集合,具有某种业务上共性的性质的消息会发到指定的Topic中,需要取对应性质的消息也会到指定Topic中取,如被测项目中的Running top、Finish top等。
要理解Queue,首先要知道负载均衡这个概念,如果集群中有两个consumer,那我queue的应该是2的倍数,这样可以保持负载均衡。
要理解偏移量就需要知道mq顺序写随机读的一个概念,mq的消息实际上是从pagecache持久化到磁盘文件:commitlog中,顺序写入,因此consumer中读取的时候需要知道从哪里读,也就是通过偏移量来标识。
2. 生产者、消费者与Topic主题之间的关系?
生产者、消费者与Topic主题之间的关系是,一个Topic可以由多个生产者发送消息,反过来一个生产者也可以发送多个Topic消息。一个Topic可以由多个消费者消费,消费者可以消费多个Topic消息。
3. 为什么选择RocketMQ作为你们项目中的消息中间件?
可以从下面几点进行回答:
- RocketMQ集群无单点,可扩展,任意一点高可用,水平可扩展;
- 支持海量消息堆积能力,消息堆积后,写入低延迟;
- 支持上万个队列(与ActiveMQ进行对比);
- 支持消息失败重试机制;
- 消息可查询;
- 开源社区活跃;
- 成熟度(经过双十一考验);
4. 为什么要有NameServer?
纵观整个RocketMq的组件,你会发现好像NameServer也没啥用呀,不就一个路由管理和Broker心跳检测功能么,为什么要多出一层?
实际上NameServer是非常巧妙的解耦设计,如果所有的Broker和生产者以及消费者连接,那么本身的职责就会变得十分复杂,也不好维护和负载均衡。
NameServer就像是微服务中的注册中心,如果其他组件要找到对方,就需要“指路人”的帮助,RocketMq利用NameServer把这一部分职责抽取出来是非常明智的。
其次NameServer本身就是为了取代ZK存在的,有了NameServer不需要其他中间件的干扰,减少了运维的复杂性,并且有更高的性能和灵活性。
参考文章
RocketMQ 介绍及基本概念_fFee-ops的博客-CSDN博客_rocketmq