Skip to content

🚟消息队列

I. 如何解决消息丢失问题?

消息丢失可能发生在三个阶段,我们需要分段治理:

1. 生产者丢失消息(Producer ➡️ Broker)

原因:网络波动、Broker 宕机,导致消息根本没发到 Broker,但生产者以为发了。
解决方案

  • 开启发布确认机制 (Publisher Confirm)

    • 生产者将 Channel 设置为 confirm 模式。

    • 每发送一条消息,Broker 收到并写入磁盘后,会给生产者回传一个 ACK。

    • 如果生产者没收到 ACK(或收到 NACK),就重试发送。

  • 备选:事务机制 (Transaction):虽然也能保证安全,但性能极差,通常不推荐。

2. Broker 丢失消息(Broker 自身)

原因:Broker 接收到了消息,但还没来得及写入磁盘就宕机了(内存数据丢失)。
解决方案持久化 (Persistence)
需要同时做3️⃣件事,缺一不可:

  1. Exchange 持久化:声明交换机时 durable=true。

  2. Queue 持久化:声明队列时 durable=true。

  3. Message 持久化:发送消息时设置投递模式 deliveryMode=2。

3. 消费者丢失消息(Broker ➡️ Consumer)

原因:消费者开启了“自动确认”(Auto Ack)。消息刚被消费者接收(还在内存中没处理),消费者进程挂了,Broker 以为已经消费成功,就把消息删了。
解决方案

  • 关闭自动确认,开启手动确认 (Manual Ack)

  • 消费者在业务逻辑完全处理成功后,才调用 channel.basicAck() 告诉 Broker 可以删除消息。

  • 如果处理失败,可以调用 basicNack 或 basicReject 让消息重新入队。


II. 如何解决消息重复问题?

原因

  • 网络抖动:生产者发了消息,Broker 收到了,但回传 ACK 时网络断了。生产者以为失败了,就重发了一次。

  • 消费者故障:消费者处理完了业务,还没来得及发 ACK 就挂了。Broker 以为没处理,就把消息发给另一个消费者。

解决方案
消息重复在分布式系统中是无法完全避免的(为了保证不丢失,通常采用“至少一次投递”策略)。
所以,解决方案的核心不在于“不让它重复发”,而在于消费者端的幂等性 (Idempotency) 处理

具体手段

  1. 数据库唯一约束 (最强硬手段)

    • 给每条消息生成一个全局唯一的 ID(MessageID 或业务 ID,如订单号)。

    • 消费时,尝试向数据库(或去重表)插入这个 ID。

    • 如果插入成功,则执行业务;如果报错(主键冲突),说明已经消费过,直接丢弃或 ACK。

  2. Redis 原子操作 (性能较好)

    • 利用 SETNX (set if not exists) 命令。

    • Key 为消息 ID,Value 为状态。

    • 处理前先 SETNX,返回 true 则处理,返回 false 则说明正在处理或已处理。

  3. 乐观锁/版本号

    • 如果是更新操作,带上版本号条件:UPDATE table SET count = count + 1, version = version + 1 WHERE id = 1 AND version = 1。

III. 如何解决消息积压问题?

表现

  • 消费者挂了。

  • 消费者处理速度太慢(代码逻辑耗时、数据库瓶颈)。

  • 生产者发送流量突然暴涨(秒杀活动)。

根本原因:

  • 业务设计与实际流量不匹配。

  • 偶然的、未在设计预期内的短时高流量。

解决方案

1. 紧急处理(线上已经积压了怎么办?)

如果积压了几百万条消息,直接加消费者可能来不及,或者数据库扛不住。
临时扩容方案

  1. 修复消费者:先确保消费者逻辑没 Bug。

  2. 限流、降级:停止现有消费者,或者将它们降级。

  3. 部署大量消费者:临时部署 10-20 倍的消费者节点,专门消费那些临时队列。

  4. 恢复:积压消费完后,恢复原有架构。

2. 预防措施(长期治理)

  • 重新评估架构和业务设计: 结合实际情况重构、优化代码,使得消费者的处理速度与业务需求相匹配

  • 优化消费者代码:采用异步处理、批量处理、优化 SQL。

  • 增加消费者数量:利用 RabbitMQ 的 Work Queues 模式,横向扩展消费者实例。

  • 设置预取数量 (Prefetch Count):不要让消费者一次拉取太多消息导致内存溢出,设置合理的 basicQos,让消费者“按需干活”。

  • 死信队列 (DLQ):给队列设置 TTL(过期时间),过期的消息进入死信队列,避免堵塞主队列,事后人工处理死信,也可以自动+人工处理相结合。


总结表格

问题核心发生点核心解决方案关键词
消息丢失发送端、Broker、消费端确认机制 + 持久化 + 手动 ACKPublisher Confirm, Durable, Manual Ack
消息重复网络波动、ACK 丢失消费者端做幂等性处理唯一 ID, Redis SETNX, 数据库唯一索引
消息积压生产快、消费慢临时扩容队列 + 分发逻辑临时队列, 横向扩展, 异步优化