RabbitMQ尽最大可能保证消息顺序投递、顺序消费,如果要确保消息完全按照投递的顺序消费,首先要保证消息投递到交换机是有序的。
以使用Spring-AMQP框架为例,一般会配置连接工厂类型为ThreadChannelConnectionFactory
和CachingConnectionFactory
。
ThreadChannelConnectionFactory
确保同一线程上的所有操作都使用同一个通道(始终保持打开状态),在同一个通道上,一个线程内部执行的消息投递顺序是有序的。在使用上,为了避免内存泄漏,还必须调用closeThreadChannel
来释放通道资源。ThreadChannelConnectionFactory
支持消息发布确认,但不支持消息退回,多数情况下考虑使用CachingConnectionFactory
。
CachingConnectionFactory
默认使用通道缓存。使用模板发送消息时,会从缓存中检出一个通道用于操作,并且还会返回到缓存中以供重用。在有多个线程的情况下,无法保证同一个线程的多次操作都使用相同的通道。
从Spring-AMQP 2.0开始提供了模板的invoke
方法,包含一个名为OperationsCallback
的接口。在回调范围内和提供的RabbitOperations
参数上执行的任何操作都使用相同的专用通道。必须注意作用域操作绑定单个线程,只保证在同一线程上消息顺序投递。
以下为部分示例代码:
MessageService.java
public void sendOrderedMessage(List<String> contentList) {
List<String> messageBodys = new ArrayList<>();
contentList.forEach(content -> {
MessageEntity messageEntity = MessageEntity.builder()
.createTime(LocalDateTime.now())
.userName("instaer")
.content(content)
.build();
String messageBody;
try {
messageBody = objectMapper.writer().writeValueAsString(messageEntity);
messageBodys.add(messageBody);
} catch (JsonProcessingException e) {
throw new RuntimeException("对象(" + messageEntity.toString() + ")转换为消息异常", e);
}
});
rabbitTemplate.invoke((RabbitOperations.OperationsCallback<Object>) operations -> {
messageBodys.forEach(messageBody -> {
String correlationId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(correlationId);
String messageType = "ORDERED_MESSAGE";
operations.convertAndSend(
RabbitMQConstants.Exchange.EXCHANGE_MESSAGE,
RabbitMQConstants.RoutingKey.ROUTING_KEY_SINGLE,
messageBody,
message -> {
message.getMessageProperties()
.setHeader(RabbitMQConstants.Header.MESSAGE_TYPE, messageType);
message.getMessageProperties().setCorrelationId(correlationId);
return message;
},
correlationData
);
//添加发布确认回调
correlationData.getFuture()
.addCallback(confirmFutureCallbackBuilder.build(correlationId, messageType, messageBody));
});
return true;
});
}
在多线程消息投递的场景下,比如主线程Thread-1上投递了一条消息message-1,然后再调用另外一个线程Thread-2投递一条消息message-2。
按照业务逻辑,message-1必须先于message-2投递,但由于RabbitMQ的异步特性和使用缓存通道,无法确定是否使用相同的通道,因此也无法保证message-1和message-2的投递顺序。可以为发布者连接工厂使用缓存大小为1的有界通道缓存(setChannelCacheSize(1)
),同时设置通道检出的超时时间,这样可以保证多个线程在同一通道上投递消息。
在使用同一通道的情况下,仍然无法保证message-1一定先于message-2投递,出于线程调度的原因,有可能出现Thread-2先于Thread-1投递消息。因此需要配置一个串行化线程池,例如Executors.newFixedThreadPool(1)
,可以保证所有线程在同一通道上消息投递操作都是有序的,从而保证多线程下消息顺序投递。
以下为部分示例代码:
RabbitMQConfig.java
private static CachingConnectionFactory buildCachingConnectionFactory(RabbitProperties prop) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(prop.getHost());
connectionFactory.setPort(prop.getPort());
connectionFactory.setUsername(prop.getUsername());
connectionFactory.setPassword(prop.getPassword());
connectionFactory.setVirtualHost(prop.getVirtualHost());
connectionFactory.setRequestedHeartBeat((int) prop.getRequestedHeartbeat().getSeconds());
return connectionFactory;
}
/**
* 默认连接工厂
*
* @param prop
* @return
*/
@Bean
@Primary
public ConnectionFactory connectionFactory(RabbitProperties prop) {
return buildCachingConnectionFactory(prop);
}
/**
* 默认操作模板
*
* @param connectionFactory
* @param returnsCallbackBuilder
* @return
*/
@Bean
@Primary
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, ReturnsCallbackBuilder returnsCallbackBuilder) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnsCallbackBuilder.build());
// 为生产者使用单独的连接,避免当生产者因任何原因被阻塞时消费者被阻塞
rabbitTemplate.setUsePublisherConnection(true);
return rabbitTemplate;
}
/**
* 为顺序发布消息的生产者提供的连接工厂
*
* @param prop
* @return
*/
@Bean
public ConnectionFactory orderedPublisherConnectionFactory(RabbitProperties prop) {
CachingConnectionFactory orderedPublisherConnectionFactory = buildCachingConnectionFactory(prop);
// 限制channel缓存为1个,确保多线程下消息始终发布在同一通道上,并保证顺序
orderedPublisherConnectionFactory.setChannelCacheSize(1);
orderedPublisherConnectionFactory.setChannelCheckoutTimeout(5000L);
return orderedPublisherConnectionFactory;
}
/**
* 为设置了使用独立连接工厂的有序发布生产者的消费者提供的连接工厂
*
* @param orderedPublisherConnectionFactory
* @param prop
* @return
*/
@Bean
public ConnectionFactory orderedConsumerConnectionFactory(@Qualifier("orderedPublisherConnectionFactory") ConnectionFactory orderedPublisherConnectionFactory, RabbitProperties prop) {
CachingConnectionFactory consumerConnectionFactory = buildCachingConnectionFactory(prop);
consumerConnectionFactory.setPublisherConnectionFactory((AbstractConnectionFactory) orderedPublisherConnectionFactory);
return consumerConnectionFactory;
}
/**
* 为有序发布消息的生产者提供的操作模板
*
* @param orderedConsumerConnectionFactory
* @param returnsCallbackBuilder
* @return
*/
@Bean
public RabbitTemplate orderedRabbitTemplate(@Qualifier("orderedConsumerConnectionFactory") ConnectionFactory orderedConsumerConnectionFactory, ReturnsCallbackBuilder returnsCallbackBuilder) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(orderedConsumerConnectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnsCallbackBuilder.build());
// 为生产者使用单独的连接,避免当生产者因任何原因被阻塞时消费者被阻塞
rabbitTemplate.setUsePublisherConnection(true);
return rabbitTemplate;
}
MessageService.java
// 单个线程池保证消息顺序提交(在设置单个缓存channel时保证消息按顺序投递)
private final static ExecutorService singleExecutor = Executors.newFixedThreadPool(1);
public void sendMultiOrderedMessage(List<String> contentList) {
List<String> messageBodys = new ArrayList<>();
contentList.forEach(content -> {
MessageEntity messageEntity = MessageEntity.builder()
.createTime(LocalDateTime.now())
.userName("instaer")
.content(content)
.build();
String messageBody;
try {
messageBody = objectMapper.writer().writeValueAsString(messageEntity);
messageBodys.add(messageBody);
} catch (JsonProcessingException e) {
throw new RuntimeException("对象(" + messageEntity.toString() + ")转换为消息异常", e);
}
});
// 模拟多个线程中消息顺序提交的场景
messageBodys.forEach(messageBody -> singleExecutor.submit(() -> {
String correlationId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(correlationId);
String messageType = "MULTI_ORDERED_MESSAGE";
orderedRabbitTemplate.convertAndSend(
RabbitMQConstants.Exchange.EXCHANGE_MESSAGE,
RabbitMQConstants.RoutingKey.ROUTING_KEY_SINGLE,
messageBody,
message -> {
message.getMessageProperties()
.setHeader(RabbitMQConstants.Header.MESSAGE_TYPE, messageType);
message.getMessageProperties().setCorrelationId(correlationId);
return message;
},
correlationData
);
//添加发布确认回调
correlationData.getFuture()
.addCallback(confirmFutureCallbackBuilder.build(correlationId, messageType, messageBody));
}));
}
以上提供了消息顺序顺序投递的解决方案,接下来需要保证消息在消费端按投递顺序消费。
显然消息消费者只有一个时才能执行串行化顺序操作(注意在默认情况下RabbitMQ对所有消费者循环派发消息)。RabbitMQ提供了单个活动消费者配置,单个活动消费者允许一次只有一个消费者从队列中消费,并在当前活动消费者被取消或死亡的情况下自动故障转移到另一个注册消费者。
在创建单个消费者队列时指定singleActiveConsumer
属性为true即可。第一个注册的消费者成为此队列上唯一的活跃消费者(处于活跃状态),所有发送到该队列的消息都会发送给它,而其它消费者(处于等待状态)则被忽略。
和独占消费者(exclusive consumer)相比,单一活跃消费者对应用端维持消费连续性的压力更小。消费者只需要注册并自动处理故障转移,无需检测活动消费者故障并注册新消费者。
以下为部分示例代码:
RabbitMQConfig.java
/**
* 仅允许单个活动消费者的队列
*
* @return
*/
@Bean
public Queue singleQueue() {
return QueueBuilder.durable(RabbitMQConstants.Queue.QUEUE_SINGLE)
// 指定死信交换机
.deadLetterExchange(deadExchange().getName())
// 指定最大长度 超出长度则成为死信消息
.maxLength(200)
.singleActiveConsumer()
.build();
}
MessageListener.java
/**
* 仅允许单个活动消费者接收消息
*
* @param message
* @param channel
*/
@RabbitHandler
@RabbitListener(queues = RabbitMQConstants.Queue.QUEUE_SINGLE)
public void receiveSingleMessage(Message message, Channel channel) {
String correlationId = message.getMessageProperties().getCorrelationId();
String messageType = message.getMessageProperties().getHeader(RabbitMQConstants.Header.MESSAGE_TYPE);
String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
MessageEntity messageEntity;
try {
messageEntity = objectMapper.readValue(messageBody, MessageEntity.class);
} catch (IOException e) {
throw new RuntimeException("消息[correlationId:" + correlationId + ", 类型:" + messageType + ", 内容:" + messageBody + "]转换为对象异常", e);
}
try {
log.info("收到消息[correlationId:{}, 类型:{}, 内容:{}], ", correlationId, messageType, messageBody);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
设置单个消费者会造成消息处理的并行度下降,可以在应用端进行分片处理,不同的队列处理对应分类下的有序消息,提高CPU利用率。