Kafka如何实现批量消费?

典型回答 批量消费指的是一次性拉过来一批消息,然后进行批量处理。 Kafka想要实现批量消费有很多种方案。其中比较简单的就是基于@KafkaListener 实现,这也是比较推荐的方案。(还有些其他方案,比如用原生kafka肯定也能,包括Spring Cloud Stream也支持kafka的批量消费,但是用的都不多。) 首先需要依赖spring-kafka这个包: 1 2 3 4 5 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.4.RELEASE</version> </dependency> 接着需要配置一个消费者工厂: 1 2 3 4 5 6 7 8 9 10 @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); //开启批量消费 factory.setBatchListener(true); //设置手动提交 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动提交ackMode return factory; } 这里通过factory.setBatchListener(true); 的方式设置采用批量消费,但是需要注意的是,ConcurrentKafkaListenerContainerFactory的默认的提交方式是自动提交,如果在自动提交模式下,批量消费是有可能会丢消息的,所以,需要设置为手动提交。 ...

March 22, 2026 · 1 min · santu

Kafka的批量消费如何确保消息不丢?

典型回答 ✅Kafka如何实现批量消费? 在Kafka的批量消费中,经常会出现丢消息的情况,稍有不慎就会丢,甚至有时候你还不知道会丢,因为很多人没这个意识,不知道有这种可能。 丢消息的情况 首先第一种情况,就是当使用自动提交的时候,可能会丢消息。加入你的kafka中有以下配置: 1 2 enable.auto.commit=true auto.commit.interval.ms=5000 这样配置表示每隔 5 秒自动提交当前 poll 到的最大 offset。 那么就会出现这样的情况: 消费者从 Kafka 拉取了一批消息。 Kafka 客户端自动在 5 秒后提交 offset。 但是应用代码还没处理完这批消息,有可能执行过程中出错或者失败了。 但是 Kafka 因为接收到了offset,那么他就会认为这批消息已经处理完,不再重新发送了。 那么,还有第二种情况, 如果用了手动提交,就没问题了吗?看以下代码: 1 2 3 4 5 6 7 8 9 @KafkaListener(topics = "my-topic", containerFactory = "kafkaListenerContainerFactory") public void listen(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) { try{ // 批量处理逻辑 }finally{ ack.acknowledge(); //手动提交偏移量 } } 在finally中调用偏移量提交,这时候会把最大的偏移量+1提交掉,也就意味着,不管你的try执行成功还是失败,都会提交,那么就会出现上面一样的情况,消息执行失败,但是偏移量被提交了,导致丢消息。 ...

March 22, 2026 · 1 min · santu

MQ的重平衡会带来哪些问题?

典型回答 ✅什么是Kafka的重平衡机制? ✅RocketMQ和Kafka一样有重平衡的问题吗? 重平衡的概念不重复介绍了,上面提到过了,。那么重平衡会带啦哪些问题呢? STW 大家最容易理解的就是STW的问题。 因为在重平衡过程中,消费者可能会短暂停止消费,等待新的分区/队列分配,导致吞吐下降。 当然,也可以使用Kafka中的渐进式重平衡或 下一代重平衡协议(Kafka 4.0)减少影响。 或者RocketMQ本身的重平衡机制带来的STW问题也比较小。 重复消费 除了STW的问题还有一个就是可能导致消费重复的问题,当消费者重新分配队列/分区时,可能会重新拉取道未提交的消息,导致消息被多次消费。 比如说开始分给了消费者A,A还没消费完还没提交偏移量,发生了重平衡,这个消息就会再分给消费者B,这时候就出现了重复消费了。 消息堆积 Kafka 旧版本(4.0之前)重平衡时,所有消费者都会暂停,导致短时间内消息积压。 RocketMQ 采用 定时重平衡(默认 20s),如果消费者宕机,消息可能会积压在该消费者上,直到下一次重平衡。

March 22, 2026 · 1 min · santu

Kafka如果丢消息了,可能的原因是什么?

典型回答 ✅Kafka如何保证消息不丢失? 先看上面的文章,然后再看本文!!! 看完Kafka如何保证不丢消息之后,那么反过来想,就知道如果消息丢了可能是什么原因导致的了。同样,要从生产者,消费者和broker三端分别来说。 生产者端 对于生产者来说,如果他发送过程中失败了,那么消息肯定会丢。生产者失败的情况并不多,一般就是以下2种情况了: 未开启消息确认(常见) acks=0:消息发送后不等待Kafka响应,网络失败或Kafka挂掉就会丢消息。 acks=1:只等 leader broker 确认写入成功,follower 尚未同步时 leader 挂了也可能丢失。 acks是啥,该如何配置,看上面的链接就够了,这里不重复说了 未处理发送失败的情况(常见) 我们通常使用的producer.send(msg)其实是一种异步发送,发送消息的时候,方法会立即返回,但是并不代表消息一定能发送成功。这时候如果失败了,是没办法感知的。 所以,如果你没用用callback的机制来处理失败的回调,也可能会导致消息丢失。 发送过程中,Producer挂了 这种情况也有可能出现,就是消息正准备发呢,但是没等发出去,应用就挂了。那么消息也可能会丢了 Broker端 下面这篇也重点介绍了Broker导致消息丢的情况,这里简单总结下。 ✅为什么Kafka没办法100%保证消息不丢失? Broker挂了(不常见) 最常见的就是Broker挂了的情况,还没来及做消息的持久化以及同步,他挂了,那么消息就可能会丢了。 那有人说,Broker不是个集群么?集群的话挂了不是有选举机制么。确实,那么如果是下面的情况一样会丢消息。 leader 还没同步给 follower,就挂了(不常见) 如果在 acks=1这种情况下,但 leader broker 在follower还没同步完成之前就挂了,那么数据就丢了。 未开启主从同步或者没有集群部署,leader挂了(不常见) 这种也是有可能的,就只有一个leader在抗,挂了就完蛋了。 消费一直不成功 Kafka的消息默认保存72小时,在消息过期前,消费者一直没有消费成功,到期后消息会被删除。这种情况也可能会导致消息丢失。 消费者端 到了消费者这,消息在丢失的情况就比较少了,但是也不是没有,比如。。。 没处理成功就自动提交了offset(常见) 不要以为没有这种情况,你比如说批量消费的情况,你设置了自动提交,那么就有可能把未成功消费的消息给丢掉。 ✅Kafka的批量消费如何确保消息不丢? 扩展知识 防止消息丢失最佳实践 生产者: 1 2 3 acks=-1 // 表示 Leader 和 Follower 都接收成功时确认;可以最大限度保证消息不丢失,但是吞吐量低。 retries=3 // 生产端的重试次数 retry.backoff.ms = 300 //消息发送超时或失败后,间隔的重试时间 使用**producer.send(msg, callback)**方法处理异常。 Broker : 1 2 3 min.insync.replicas=5 //表示 ISR 最少的副本数量,通常设置 min.insync.replicas >1,这样才有可用的follower副本执行替换,保证消息不丢失 unclean.leader.election.enable=false //是否可以把非 ISR 集合中的副本选举为 leader 副本。 replication.factor=5 //表示分区副本的个数,replication.factor >1 当leader 副本挂了,follower副本会被选举为leader继续提供服务。 消费者: 1 enable.auto.commit=false 手动提交 offset,确保处理完成后再提交。 ...

March 22, 2026 · 1 min · santu

留言给博主