问题优化
如何防止消息丢失
生产者:把ACK设置为1(等待leader的ACK)或者ALL(leader及其follower同步之后返回ACK)
消费者:把自动提交设置为手动提交offset
如何防止消息重复消费
在防止消息丢失的⽅案中,如果生产者发送完消息后,因为网络抖动,没有收到ACK,但实际上broker已经收到了。此时生产者会进行重试,于是broker就会收到多条相同的消息,从而造成消费者的重复消费。
或者消费者已经消费了数据,但是offset没来得及提交,会导致重复消费。
解决方案,消费者保证幂等性:
- 在数据库中使用业务id作为唯一约束
顺序消费
- 生产者:在发送时将ACK不能设置0,关闭重试,使用同步发送,等到发送成功再发送下⼀条。确保消息是顺序发送的。
- 消费者:消息是发送到⼀个分区中,只能有⼀个消费组的消费者来接收消息。
在Kafka中若要实现顺序消费,会牺牲性能,但是Rocket MQ有专门的功能实现顺序消费。
解决消息积压问题
- 在⼀个消费者中启动多个线程,让多个线程同时消费。
- 可以创建多个消费组,创建多个消费者,部署在不同的服务器上。
- 创建⼀个消费者,该消费者另建一个主题,分配多个分区和多个消费者。消费者收到消息后,立即发往这个新的topic,此时,新的主题的多个分区的多个消费者就开始⼀起消费了。
延迟队列
订单创建后,超过30分钟没有⽀付,则需要取消订单,这种场景可以通过延时队列来实现。
具体方案
- Kafka中创建多个主题,每个topic表示延时的间隔,如
topic_30m
表示延时30分钟的延时队列 - 消费者消费该主题的消息
- 消费者消费消息时判断消息的创建时间和当前时间是否超过30分钟(前提是订单没支付):
- 若超过了30分钟,去数据库中修改订单状态为已取消
- 若没有超过30分钟,记录当前消息的offset,并不再继续消费之后的消息。等待X分钟后,再次向kafka拉取该offset及之后的消息,继续进行判断,以此反复。