消息队列 | Java后端
MQ
选型
- ActiveMQ
- Java
- 社区活跃度很低
- RabbitMQ
- ErLang,实现定制化开发难度较大
- 并发性能好,延时达到微秒级(因为 ErLang)
- 对消息堆积的支持并不好,当大量消息积压时性能急剧下降
- Kafka
- Scala/Java
- 最大特点是仅提供较少的核心功能,但是提供超高的吞吐量、极高的可用性以及可靠性高,常用于大数据领域的实时计算、日志采集等场景
- 每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高,所以 Kafka 不太适合在线业务场景
- RocketMQ
- Java
- 有阿里实际业务场景的实战考验
- 对在线业务的响应时延做了很多的优化
为什么要使用 MQ
通过异步处理提高系统性能,减少响应所需时间
上游系统对下游系统的调用若为同步调用,会大大限制系统的吞吐量与并发度,所以在中间添加一个 MQ,实现同步转异步
限流削峰
先将短时间高并发产生的事务消息存储在消息队列中,然后下游服务只需根据自己的能力去消费这些消息,这样就避免直接把下游服务打垮掉
降低系统耦合性
- 服务消费者对服务提供者直接调用,属于强耦合,如果服务提供者不可用,服务消费者也就无法正常运行。引入 MQ 之后,服务只需 MQ 交互,无需关心其他服务是否可用
- 新增业务若对某类消息感兴趣,订阅该消息即可,对原有系统和业务没有任何影响,从而实现可扩展性设计
引入 MQ 产生的问题
系统可用性降低
原本只需要保证服务可用,引入 MQ 后还需要保证 MQ 可用
系统复杂性提高
引入 MQ 之后,需要保证消息没有被重复消费、处理消息丢失、保证消息传递的顺序性等等问题
一致性问题
当服务间是同步调用时还可以使用本地事务来控制数据的一致性。但引入 MQ 之后,服务间调用变成异步了,就没法使用本地事务,也就无法做到数据的强一致性了
消息模型
队列模型
每条消息只能被一个消费者消费
发布/订阅模型
一条消息可以被多次消费
RocketMQ
架构
角色模型
四个部分组成:NameServer、Broker、Producer、Consumer,每一部分都是集群部署
Producer
消息生产者,负责发消息到 Broker
每个消息生产者都有所属的生产者组 Producer Group。生产者组是同一类生产者的集合,一个生产者组可以同时发送多个 Topic 的消息,不过习惯上都是将发送相同 Topic 的消息的生产者归为一组
Consumer
消息消费者,负责从 Broker 上拉取消息进行消费
每个消息生产者都有所属的消费者组 Consumer Group。一个消费者组中的消费者必须订阅完全相同的 Topic
支持 PULL 和 PUSH 两种消费方式:
- 拉取型消费者主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型
- 推送型消费者封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但其实从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息
Broker
消息队列服务器本身,主要负责消息的存储、投递和查询
NameServer
注册中心,主要包括两个功能:
- Broker 管理:接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据
- 路由信息管理:每个 NameServer 中都保存着 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer 和 Conumser 可以从 NameServer 获取整个 Broker 集群的路由信息,从而进行消息的投递和消费
NameServer 是一个几乎无状态的节点,可集群部署。NameServer 集群实际上属于一个伪集群,节点互不交互,集群只起到备份的作用
NameServer 与 Broker、Broker 与 Producer、Broker 与 Producer 之间的连接都是长连接,存在心跳检测机制
队列模型
- Topic:可以看做消息的归类,一条消息必须有一个 Topic
- Offset:在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置
消息存储在队列中。一个 Topic 可以包含多个队列,每个队列中存放的就是该 Topic 的消息
消费模式
集群消费(默认)
一个消费者组共同消费一个 Topic 的多个队列,一个队列只会被一个消费者消费
广播消费
消息会发给消费者组中的每一个消费者进行消费
工作流程
- Broker 在启动的时候去向所有的 NameServer 注册,并保持长连接,每 30s 发送一次心跳
- Producer 在发送消息的时候从 NameServer 获取 Broker 服务器地址,根据负载均衡算法选择一台服务器来发送消息
- Conusmer 消费消息的时候同样从 NameServer 获取 Broker 地址,然后拉取消息来消费
刷盘
保证断电后恢复,又可让存储的消息量超出内存的限制
同步刷盘
消息到达 Broker 内存之后,必须等刷盘完成再向 Producer 返回写成功状态
异步刷盘
消息到达 Broker 内存之后就向 Producer 返回写成功状态,当内存里的消息量积累到一定程度时,再统一触发写磁盘操作
同步刷盘能保证消息不丢失,但响应时间比异步刷盘长,适用于对消息可靠性要求比较高的场景
异步刷盘响应响应时间短,但有可能丢失消息,适用于对吞吐量要求比较高的场景复制策略
复制策略是 Broker 的 Master 与 Slave 间的数据同步方式,有同步和异步两种复制方式
同步复制是等 Master 和 Slave 均写成功后才反馈给客户端写成功状态,又称同步双写;异步复制方式是只要 Master 写成功即可反馈给客户端写成功状态
Broker 集群方式
多 Master
broker 集群仅由多个 master 构成,不存在 slave,同一 Topic 的各个 Queue 平均分布在各个 Master 节点上
优点:配置简单
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅(不可消费),消息实时性会受到影响
多 Master 多 Slave(异步复制)
broker 集群由多个 master 构成,每个 master 又配置了多个 slave
master 与 slave 的关系是主备关系,master 负责处理消息的读写请求,而 slave 仅负责消息的备份与 master 宕机后的角色切换,实现服务和数据的高可用性
当 master 宕机后 slave 能够自动切换为 master,但存在少量消息的丢失问题
多 Master 多 Slave(同步复制)
消息的安全性更高,不存在消息丢失的情况
但发送单个消息的响应时间相比异步模式要长,导致整体性能不如异步模式,并且当前版本 master 宕机后,slave 不会自动切换到 master
负载均衡
RocketMQ 中的负载均衡都在 Client 端完成,也就是分为 Producer 端发送消息时的负载均衡和 Consumer 端订阅消息时的负载均衡
TODO
消息长轮询
指 Consumer 拉取消息,如果对应的队列如果没有消息,Broker 不会立即返回,而是把 PullReuqest 挂起,等待队列来消息后,或者长轮询阻塞时间到了,再重新处理该队列上的所有 PullRequest
如何保证消息不丢失
Producer 发送消息阶段
同步发送 + 失败/超时重试机制
Broker 处理消息阶段
同步刷盘 + 同步复制
Consumer 消费消息阶段
消费完再确认
如何处理消息重复
主要的方式有两种:业务幂等和消息去重
业务幂等
保证消费逻辑的幂等性,不管消息消费多少次,对业务都没有影响
消息去重
对重复的消息就不再消费
具体做法是可以建立一个消息消费记录表,需要保证每条消息都有一个惟一的编号,在表中对这个编号做唯一约束,当消息拿到时先落库,如果有冲突就不处理
如何处理消息积压
如果当前 Topic 中的队列数量大于消费者数量
可以选择消费者扩容,提高消费能力
如果当前 Topic 中的队列数量小于等于消费者数量
新建一个临时 Topic,里面多设置一些队列,把消息迁移过去,再用扩容的消费者去消费新的 Topic 里的数据
如何实现顺序消息
顺序消息是指消息的消费顺序和产生顺序相同
顺序消息又分为:
部分顺序消息:只要保证每一组消息被顺序消费即可
全局顺序消息:某个 Topic 下的所有消息都要保证顺序
部分顺序消息的实现
生产者发送一组消息时都发送到一个队列中,只要消费者不并发读取,这一组消息就能成为顺序消息
全局顺序消息的实现
牺牲 RocketMQ 的高并发、高吞吐的特性,把 Topic 的队列数设置为 1,并把消费者设置为单线程处理
如何实现消息过滤
有两种方案:
在 Broker 端过滤,这样做的好处是避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂
在 Consumer 端过滤,这样的好处是实现起来简单,缺点是有大量无用的消息到达了 Consumer 端,只能丢弃不处理
一般采用 Cosumer 端过滤。如果希望提高吞吐量,可以采用 Broker 过滤
对消息的过滤有三种方式
- 根据 Tag 过滤(高效简单)
- SQL 表达式过滤(更灵活)
- 使用 Filter Server
怎么实现延时消息的
临时存储 + 定时任务
Broker 收到延时消息后会先发送到主题 SCHEDULE_TOPIC_XXXX 的相应时间段的队列中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标 Topic 的队列中,然后消费者就可以正常消费这些消息
死信队列
当一条消息初次消费失败,Broker 会自动进行重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时 Broker 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为死信队列
死信消息的特点:不会再被消费者正常消费;有效期与正常消息相同,均为 3 天,3 天后会被自动删除
RocketMQ 控制台提供对死信消息的查询、导出和重发的功能
消费者的 Push 和 Pull 模式
RocketMQ 提供了两种消息消费模式:Push(推模式)和 Pull(拉模式),它们在实现机制和使用场景上有所不同。
Push 模式(推模式)
特点:
- 服务端主导:Broker 主动将消息推送给消费者
- 实时性高:消息到达后立即推送给消费者
- 实现简单:消费者只需注册监听器处理消息
实现原理:
- 底层实际是基于 Pull 模式的封装
- 消费者启动后,会有一个后台线程不断从 Broker 拉取消息
- 拉取到消息后立即触发回调函数
优点:
- 使用简单,开发者只需关注业务逻辑
- 消息延迟低,实时性好
- 自动维护偏移量(offset)
缺点:
- 消费速率由服务端控制,可能不适应消费者处理能力
- 突发流量可能导致消费者积压
Pull 模式(拉模式)
特点:
- 客户端主导:消费者主动从 Broker 拉取消息
- 灵活控制:可以精确控制拉取的时机和数量
- 实现复杂:需要自行处理偏移量等细节
实现原理:
- 消费者主动调用 pull 方法获取消息
- 可以控制每次拉取的消息数量和等待时间
- 需要自行管理消费进度(offset)
优点:
- 完全由消费者控制消费节奏
- 适合特殊场景,如批量处理、定时拉取等
- 可以更精细地控制消费行为
缺点:
- 实现复杂度高
- 实时性不如 Push 模式
- 需要自行处理很多细节
实际应用建议
- 大多数场景推荐使用 Push 模式:简单高效,能满足大部分业务需求
- 特殊场景考虑 Pull 模式:如需要精确控制消费速率、批量处理、定时消费等
- 混合使用:可以在同一个应用中针对不同队列使用不同模式
RocketMQ 的 Push 模式虽然叫 “推”,但实际上是基于 Pull 模式的封装,这种设计既保证了易用性,又保留了灵活性。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 ZERO!