消息中间件
消息队列对比&技术选型
| 对比指标 | RabbitMQ | RocktMQ | Kafka |
|---|---|---|---|
| 性能和吞吐量 | 中小规模的数据传输,吞吐量较低 | 吞吐量高,适合大规模分布式系统 | 高性能和高吞吐量 |
| 消息传递的可靠性 | 数据不丢失和消息不重复投递 | 数据不丢失和消息不重复投递 | |
| 消息传递模型 | 支持发布-订阅、点对点模型 | 支持发布-订阅、点对点模型 | 支持发布-订阅 |
| 消息持久化 | 传统的消息持久化方式 | 传统的消息持久化方式 | 更快地持久化消息,支持高效的消息査询 |
| 优先级队列 | 支持 | 支持 | 不支持 |
| 延迟队列 | 支持延迟队列,可以通过插件或者消息TTL和死信交换来实现 | 直接支持延迟队列,可设定消息的延迟时间 | 不支持 |
| 死信队列 | 支持 | 支持 | 不支持 |
| 重试队列 | 可以实现重试机制,但需要通过消息属性和额外配置来手动设置 | 支持重试队列,可以自 动或手动将消息重新发送 | 不确定 (面试时有的面试官说支持,有的面试官说不支持) |
| 消费模式 | 采用推模式 | 支持推和拉两种模式 | 采用拉模式 |
| 事务消息 | 支持 | 支持 | 支持 |
- Kafka:适合高吞吐量、大数据量场景。
- RabbitMQ:适合复杂路由和高可靠性需求。
- RocketMQ:适合需要高可用性和强一致性(事务)的企业应用。
- Redis:适合对性能要求高,但对数据持久性不敏感的场景。
RabbitMQ
RabbitMQ如何保证消息不丢失?
我们使用RabbitMQ来确保MySQL和Redis间数据双写的一致性,这要求我们实现消息的高可用性,具体措施包括:
- 开启生产者确认机制,确保消息能被送达队列,如有错误则记录日志并修复数据。
- 启用持久化功能,保证消息在未消费前不会在队列中丢失,需要对交换机、队列和消息本身都进行持久化。
- 对消费者开启自动确认机制,并设置重试次数。例如,我们设置了3次重试,若失败则将消息发送至异常交换机,由人工处理。
RabbitMQ消息的重复消费问题如何解决?
我们遇到过消息重复消费的问题,处理方法是:
- 设置消费者为自动确认模式,如果服务在确认前宕机,重启后可能会再次消费同一消息。
- 通过业务唯一标识检查数据库中数据是否存在,若不存在则处理消息,若存在则忽略,避免重复消费。
那你还知道其他的解决方案吗?
是的,这属于幂等性问题,可以通过以下方法解决:
- 使用Redis分布式锁或数据库锁来确保操作的幂等性。
RabbitMQ中死信交换机了解吗?(RabbitMQ延迟队列有了解吗?)
了解。我们项目中使用RabbitMQ实现延迟队列,主要通过死信交换机和TTL(消息存活时间)来实现。
- 消息若超时未消费则变为死信,队列可绑定死信交换机,实现延迟功能。
- 另一种方法是安装RabbitMQ的死信插件,简化配置,在声明交换机时指定为死信交换机,并设置消息超时时间。
如果有100万消息堆积在MQ,如何解决?
若出现消息堆积,可采取以下措施:
- 提高消费者消费能力,如使用多线程。
- 增加消费者数量,采用工作队列模式,让多个消费者并行消费同一队列。
- 扩大队列容量,使用RabbitMQ的惰性队列,支持数百万条消息存储,直接存盘而非内存。
RabbitMQ的高可用机制了解吗?
我们项目在生产环境使用RabbitMQ集群,采用镜像队列模式,一主多从结构。
- 主节点处理所有操作并同步给从节点,若主节点宕机,从节点可接替为主节点,但需注意数据同步的完整性。
那出现丢数据怎么解决呢?
使用仲裁队列,主从模式,基于Raft协议实现强一致性数据同步,简化配置,提高数据安全性。
RocketMQ
RocketMQ如何实现的延时队列
延时消息的存储
RocketMO 的延时消息并不是直接存储在普通的消息队列中,而是通过将消息存储在特殊的延时队列中来实现的。具体过程如下:
- 当生产者发送一条延时消息时,会指定一个延时时间(例如 10 分钟后投递)。
- RocketMQ 会将这条消息标记为延时消息,并将其存储到内部的延时队列中,而不是直接投递到目标 Topic的队列中。
延时队列的分级存储
RocketMQ 的延时消息支持 18 个固定的延时级别(如 1s、5s、10s、30s、1h、2h等)。每个延时级别对应一个独立的延时队列。例如:
- 延时 1s 的消息会被存储到延时级别1的队列中。
- 延时 5s 的消息会被存储到延时级别 2的队列中,这种分级存储的设计可以提高延时消息的处理效率
延时消息的投递
RocketMO 会定期检查延时队列中的消息是否已经到达投递时间。
- RocketMO 的 Broker 会启动一个定时任务,扫描延时队列中的消息。
- 当某个延时消息的投递时间到达时,RocketMQ 会将其从延时队列中移动到目标 Topic 的普通队列中。
- 消费者从普通队列中消费这条消息,从而实现延时投递的效果。
Kafka

| ack取值 | 行为 |
|---|---|
**0** |
不等待确认,直接发送下一条消息。 |
**1** |
等待 Leader 副本确认。 |
**all/-1** |
等待 ISR 中所有副本确认 |
Zookeeper 在 Kafka 中的作用
管理集群元数据:
- Zookeeper 负责存储 Kafka 集群的元数据,如分区的 Leader 信息和消费者组的位移(offset)等。
选举 Kafka 的 Leader:
- Kafka 中的每个分区都有一个 Leader,Zookeeper 负责选举和管理 Leader 副本。当 Leader 故障时,Zookeeper 会选举一个新的 Leader,确保集群的高可用性。
集群协调与监控:
- Zookeeper 协调 Kafka 各节点的状态,监控 Broker 的存活状态,保证整个集群的一致性和高可用性。
处理分区和副本的分配:
- Zookeeper 负责管理和协调 Kafka 分区、副本的分配,确保分区和副本的平衡与健康。
说一下 Rebalance 机制
- Rebalance 机制是 Kafka 中 消费者组 管理的一部分,目的是在消费者组的成员发生变化时,自动调整各消费者分配的分区,确保消息的均衡消费。
Rebalance 发生的情况:消费者加入或离开消费者组,消费者挂掉或恢复,Kafka 会触发 Rebalance,即重新平衡分区的分配。
消息队列的作用
消息队列的作用主要是 解耦 和 异步处理:
- 解耦:使用消息队列作为中间件,不同的模块可以将消息发送到消息队列中,不需要知道具体的接收方是谁,接收方可以独立地消费消息,实现了模块之间的解耦。
- 异步处理: 将耗时操作(如发邮件、生成报表)放入队列异步处理,避免阻塞主流程,提升系统性能和响应速度。
- 流量削峰:在高并发时作为缓冲区,平滑突发流量。将请求暂存队列,系统按能力消费,避免瞬时压力,提高稳定性和资源利用率。
Kafka是如何保证消息不丢失?
Kafka 主要从三个环节保障消息不丢失:
生产者:确保消息发出去
- 异步发送和重试机制:生产者通过异步方式发送消息,避免阻塞,并且支持自动重试机制。如果发送失败,可以通过回调获取失败后的消息,Kafka 会自动重试,直到成功或者达到最大重试次数(retries=3)
- 消息确认机制:生产者通过设置
acks参数,控制消息的确认方式。当 acks=all 时,Kafka 会等待所有副本确认收到消息后,才认为消息发送成功,确保消息不丢失。Broker:确保消息存得住
- 消息副本:Kafka 会在每个分区中保存多个副本,副本分布在不同的 Broker 上,确保即使某个 Broker 出现故障,其他副本仍能保证消息的可用性。
- ISR机制:Kafka 只会在所有同步副本(ISR)确认收到消息后,才认为消息写入成功。如果 Leader 出现故障,新的 Leader 会从 ISR 中选举出来,避免丢失未同步的数据。
消费者:确保消息被处理
- 自动提交与手动提交:Kafka 允许消费者选择自动提交或者手动提交消费位点。手动提交可以确保消费者只在成功处理消息后才确认位点,避免消息丢失。
enable.auto.commit=falseKafka本身
- Kafka 会先将消息写入内存缓存,然后定期刷新到磁盘,确保消息持久存储,即使系统崩溃数据也能恢复
Kafka中消息的重复消费问题如何解决?
出现原因:
Kafka为了保证消息不丢失,默认采用‘至少一次’的交付语义。
这会导致一种情况:
- 消费者成功处理了消息,但在提交消费偏移量(Offset)之前,发生了故障,比如程序崩溃或网络中断。
- 当消费者恢复后,它会从上一次成功提交的Offset位置重新拉取消息,这就使得那些已经处理过但未提交Offset的消息被再次消费,从而产生重复。
通过以下方法解决Kafka中的重复消费问题:
1. 消费端做幂等:
- 这是最常用也最推荐的方法。在消费者端设计 幂等消费,即使相同的消息被处理多次,也不会对业务产生副作用。
- 比如用数据库的唯一键防止重复插入,或者用 Redis 记录处理过的消息 ID。这样即使消息重复了,处理结果也正确。
2. 手动提交偏移量:
- 通过 手动提交偏移量,确保只有在消息处理完成后才提交位移,避免由于消费者重启或崩溃而导致重复消费。
3. 启用 Kafka 事务:
- 把消费者处理消息的业务操作(比如写数据库或发新消息)和提交偏移量这个动作,放在同一个事务里。这样就保证了处理成功才会提交偏移量,避免重复。
Kafka是如何保证消费的顺序性?
Kafka 保证消费顺序性的方法主要是通过 分区机制,每个分区内的消息是按顺序消费的。
如果需要严格的顺序性,确保相关消息发送到同一个分区,并且由一个消费者消费该分区的所有消息。
生产者分区策略
发送消息时指定 key(如订单 ID),Kafka 通过哈希算法将相同 key 的消息路由到同一分区。
分区与消费者的一对一绑定
每个分区只能被消费者组中的一个消费者处理,避免多消费者导致乱序。
如何解决消息积压问题
1)消息积压问题的出现
- 如果消费者处理消息的速度比生产者产生消息的速度慢,就会导致消息在 Kafka 中堆积,从而形成积压。
- 如果消费者数量不足,无法分担各个分区的消费任务,也会导致积压问题。
- 如果 Kafka 的 分区数 设置得过少,或者 磁盘容量 不足,也会导致消息积压。
对于一个消费者组,如果消费者数量多于分区数,部分消费者会处于空闲状态,导致消息的消费效率低下,进而产生积压。
Kafka 将消息持久化到磁盘,如果 磁盘容量不足,Kafka 就无法继续存储新消息。当磁盘空间满时,Kafka 会暂停写入新的消息,这会导致生产者无法将消息写入集群,同时消费者也不能继续消费,这样就会造成消息积压。- 网络延迟或Kafka 系统故障可能导致消费者无法及时从 Kafka 拉取消息,从而产生积压
2)消息积压的解决方案
- 优化消费者的处理逻辑,减少每条消息的处理时间,提高消费效率。
- 通过增加 消费者 数量,尤其是 消费者组中的消费者,可以并行消费更多的分区,从而加速消息处理,避免积压。
- 通过 增加分区数,可以将生产者的消息均匀分布到多个分区上,消费者组可以并行消费更多的分区,从而提高消费速率。
- 通过 增加 Kafka broker,扩展集群容量,提升整体吞吐量,减少消息积压。
Kafka的高可用机制了解吗?
- Kafka 通过 副本机制、ISR 机制、自动故障转移 和 数据持久化 确保高可用性。即使 Leader 或 Broker 故障,Kafka 能够自动恢复并保证消息不丢失,保证系统的高可用性
副本机制:
- Kafka 通过 副本 来确保高可用性。每个 Kafka 分区有多个副本,分布在不同的 Broker 上。
- 其中,Leader 副本负责处理所有的读写请求,而 Follower 副本同步 Leader 数据。只有所有副本都成功确认,消息才算写入成功。
ISR(In-Sync Replicas)机制:
- ISR 是指与 Leader 保持同步的副本列表,只有 ISR 中的副本才会被认为是有效副本。
- 如果某个副本落后于 Leader 或不可用,它会被移出 ISR。Kafka 会保证在 Leader 故障时,从 ISR 中选举新的 Leader,确保数据不会丢失。
自动故障转移:
- 如果 Leader 副本出现故障,Kafka 会通过 Zookeeper 或 Kafka 内部的控制器机制自动从 ISR 中选举新的 Leader,确保消息的持续传输。
数据持久化:
- Kafka 将消息持久化到磁盘,即使发生故障,数据也不会丢失。通过持久化和副本机制,Kafka 能够保证数据的高可用性。
分布式架构:
- Kafka 的分布式架构使得它能够水平扩展,增加更多的 Broker 来分担负载并增强系统的可用性。
- 这样即使有部分 Broker 出现故障,系统仍能保持正常运行。
解释一下复制机制中的ISR?
在 Kafka 中,ISR(In-Sync Replicas)是指与 Leader 副本保持同步的副本集合。如果某个副本滞后或故障,它会被移出 ISR,直到同步完成。
副本同步:
- Kafka 每个分区有多个副本,其中一个副本是 Leader,其他是 Follower。
- 消息首先写入 Leader 副本,然后 Leader 会将消息同步到 Follower 副本。如果 Follower 副本与 Leader 副本的数据一致(即同步),它会被加入到 ISR中。
数据一致性:
- 只有那些与 Leader 保持同步的副本才会被认为是有效副本。假如某个 Follower 副本与 Leader 的数据差距过大,它就会被移出 ISR,直到它重新与 Leader 同步为止。
故障恢复:
- 如果 Leader 副本出现故障,Kafka 会从 ISR 中选举一个新的 Leader,确保没有数据丢失,保证高可用性。
Kafka数据清理机制了解吗?
基于时间的清理(Time-based Cleanup):
- Kafka 可以通过配置来设置消息的保留时间。超过指定时间的消息会被自动删除。
- 这种方式适用于某些场景,如保留最近 7 天的消息,过期的消息将会被清理掉。
基于大小的清理(Size-based Cleanup):
- Kafka 也可以通过配置来设置日志的最大大小。如果某个分区的日志文件大小超过配置的限制,Kafka 会删除最旧的数据以腾出空间。
- 这种方式适用于空间有限的情况,确保 Kafka 不会因为日志过大而导致磁盘占用过多。
Kafka中实现高性能的设计有了解过吗?
Kafka高性能设计包括:
- 顺序写入磁盘:Kafka 将消息写入磁盘时采用顺序写入的方式,这比随机写入要高效得多。顺序写入减少了磁盘寻址的时间,提高了磁盘的吞吐量。
- 批量处理:Kafka 支持 批量发送 和 批量处理,生产者可以将多个消息打包成一个批量发送到服务器。批量处理减少了网络延迟和每条消息的处理开销,从而提高了吞吐量。
- 内存缓存:Kafka 将消息写入到内存中的 PageCache,然后再异步刷新到磁盘。这种方式保证了磁盘的高效写入,同时减少了 I/O 操作的等待时间。
- 分布式架构:Kafka 通过 分区 来并行处理数据,每个分区可以独立地进行读写操作,多个消费者可以并行消费不同的分区,这极大提高了吞吐量和扩展性。通过增加 分区数 和 消费者数量,Kafka 能够在分布式系统中平衡负载,支持高吞吐量。
- 零拷贝技术:Kafka 利用 零拷贝技术 来提高磁盘 I/O 性能。在数据传输过程中,数据不需要通过应用层内存进行拷贝,直接从磁盘通过操作系统缓冲区发送到网络,从而减少了 CPU 的负载和数据传输的延迟。