RocketMQ如何保证消息的顺序性?

典型回答 和Kafka只支持同一个Partition内消息的顺序性一样,RocketMQ中也提供了基于队列(分区)的顺序消费。即同一个队列内的消息可以做到有序,但是不同队列内的消息是无序的! 当我们作为MQ的生产者需要发送顺序消息时,需要在send方法中,传入一个MessageQueueSelector。 MessageQueueSelector中需要实现一个select方法,这个方法就是用来定义要把消息发送到哪个MessageQueue的,通常可以使用取模法进行路由: 1 2 3 4 5 6 7 8 9 10 11 12 13 SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override //mqs:该Topic下所有可选的MessageQueue //msg:待发送的消息 //arg:发送消息时传递的参数 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; //根据参数,计算出一个要接收消息的MessageQueue的下标 int index = id % mqs.size(); //返回这个MessageQueue return mqs.get(index); } }, orderId); 通过以上形式就可以将需要有序的消息发送到同一个队列中。需要注意的时候,这里需要使用同步发送的方式! ...

March 22, 2026 · 1 min · santu

RocketMQ怎么实现消息分发的?

典型回答 RocketMQ 支持两种消息模式:广播消费( Broadcasting )和集群消费( Clustering )。 广播消费:当使用广播消费模式时,RocketMQ 会将每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。 广播模式下,RocketMQ 保证消息至少被客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。并且,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过。 集群消费(默认):当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。 集群模式下,每一条消息都只会被分发到一台机器上处理。但是不保证每一次失败重投的消息路由到同一台机器上。一般来说,用集群消费的更多一些。 通过设置MessageModel可以调整消费方式: 1 2 3 4 5 6 // MessageModel设置为CLUSTERING(不设置的情况下,默认为集群订阅方式)。 properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); // MessageModel设置为BROADCASTING。 properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

March 22, 2026 · 1 min · santu

RocketMQ的事务消息是如何实现的?

典型回答 ✅什么是事务消息,为什么需要事务消息? RocketMQ的事务消息是通过TransactionListener接口来实现的。 在发送事务消息时,首先向RocketMQ Broker发送一条“half消息”(即半消息),半消息将被存储在Broker端的事务消息日志中,但是这个消息还不能被消费者消费。 接下来,在半消息发送成功后,应用程序通过执行本地事务来确定是否要提交该事务消息。 如果本地事务执行成功,就会通知RocketMQ Broker提交该事务消息,使得该消息可以被消费者消费;否则,就会通知RocketMQ Broker回滚该事务消息,该消息将被删除,从而保证消息不会被消费者消费。 拆解下来的话,主要有以下4个步骤: 发送半消息:应用程序向RocketMQ Broker发送一条半消息,该消息在Broker端的事务消息日志中被标记为“prepared”状态。 执行本地事务:RocketMQ会通知应用程序执行本地事务。如果本地事务执行成功,应用程序通知RocketMQ Broker提交该事务消息。 提交事务消息:RocketMQ收到提交消息以后,会将该消息的状态从“prepared”改为“committed”,并使该消息可以被消费者消费。 回滚事务消息:如果本地事务执行失败,应用程序通知RocketMQ Broker回滚该事务消息,RocketMQ将该消息的状态从“prepared”改为“rollback”,并将该消息从事务消息日志中删除,从而保证该消息不会被消费者消费。 扩展知识 如果一直没收到COMMIT或者ROLLBACK怎么办? 在RocketMQ的事务消息中,如果半消息发送成功后,RocketMQ Broker在规定时间内没有收到COMMIT或者ROLLBACK消息。 RocketMQ会向应用程序发送一条检查请求,应用程序可以通过回调方法返回是否要提交或回滚该事务消息。如果应用程序在规定时间内未能返回响应,RocketMQ会将该消息标记为“UNKNOW”状态。 在标记为“UNKNOW”状态的事务消息中,如果应用程序有了明确的结果,还可以向MQ发送COMMIT或者ROLLBACK。 但是MQ不会一直等下去,如果过期时间已到,RocketMQ会自动回滚该事务消息,将其从事务消息日志中删除。 第一次发送半消息失败了怎么办? 在事务消息的一致性方案中,我们是先发半消息,再做业务操作的 所以,如果半消息发失败了,那么业务操作也不会进行,不会有不一致的问题。 遇到这种情况重试就行了。(可以自己重试,也可以依赖上游重试) 为什么要用事务消息? 很多人看完事务消息会有一个疑惑:本地事务执行完成之后再发送消息有什么区别?为什么要有事务消息呢? 主要是因为:本地事务执行完成之后再发送消息可能会发消息失败。 一旦发送消息失败了,那么本地事务提交了,但是消息没成功,那么监听者就收不到消息,那么就产生数据不一致了。 那如果用事务消息。先提交一个半消息,然后执行本地事务,再发送一个commit的半消息。如果后面这个commit半消息失败了,MQ是可以基于第一个半消息不断反查来推进状态的。这样只要本地事务提交成功,最终MQ也会成功。如果本地事务rolllback,那么MQ的消息也会rollback。保证了一致性。 MQ实现分布式事务 ✅如何基于MQ实现分布式事务

March 22, 2026 · 1 min · santu

RocketMQ的架构是怎么样的?

典型回答 RocketMQ主要由Producer、Broker和Consumer三部分组成,如下图所示: Producer:消息生产者,负责将消息发送到Broker。 Broker:消息中转服务器,负责存储和转发消息。RocketMQ支持多个Broker构成集群,每个Broker都拥有独立的存储空间和消息队列。 Consumer:消息消费者,负责从Broker消费消息。 NameServer:名称服务,负责维护Broker的元数据信息,包括Broker地址、Topic和Queue等信息。Producer和Consumer在启动时需要连接到NameServer获取Broker的地址信息。 Topic:消息主题,是消息的逻辑分类单位。Producer将消息发送到特定的Topic中,Consumer从指定的Topic中消费消息。 Message Queue:消息队列,是Topic的物理实现。一个Topic可以有多个Queue,每个Queue都是独立的存储单元。Producer发送的消息会被存储到对应的Queue中,Consumer从指定的Queue中消费消息。

March 22, 2026 · 1 min · santu

介绍一下RocketMQ的工作流程?

典型回答 RocketMQ中有这样几个角色:NameServer、Broker、Producer和Consumer NameServer:NameServer是RocketMQ的路由和寻址中心,它维护了Broker和Topic的路由信息,提供了Producer和Consumer与正确的Broker建立连接的能力。NameServer还负责监控Broker的状态,并提供自动发现和故障恢复的功能。 Broker:Broker是RocketMQ的核心组件,负责存储、传输和路由消息。它接收Producer发送的消息,并将其存储在内部存储中。并且还负责处理Consumer的订阅请求,将消息推送给订阅了相应Topic的Consumer。 Producer(消息生产者):Producer是消息的生产者,用于将消息发送到RocketMQ系统。 Consumer(消息消费者):Consumer是消息的消费者,用于从RocketMQ系统中订阅和消费消息。 RocketMQ的工作过程大致如下: 1、启动NameServer,他会等待Broker、Producer以及Consumer的链接。 2、启动Broker,会和NameServer建立连接,定时发送心跳包。心跳包中包含当前Broker信息(ip、port等)、Topic信息以及Borker与Topic的映射关系。 3、启动Producer,启动时先随机和NameServer集群中的一台建立长连接,并从NameServer中获取当前发送的Topic所在的所有Broker的地址;然后从队列列表中轮询选择一个队列,与队列所在的Broker建立长连接,进行消息的发送。 4、Broker接收Producer发送的消息,当配置为同步复制时,master需要先将消息复制到slave节点,然后再返回“写成功状态”响应给生产者;当配置为同步刷盘时,则还需要将消息写入磁盘中,再返回“写成功状态”;要是配置的是异步刷盘和异步复制,则消息只要发送到master节点,就直接返回“写成功”状态。 5、启动Consumer,过程和Producer类似,先随机和一台NameServer建立连接,获取订阅信息,然后在和需要订阅的Broker建立连接,获取消息。

March 22, 2026 · 1 min · santu

介绍下 RocketMQ 5.0中的 pop 模式

典型回答 在5.0之前,RocketMQ 的消息投递方式有推和拉两种 ✅RocketMQ的消息是推还是拉? RocketMQ 5.0 引入的一种新的消费模式——Pop 模式,它结合了 Pull 和 Push 模式的优点,提供了一种高效、低延迟的消息消费方式。 Pop 模式其实也是一种拉的模式,主要是来代替原来的 push 模式的。 在5.0以前的 push 模式中(其实也是基于拉实现的)。客户端在开始消费消息前,会需要通过负载均衡算法计算出自己需要消费哪些 Queue,每当 Consumer 数量发生变化时就会触发ReBalance。 这个负载均衡是在客户端做的,也就是消费者这里,这也就意味着如果负载均衡时间过长会影响消费者的消费。 但是,这种模式有个问题,那就是无法通过一直增加客户端数量的方式来提升消费能力。因为 Queue 数量有限,客户端数量一旦达到 Queue 数量,再扩容的话,也会因为无法分配到 Queue而无法消费。这也就是传统的 push 模式的性能瓶颈。 除了负载均衡以外,push 模式中,消费者除了要做负载均衡以外,还有很多其他的事情要做,比如消息拉取,消息消费位点管理等等。这使得客户端的职责很大,出错的概率也比较大。 还有一个问题,那就是如果某个消费者hang住,会导致分配到该消费者的消息队列中的消息无法消费,导致消息积压; 于是在5.0中推出了一个新的 POP模式,来解决这些问题。 在 POP 模式中,消费者不需要感知到分区,即MessageQueue 和消费者不再进行绑定了,并且POP的消费位点也由Broker保存和控制。消费者直接通过 POP 模型提供的接口去获取到数据,消费成功后 ACK 数据。 这样做好处就是消费者只需要负责POP 消息,不再需要进行负载均衡以及消息的进度管理。并且即便某个消费者hang住,其他消费者依旧可以继续消费队列中的数据,不会造成消息堆积。

March 22, 2026 · 1 min · santu

RocketMQ如何保证消息不丢失?

典型回答 RocketMQ的消息想要确保不丢失,需要生产者、消费者以及Broker的共同努力,缺一不可。 首先在生产者端,消息的发送分为同步、异步两种和单向发送(单向发送不保证成功,不建议使用),在同步发送消息的情况下,消息的发送会同步阻塞等待Broker返回结果,在Broker确认收到消息之后,生产者才会拿到SendResult。如果这个过程中发生异常,那么就说明消息发送可能失败了,就需要生产者进行重新发送消息。 1 2 3 4 5 6 7 8 9 10 try { SendResult sendResult = producer.send(msg); // 同步发送消息,只要不抛异常就是成功。 if (sendResult != null) { //重试逻辑 } } catch (Exception e) { //重试逻辑 } 异步发送的时候,会有成功和失败的回调,这还是需要在失败回调中处理重试确保成功。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 // 异步发送消息, 发送结果通过callback返回给客户端。 producer.sendAsync(msg, new SendCallback() { @Override public void onSuccess(final SendResult sendResult) { // 消息发送成功。 System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId()); } @Override public void onException(OnExceptionContext context) { // 消息发送失败 //重试逻辑 } }); 但是Broker其实并不会立即把消息存储到磁盘上,而是先存储到内存中,内存存储成功之后,就返回给确认结果给生产者了。然后再通过异步刷盘的方式将内存中的数据存储到磁盘上。但是这个过程中,如果机器挂了,那么就可能会导致数据丢失。 ...

March 22, 2026 · 1 min · santu

RocketMQ如何实现延时消息?

典型回答 RocketMQ是支持延迟消息的,延迟消息写入到Broker后,不会立刻被消费者消费,需要等待指定的时长后才可被消费处理的消息,称为延时消息。 当消息发送到Broker后,Broker会将消息根据延迟级别进行存储。RocketMQ的延迟消息实现方式是:将消息先存储在内存中,然后使用Timer定时器进行消息的延迟,到达指定的时间后再存储到磁盘中,最后投递给消费者。 但是,RocketMQ的延迟消息并不是支持任意时长的延迟的,它只支持(5.0之前):1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h这几个时长。 另外,RocketMQ 5.0中新增了基于时间轮实现的定时消息。 前面提到的延迟消息,并使用Timer定时器来实现延迟投递。但是,由于Timer定时器有一定的缺陷,比如在定时器中有大量任务时,会导致定时器的性能下降,从而影响消息投递。 因此,在RocketMQ 5.0中,采用了一种新的实现方式:基于时间轮的定时消息。时间轮是一种高效的定时器算法,能够处理大量的定时任务,并且能够在O(1)时间内找到下一个即将要执行的任务,因此能够提高消息的投递性能。 并且,基于时间轮的定时消息能够支持更高的消息精度,可以实现秒级、毫秒级甚至更小时间粒度的定时消息。 具体实现方式如下: RocketMQ在Broker端使用一个时间轮来管理定时消息,将消息按照过期时间放置在不同的槽位中,这样可以大幅减少定时器任务的数量。 时间轮的每个槽位对应一个时间间隔,比如1秒、5秒、10秒等,每次时间轮的滴答,槽位向前移动一个时间间隔。 当Broker接收到定时消息时,根据消息的过期时间计算出需要投递的槽位,并将消息放置到对应的槽位中。 当时间轮的滴答到达消息的过期时间时,时间轮会将该槽位中的所有消息投递给消费者。 使用方式: 1 2 3 4 5 6 7 8 9 10 11 12 13 //创建一个消息生产者 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message message = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 设置消息的延迟级别为3,即延迟10s message.setDelayTimeLevel(3); // 消息发送 SendResult sendResult = producer.send(message); System.out.printf("%s%n", sendResult);

March 22, 2026 · 1 min · santu

RocketMQ有几种集群方式?

典型回答 3种,分别是单Master模式、多Master模式以及多Master多Slave模式。 单Master集群,这是一种最简单的集群方式,只包含一个Master节点和若干个Slave节点。所有的写入操作都由Master节点负责处理,Slave节点主要用于提供读取服务。当Master节点宕机时,集群将无法继续工作。 多Master集群:这种集群方式包含多个Master节点,不部署Slave节点。这种方式的优点是配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;缺点是单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。 多Master多Slave集群:这种集群方式包含多个Master节点和多个Slave节点。每个Master节点都可以处理写入操作,并且有自己的一组Slave节点。当其中一个Master节点宕机时,消费者仍然可以从Slave消费。优点是数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;缺点是性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

March 22, 2026 · 1 min · santu

RocketMQ消息堆积了怎么解决?

典型回答 RocketMQ的消息堆积(包括所有其他的MQ),一般都是因为客户端本地消费过程中,由于消费耗时过长或消费并发度较小等原因,导致客户端消费能力不足,出现消息堆积的问题。 首先,需要明确的是,MQ堆积是正常的,因为MQ有个重要的作用就是削峰填谷,既然他能起到削峰填谷的作用,那就意味着他需要帮你去接收更多的消息,然后放到自己的队列里面,下游再慢慢消费。所以,出现堆积的情况不要慌,也不一定要立刻就进去解决。 所以,MQ堆积的这个问题一旦发生了,比如线上有告警了,处理过程应该是: 1、先去定位具体什么场景,哪个topic的消息堆积了 2、看下当前的堆积情况是否严重,是否在减缓 3、查看上游流量情况,是否有营销活动,或者定时任务在运行 4、分析下堆积导致的延迟是否可以接受 5、考虑扩容增加消费者提升消费速度 6、优化代码,进一步解决堆积的问题 定位问题 MQ堆积了,我们是经常出现的,经常会出现几十万条的堆积告警,一般看到这个告警提示堆积的时候,会先看下是哪个topic堆积了,具体是什么业务在消费这个消息。 同时报警提示有10万条堆积,但是当前还有多少, 是比10万更多了,还是慢慢的在消费掉了,比如还剩3万了。这个需要看一下,有可能有突发流量过了。然后很快消费掉了是常见的事儿。 然后,你需要再去看下这个消息是谁发的,直接看下他们的流量或者直接招人问一下,是不是有什么营销活动导致流量激增,是不是在跑什么定时任务导致集中发消息了。 通过以上定位之后,基本上就可以判断这个“问题”到底需不需要解决了,很多时候可以不用解决,比如: 1、消息虽然堆积了一下,但是很快就都消费掉了。 2、这个场景消息堆积造成的影响不大,比如业务本身就能接受一定延迟,晚一点也无所谓,慢慢消费就行了。 3、上游在跑定时任务,一次性的,很快就跑完了,消息慢慢消费也可以。 关于延迟,其实用了MQ本身就是可以接受延迟的,只是这个延迟的大小的阈值不同而已,所以根据实际业务的情况判断下是否可以接受,很多MQ的消费,其实小时级的延迟都是问题不大的。当然也有一些要分钟级处理,这种就需要介入看了。 解决问题 如果定位到MQ堆积之后,经过分析发现这个问题还是需要解决的话,那么就有2种方案,一种是临时方案,一种是长期方案。 先说临时方案,快速解决堆积的问题,首先就是考虑扩容,增加消费者的数量,因为消息堆积了,消费不过来了,那就把消费者的数量增加一下,让更多人的实例来消费这些消息。(但是扩容的时候,需要考虑下游服务的承载能力,不要把下游给打挂了。) 长期方案那就是改代码了,但是面试的时候,也不能上来就八股吟唱,还是要先分析问题在哪。 消息堆积,有效方式就是提升消费速度,消费速度慢可能有很多原因,比如: 1、消息投递速度慢 2、出现了慢SQL 3、单线程存在瓶颈 4、下游服务RT变长 5、整个MQ消费的处理流程长 如果是消息的投递速度慢的话,可能是MQ用的不对,使用了单条拉消息的方案,其实RocketMQ是支持批量拉消息的,可以考虑换成批量拉消息的方案,通过调整ConsumeMessageBatchMaxSize的值来拉取批量消息,默认32条一次,可以调整到更高。还可以调整BatchConsumeMaxAwaitDurationInSeconds来设置批量消费的最大等待时长。 针对慢SQL的情况,有的时候线上跑的好好地,一直都没有慢SQL,但是突然有一天就有了慢SQL了,大部分原因是因为数据量积累变多了,导致表变大了,CRUD都变慢了。这时候就需要从数据库层面优化了,比如做数据归档、分库分表、增加索引等方式,提升SQL速度。 单线程存在瓶颈,这种也比较常见,那比较典型的方案就是引入线程池来进行并发消费,让多线程一起来干活。这种要配合批量消息来做,并且需要考虑如果有某个线程失败了,导致消息丢失的问题。(这个在我的项目课中有具体的落地代码和解决方案) 下游RT变长,这种一般是在MQ消费过程中需要调外部服务,而外部服务的RT比较长导致的, 这种一方面是让下游做优化,提升他的RT。要不然就可以和下一个问题解决方案一样。 最后一个就是整个MQ消费的处理流程长,可能是因为SQL慢但是又不好治理,或者前面说的下游RT长,或者就是要干的事情多,这种情况,可以采用一种方案 ,那就是先收单,然后再通过定时任务慢慢处理的方式。执行流程如下: 相当于让消息不要在MQ中堆积,而是自己收单存在来,放到自己的数据库中,靠定时任务消费处理,好处是可以避免消息投递多次不成功之后被删除。 除了以上方案,还有其他的方案,比如: 降低生产者的生产速度:如果生产者可控的话,可以让生产者生成消息的速度慢一点。 清理过期消息:有一些过期消息、或者一直无法成功的消息,在业务做评估之后,如果无影响或者影响不大,其实是可以清理的。 ✅RocketMQ消费堆积问题排查

March 22, 2026 · 1 min · santu

留言给博主