我开始使用 RabbitMQ 处理消息队列,我很喜欢使用 AMQP,下面是代码,所以我发现当使用 RabbitTemplate 和 amqp.connectionFactory 时,我们不能拥有来自同一个 org.springframework.amqp.rabbit 的 Channel Bean .connection.ConnectionFactory.
但我想使用 Channel 对象来使用 basiAck 设置手动 ACK,
@RabbitListener(queues = "email_queue", concurrency = "4", ackMode = "Manual")
@Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(1000))
public void sendEmail(String customerEmail, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
try {
SimpleMailMessage simpleMailMessage = new SimpleMailMessage();
simpleMailMessage.setFrom("[email protected]");
simpleMailMessage.setTo(customerEmail);
simpleMailMessage.setText("Hi, your ticket has been booked Successfully.");
simpleMailMessage.setSubject("Movie Tickets Booked");
javaMailSender.send(simpleMailMessage);
channel.basicAck(deliveryTag, false);
}catch (Exception e){
log.error("Email Consumer Service::" + e.getMessage());
throw e;
}
}
问题是,我没有在rabbit-config中定义Channel对象/Bean,我将如何从rabbitTemplate中获取它,所以我可以将它作为上面方法中的参数传递。下面是我的 RabbitConfig。
@Configuration
public class RabbitConfig {
@Value("${rabbitmq.email.queue}")
private String emailQueueName;
@Value("${rabbitmq.email.exchange.name}")
private String emailExchangeName;
@Value("${rabbitmq.email.binding.key}")
private String emailBindingKey;
@Bean
public Queue emailQueue() {
return new Queue(emailQueueName);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(emailExchangeName);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(emailQueue()).to(topicExchange()).with(emailBindingKey);
}
@Bean
public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
我应该使用rabbitmq.client库来定义connectionFactory - >新连接 - >通道 - >构建队列...... 然后使用 Channel 作为 @Autowired 属性? 我需要从 amqp 库完全切换到rabbitMq 库吗?
非常感谢任何完整的代码/github 示例,谢谢。
不,你不需要这样做。
@RabbitListener
不是一个bean,而是一个处理程序方法,当消息到达侦听器容器时由框架调用。
方法签名中这些参数的参数由
MessagingMessageListenerAdapter
: 设置
return this.handlerAdapter.invoke(message, amqpMessage, channel, amqpMessage.getMessageProperties());
如果您像在您的案例中那样使用
@Header
,则该适配器会执行其他智能工作,尝试解决其他一些参数。
在文档中查看更多信息:https://docs.spring.io/spring-amqp/reference/amqp/receiving-messages/async-annotation-driven.html