在 Spring AMQP(RabbitMQ)中,消息的 重试机制 主要有以下几种方式:

1. 使用 RetryTemplate 进行消息消费端重试

Spring AMQP 提供了 RetryTemplate,可以在 @RabbitListener 方法内部手动实现重试逻辑。

示例:手动重试

@Component
public class MyMessageListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final RetryTemplate retryTemplate = new RetryTemplate();

    @RabbitListener(queues = "myQueue")
    public void listen(String message) {
        retryTemplate.execute(context -> {
            // 业务处理
            processMessage(message);
            return null;
        }, context -> {
            // 失败后,进入降级处理,比如存入错误交换机
            rabbitTemplate.convertAndSend("erro.exchange", "erro.Key", message);
            return null;
        });
    }

    private void processMessage(String message) {
        if (Math.random() > 0.5) { // 模拟失败情况
            throw new RuntimeException("处理失败");
        }
        System.out.println("消息处理成功:" + message);
    }
}
  • retryTemplate.execute(context -> {} 进行消息消费。

  • 如果抛出异常,则进入第二个 context -> {} 逻辑进行降级处理(例如存入错误交换机)。

2. 配置 @RabbitListener 进行自动重试

Spring AMQP 5.x 之后,可以直接使用 @RabbitListener@Retryable 注解来开启消费端的重试机制。

示例:基于 @Retryable 进行自动重试

    @Component
public class MyMessageListener {

    @RabbitListener(queues = "myQueue")
    @Retryable(
        value = {RuntimeException.class},  // 仅对特定异常进行重试
        maxAttempts = 3, // 最大重试次数
        backoff = @Backoff(delay = 2000, multiplier = 2) // 初始延迟2秒,每次乘2倍
    )
    public void listen(String message) {
        System.out.println("收到消息:" + message);
        if (Math.random() > 0.5) { // 模拟异常
            throw new RuntimeException("消息处理失败");
        }
    }

    @Recover  // 处理最终失败的情况
    public void recover(RuntimeException e, String message) {
        System.out.println("最终失败,存入日志:" + message);
    }
}
  • @Retryable:最多重试3次,每次失败后等待 2s,下一次等待时间翻倍。

  • @Recover:如果 3 次都失败,执行降级方法 recover()

3. 使用 Dead Letter Exchange(DLX) 进行队列级别的消息重试

原理:如果队列中消息被拒绝(basic.nackbasic.reject),RabbitMQ 可以将消息发送到 死信交换机(DLX),然后可以在 DLX 队列中进行重试。

配置步骤

  1. 声明正常队列,并绑定死信队列

@Bean
public Queue normalQueue() {
    return QueueBuilder.durable("normal.queue")
            .deadLetterExchange("dlx.exchange") // 绑定死信交换机
            .deadLetterRoutingKey("dlx.key")    // 绑定死信路由键
            .ttl(5000) // 5 秒后消息进入死信队列
            .build();
}

@Bean
public Queue deadLetterQueue() {
    return QueueBuilder.durable("dlx.queue").build();
}

@Bean
public DirectExchange dlxExchange() {
    return new DirectExchange("dlx.exchange");
}

@Bean
public Binding dlxBinding(Queue deadLetterQueue, DirectExchange dlxExchange) {
    return BindingBuilder.bind(deadLetterQueue).to(dlxExchange).with("dlx.key");
}

2消费者手动拒绝消息

@Component
public class MyMessageListener {

    @RabbitListener(queues = "normal.queue")
    public void listen(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            System.out.println("消费消息:" + message);
            if (Math.random() > 0.5) {
                throw new RuntimeException("处理失败");
            }
            channel.basicAck(tag, false); // 成功确认
        } catch (Exception e) {
            channel.basicNack(tag, false, false); // 进入死信队列
        }
    }

    @RabbitListener(queues = "dlx.queue")
    public void handleDeadLetterQueue(String message) {
        System.out.println("死信队列处理:" + message);
    }
}
  • 如果消息消费失败,将 basic.nack(tag, false, false),消息进入 死信队列

  • 之后可以在 死信队列 中进行人工重试或日志记录。

4. 使用 RepublishMessageRecoverer 进行错误消息转发

如果你希望在消费失败时,自动将失败的消息转发到 错误交换机,可以使用 RepublishMessageRecoverer

配置 MessageRecoverer

 @Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
    return new RepublishMessageRecoverer(rabbitTemplate, "erro.exchange", "erro.Key");
}
  • 失败的消息会被重新发布到 erro.exchange,并使用 erro.Key 作为路由键。

  • 这样可以在 erro.queue 中专门处理异常消息,例如记录日志、报警等。

  • 总结

    方案

    适用场景

    处理方式

    RetryTemplate

    适用于手动控制重试

    @RabbitListener 逻辑内部进行编程式重试

    @Retryable

    适用于简单的消费端重试

    maxAttempts 限制最大重试次数,@Recover 处理最终失败

    死信队列(DLX)

    适用于队列级别的重试

    失败后进入 死信队列,可以人工处理或自动重试

    RepublishMessageRecoverer

    适用于自动转发失败消息

    失败后,消息自动发送到 错误交换机

    如果你的业务场景允许 立即重试,可以使用 RetryTemplate@Retryable
    如果需要 延迟重试人工干预,建议使用 死信队列(DLX) 进行处理。

    你当前的 RepublishMessageRecoverer 配置,适用于将错误消息存入专门的错误交换机,但 不会自动重试,如果要增加重试机制,建议结合 DLXRetryTemplate

  • Spring AMQP(RabbitMQ) 中,消息重试机制的触发通常取决于 消费端处理失败,以下几种情况会导致消息重试:


    1. @Retryable 注解触发的消息重试

    如果消费者方法上使用了 @Retryable,那么当抛出指定异常时,Spring AMQP 会自动进行重试。

    触发条件

    • 消费者方法抛出指定的异常(如 RuntimeException

    • 配置了 @Retryable,且未超过 maxAttempts 指定的最大重试次数

    示例

       @RabbitListener(queues = "myQueue")
@Retryable(
    value = {RuntimeException.class}, // 只有 RuntimeException 才会触发重试
    maxAttempts = 3,  // 最大重试 3 次
    backoff = @Backoff(delay = 2000, multiplier = 2) // 初始延迟2秒,每次翻倍
)
public void listen(String message) {
    System.out.println("收到消息:" + message);
    throw new RuntimeException("消费失败"); // 抛出异常触发重试
}

不会触发重试的情况

  • 如果方法执行成功,不会重试

  • 如果抛出的异常 不在 @Retryable(value = {}) 配置范围内

  • 如果超过 maxAttempts,进入 @Recover 处理(如果有

2. RetryTemplate 手动触发消息重试

如果消费者使用 RetryTemplate,那么当业务代码抛出异常时,RetryTemplate 会控制重试逻辑。

触发条件

  • RetryTemplate.execute() 方法中,业务代码抛出异常

  • maxAttempts 配置未达到上限

示例

private final RetryTemplate retryTemplate = new RetryTemplate();

@RabbitListener(queues = "myQueue")
public void listen(String message) {
    retryTemplate.execute(context -> {
        processMessage(message);
        return null;
    }, context -> {
        System.out.println("消息处理失败,存入日志:" + message);
        return null;
    });
}

private void processMessage(String message) {
    throw new RuntimeException("模拟消费失败"); // 抛出异常触发重试
}

不会触发重试的情况

  • 业务代码执行成功

  • maxAttempts 配置达到上限


3. basic.nackbasic.reject 触发(死信队列/DLX 机制)

如果 RabbitMQ 手动确认模式manual ack)下,消费者调用 channel.basicNack()channel.basicReject(),可以将消息重新入队或发送到 死信队列(DLX)

触发条件

  • 消费者代码调用 channel.basicNack(tag, false, true)requeue = true 时消息会被重新投递)

  • 消费者代码调用 channel.basicReject(tag, true)(同样会重新入队)

示例

@RabbitListener(queues = "normal.queue")
public void listen(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    try {
        System.out.println("消费消息:" + message);
        throw new RuntimeException("模拟异常"); // 触发重试
    } catch (Exception e) {
        channel.basicNack(tag, false, true); // 重新入队
    }
}

4. RepublishMessageRecoverer 触发错误消息重发

RepublishMessageRecoverer 不是重试,而是当消费失败时,将错误消息发布到 错误交换机erro.exchange)。

触发条件

  • @RabbitListener 方法执行时抛出异常

  • 配置了 MessageRecoverer

  • 重试失败后,Spring AMQP 自动调用 RepublishMessageRecoverer

示例

@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
    return new RepublishMessageRecoverer(rabbitTemplate, "erro.exchange", "erro.Key");
}

@RabbitListener(queues = "myQueue")
public void listen(String message) {
    System.out.println("收到消息:" + message);
    throw new RuntimeException("消费失败"); // 触发 RepublishMessageRecoverer
}

不会触发重试的情况

  • 如果 @RabbitListener 方法执行成功,则不会进入 RepublishMessageRecoverer

  • 这个方式不是重试,而是消息转移


5. RabbitMQ 自带的重试机制

如果 RabbitMQ 配置了 TTL(消息存活时间)+ 死信队列(DLX),则消息会在 TTL 过期 后被重新投递到死信队列,并可在 DLX 队列中进行重试。

触发条件

  • 队列消息 TTL 到期

  • 死信交换机(DLX) 存在

  • DLX 消费者重新处理消息

示例

@Bean
public Queue normalQueue() {
    return QueueBuilder.durable("normal.queue")
            .deadLetterExchange("dlx.exchange") // 绑定死信交换机
            .deadLetterRoutingKey("dlx.key")    // 绑定死信路由键
            .ttl(5000) // 5 秒后进入死信队列
            .build();
}
  • 如果消息 5 秒内未被消费,它会自动进入死信队列 dlx.queue,可以由 死信消费者 进行重试处理。

不会触发重试的情况

  • TTL 过期但没有 DLX 配置,消息会直接被丢弃

  • 死信队列消费失败但没有 requeue = true,不会再次投递

  • 总结

    触发方式

    触发条件

    适用场景

    是否自动重试

    @Retryable

    消费方法抛出异常

    代码级别自动重试

    ✅ 是

    RetryTemplate

    业务代码抛出异常

    需要手动控制重试

    ✅ 是

    basic.nack(requeue = true)

    消费失败,拒绝后重新入队

    业务失败但希望立即重试

    ✅ 是

    RepublishMessageRecoverer

    消费失败

    记录失败消息,但不会重试

    ❌ 否(只是转发)

    死信队列(DLX)

    TTL 过期 / basic.nack(requeue = false)

    延迟重试、死信处理

    ❌ 需要手动处理


    最佳实践

    • 如果希望立即重试,可以使用 @Retryablebasic.nack(requeue = true)

    • 如果希望延迟重试,可以使用 死信队列(DLX)+ TTL

    • 如果不想丢失失败消息,而是存入错误日志,可以使用 RepublishMessageRecoverer

    如果你的 RabbitMQ 消息消费失败后没有被重试,可以检查:

    1. 是否有 @RetryableRetryTemplate

    2. 是否有 basicNack(tag, false, true)(重新入队)

    3. 是否配置了 死信队列(DLX)

    4. 是否使用了 RepublishMessageRecoverer,它不会重试,而是转发错误消息