🚟消息队列
I. 如何解决消息丢失问题?
消息丢失可能发生在三个阶段,我们需要分段治理:
1. 生产者丢失消息(Producer ➡️ Broker)
原因:网络波动、Broker 宕机,导致消息根本没发到 Broker,但生产者以为发了。
解决方案:
开启发布确认机制 (Publisher Confirm):
生产者将 Channel 设置为 confirm 模式。
每发送一条消息,Broker 收到并写入磁盘后,会给生产者回传一个 ACK。
如果生产者没收到 ACK(或收到 NACK),就重试发送。
备选:事务机制 (Transaction):虽然也能保证安全,但性能极差,通常不推荐。
2. Broker 丢失消息(Broker 自身)
原因:Broker 接收到了消息,但还没来得及写入磁盘就宕机了(内存数据丢失)。
解决方案:持久化 (Persistence)。
需要同时做3️⃣件事,缺一不可:
Exchange 持久化:声明交换机时 durable=true。
Queue 持久化:声明队列时 durable=true。
Message 持久化:发送消息时设置投递模式 deliveryMode=2。
3. 消费者丢失消息(Broker ➡️ Consumer)
原因:消费者开启了“自动确认”(Auto Ack)。消息刚被消费者接收(还在内存中没处理),消费者进程挂了,Broker 以为已经消费成功,就把消息删了。
解决方案:
关闭自动确认,开启手动确认 (Manual Ack)。
消费者在业务逻辑完全处理成功后,才调用 channel.basicAck() 告诉 Broker 可以删除消息。
如果处理失败,可以调用 basicNack 或 basicReject 让消息重新入队。
II. 如何解决消息重复问题?
原因:
网络抖动:生产者发了消息,Broker 收到了,但回传 ACK 时网络断了。生产者以为失败了,就重发了一次。
消费者故障:消费者处理完了业务,还没来得及发 ACK 就挂了。Broker 以为没处理,就把消息发给另一个消费者。
解决方案:
消息重复在分布式系统中是无法完全避免的(为了保证不丢失,通常采用“至少一次投递”策略)。
所以,解决方案的核心不在于“不让它重复发”,而在于消费者端的幂等性 (Idempotency) 处理。
具体手段:
数据库唯一约束 (最强硬手段):
给每条消息生成一个全局唯一的 ID(MessageID 或业务 ID,如订单号)。
消费时,尝试向数据库(或去重表)插入这个 ID。
如果插入成功,则执行业务;如果报错(主键冲突),说明已经消费过,直接丢弃或 ACK。
Redis 原子操作 (性能较好):
利用 SETNX (set if not exists) 命令。
Key 为消息 ID,Value 为状态。
处理前先 SETNX,返回 true 则处理,返回 false 则说明正在处理或已处理。
乐观锁/版本号:
- 如果是更新操作,带上版本号条件:UPDATE table SET count = count + 1, version = version + 1 WHERE id = 1 AND version = 1。
III. 如何解决消息积压问题?
表现:
消费者挂了。
消费者处理速度太慢(代码逻辑耗时、数据库瓶颈)。
生产者发送流量突然暴涨(秒杀活动)。
根本原因:
业务设计与实际流量不匹配。
偶然的、未在设计预期内的短时高流量。
解决方案:
1. 紧急处理(线上已经积压了怎么办?)
如果积压了几百万条消息,直接加消费者可能来不及,或者数据库扛不住。
临时扩容方案:
修复消费者:先确保消费者逻辑没 Bug。
限流、降级:停止现有消费者,或者将它们降级。
部署大量消费者:临时部署 10-20 倍的消费者节点,专门消费那些临时队列。
恢复:积压消费完后,恢复原有架构。
2. 预防措施(长期治理)
重新评估架构和业务设计: 结合实际情况重构、优化代码,使得消费者的处理速度与业务需求相匹配
优化消费者代码:采用异步处理、批量处理、优化 SQL。
增加消费者数量:利用 RabbitMQ 的 Work Queues 模式,横向扩展消费者实例。
设置预取数量 (Prefetch Count):不要让消费者一次拉取太多消息导致内存溢出,设置合理的 basicQos,让消费者“按需干活”。
死信队列 (DLQ):给队列设置 TTL(过期时间),过期的消息进入死信队列,避免堵塞主队列,事后人工处理死信,也可以自动+人工处理相结合。
总结表格
| 问题 | 核心发生点 | 核心解决方案 | 关键词 |
|---|---|---|---|
| 消息丢失 | 发送端、Broker、消费端 | 确认机制 + 持久化 + 手动 ACK | Publisher Confirm, Durable, Manual Ack |
| 消息重复 | 网络波动、ACK 丢失 | 消费者端做幂等性处理 | 唯一 ID, Redis SETNX, 数据库唯一索引 |
| 消息积压 | 生产快、消费慢 | 临时扩容队列 + 分发逻辑 | 临时队列, 横向扩展, 异步优化 |