RabbitMQ本身并不提供延迟队列的功能,但是我们仍然可以使用RabbitMQ的TTL(Time-To-Live)
和DLX(Dead Letter Exchanges)
这两个扩展特性来实现延迟队列,实现消息的延迟消费和延迟重试的功能。
本文使用的软件环境: 操作系统-Ubuntu 16.04;RabbitMQ服务器-v3.7.6;RabbitMQ Java Client-v5.3.0
测试的完整代码在GitHub上:rabbitmq-delay-queue
1 延迟队列的应用场景
延迟消费:
用户在线购买商品并提交订单后,服务器后台生成一条与订单相关的消息并丢到30分钟的延迟队列中;30分钟后服务器会收到延迟队列推送的此订单的消息,然后检查订单状态是否为已付款,未付款则取消订单。
延迟重试:
用户注册成功后,服务器后台向用户邮箱发送欢迎页面,如果发送失败则将消息丢到1分钟的延迟队列中;1分钟后收到此消息继续重试发送邮件。
2 Time-To-Live Extensions & Dead Letter Exchanges
Time-To-Live Extensions(TTL):
RabbitMQ允许我们为消息或者队列设置TTL过期时间。TTL表示一条消息可在队列中存活的最大时间(单位毫秒),即当某条消息发布的时候设置了TTL或者当某条消息发布到了设置TTL的队列时,这条消息在经过TTL时间后变成“死信(Dead Letter)”。
Dead Letter Exchanges(DLX):
我们可以通过为队列设置死信交换机(Dead Letter Exchange),当消息变成“死信(Dead Letter)”后,该队列会将该消息通过死信交换机重新路由出去。在RabbitMQ中,队列中的消息出现以下三种情况之一,就会变成“死信”:
1. 消息被拒绝:消息的消费者调用了basicReject()
或basicNack()
方法,并且设置requeue
参数为false
;
2. 消息因为设置了TTL而过期的时候;
3. 消息被发布到了一个长度达到上限的队列的时候
3 设计思路
图3.1 延迟消费模式和延迟重试模式设计图
延迟消费模式:
生产者发布消息到缓冲队列buffer-queue
中,消息在buffer-queue
中经过TTL时间成为死信后,会通过buffer-queue
设置的死信交换机dlx.exchange
重新路由出去。worker-queue
为实际的工作队列,通过路由键dlx.routing.key
与死信交换机dlx.exchange
绑定,最终死信消息会重新路由到实际的工作队列worker-queue
中,被消费处理。
延迟重试模式:
生产者发布消息到实际的工作队列buffer-queue
中,被消费者消费处理但是处理失败。消费者将处理失败的消息丢到缓冲队列buffer-queue
,失败的消息在buffer-queue
中经过TTL时间成为死信后,会通过buffer-queue
设置的死信交换机dlx.exchange
重新路由出去。worker-queue
为实际的工作队列,通过路由键dlx.routing.key
与死信交换机dlx.exchange
绑定,处理失败的消息在buffer-queue
中经过一段时间的延迟后,最终又路由回到worker-queue
中,被再次消费处理,这就是延迟重试。
4 代码实现
RabbitMQ有两种方式可以配置消息的TTL:
1. 为消息配置TTL:生产者发布消息的时候,为每条消息指定TTL(每条消息的TTL可以不同)。
2. 为队列配置TTL:通过队列的属性配置TTL,所有进入该队列的消息都拥有同样的TTL时间。
下面分别对这两种方式进行代码演示:由于篇幅所限,此处仅演示延迟消费模式的代码实现;完整的代码在GitHub上:rabbitmq-delay-queue
4.1 延迟消费模式:方式一:为发布的每条消息设置TTL
生产者部分代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 创建交换机"out.exchange":生产者将消息通过"out.exchange"发送到"buffer-queue"。这里设置交换机类型为"direct",当然也可以使用其他类型
channel.exchangeDeclare("out.exchange", "direct", true);
// 创建死信交换机"dlx.exchange":"buffer-queue"中产生死信后,会通过此交换机发送出去。这里设置交换机类型为"direct",当然也可以使用其他类型
channel.exchangeDeclare("dlx.exchange", "direct", true);
// 创建缓冲队列"buffer-queue",并为"buffer-queue"设置死信交换机参数:生产者发布的消息会先到达此"buffer-queue",消息在"buffer-queue"中变成死信后,会通过死信交换机和死信路由键发送出去
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "dlx.exchange"); // 指定死信交换机参数(x-dead-letter-exchange)
arguments.put("x-dead-letter-routing-key", "dlx.routing.key"); // 指定死信路由键参数(x-dead-letter-routing-key)
channel.queueDeclare("buffer-queue", true, false, false, arguments);
// 将"buffer-queue"绑定到交换机"out.exchange",路由键设置为"out.routing.key"。
channel.queueBind("buffer-queue", "out.exchange", "out.routing.key");
这里将生产者发布消息方法做一个简单的封装:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/**
* 发布消息
* @param bytes 消息实体
* @param ttl 消息的TTL,单位毫秒
*/
public void publish(byte[] bytes, long ttl) {
try {
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.expiration(String.valueOf(ttl))
.build();
channel.basicPublish("out.exchange", "out.routing.key", properties, bytes);
} catch (IOException e) {
LOGGER.error("Producer publish message with error: {}", e.getLocalizedMessage());
}
}
消费者部分代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 创建交换机"dlx.exchange":跟生产者一样
channel.exchangeDeclare("dlx.exchange", "direct", true);
// 创建工作队列"worker-queue"
channel.queueDeclare("worker-queue", true, false, false, null);
// 将"worker-queue"绑定到交换机"dlx.exchange",并指定路由键为"dlx.routing.key"
channel.queueBind("worker-queue", "dlx.exchange", "dlx.routing.key");
// 消费者监听队列"worker-queue"的消息并打印出接收到的消息
channel.basicConsume("worker-queue", true, new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
LOGGER.info("Consumer received message: {}", new String(body));
}
});
延迟消费测试用例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35// 测试1:生产5条消息,分别设置消息的TTL为:1秒,2秒,3秒,4秒,5秒
for (int i = 1; i <= 5; i++) {
String message = "" + i;
LOGGER.info("Producer publish message: {}", message);
publish(message.getBytes(), i*1000L);
}
// 测试2:生产5条消息,分别设置消息的TTL为:5秒,4秒,3秒,2秒,1秒
for (int i = 5; i > 0; i--) {
String message = "" + i;
LOGGER.info("Producer publish message: {}", message);
publish(message.getBytes(), i*1000L);
}
// 测试1结果:每条消息都是延时1秒后,被消费者收到。
2018-07-03 15:41:35.594 Producer publish message: 1
2018-07-03 15:41:35.595 Producer publish message: 2
2018-07-03 15:41:35.596 Producer publish message: 3
2018-07-03 15:41:35.596 Producer publish message: 4
2018-07-03 15:41:35.596 Producer publish message: 5
2018-07-03 15:41:36.599 Consumer received message: 1
2018-07-03 15:41:37.598 Consumer received message: 2
2018-07-03 15:41:38.598 Consumer received message: 3
2018-07-03 15:41:39.600 Consumer received message: 4
2018-07-03 15:41:40.598 Consumer received message: 5
// 测试2结果:所有消息都是延时5秒后才被消费者收到
2018-07-03 16:00:09.529 Producer publish message: 5
2018-07-03 16:00:09.530 Producer publish message: 4
2018-07-03 16:00:09.530 Producer publish message: 3
2018-07-03 16:00:09.531 Producer publish message: 2
2018-07-03 16:00:09.531 Producer publish message: 1
2018-07-03 16:00:14.535 Consumer received message: 5
2018-07-03 16:00:14.535 Consumer received message: 4
2018-07-03 16:00:14.536 Consumer received message: 3
2018-07-03 16:00:14.536 Consumer received message: 2
2018-07-03 16:00:14.536 Consumer received message: 1
方式一测试总结:
通过对比上面两个测试用例的结果,我们知道RabbitMQ服务器是根据先进先出(FIFO)的规则来向消费者推送消息的;RabbitMQ是通过扫描队列头部(消息从尾部进入队列,从头部出列)的一条消息来判定消息是否过期,就算该条头部消息后面还有TTL比它小的消息,也不会跳过它让TTL小的消息先出列,而是必须要等到该条头部消息超时后,其他后面的消息才有机会得到TTL超时判断的机会。
所以这就会出现测试2中出现的结果:所有消息都是5秒后才被消费到。出列的消息并没有按照我们实际设置的TTL时间进行过期,这点在实际应用中需要注意。
4.2 延迟消费模式:方式二:为队列设置TTL
方式二与方式一的配置代码大同小异,方式二需要在创建缓冲队列buffer-queue
的时候指定x-message-ttl
属性即可;同时方式二在发布消息的时候不需要指定ttl
参数。1
2
3
4
5
6// 创建缓冲队列"buffer-queue",并为"buffer-queue"设置死信交换机参数:生产者发布的消息会先到达此"buffer-queue",消息在"buffer-queue"中变成死信后,会通过死信交换机和死信路由键发送出去
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 5000); // 指定队列中消息最大存活时间("x-message-ttl)
arguments.put("x-dead-letter-exchange", "dlx.exchange"); // 指定死信交换机参数(x-dead-letter-exchange)
arguments.put("x-dead-letter-routing-key", "dlx.routing.key"); // 指定死信路由键参数(x-dead-letter-routing-key)
channel.queueDeclare("buffer-queue", true, false, false, arguments);
延迟消费测试用例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// 测试:生产5条消息
for (int i = 1; i <= 5; i++) {
String message = "" + i;
LOGGER.info("Producer publish message: {}", message);
publish(message.getBytes());
}
// 测试结果:每条消息都是按照队列限制的消息TTL来被消费者收到。
2018-07-03 17:00:15.704 Producer publish message: 1
2018-07-03 17:00:15.706 Producer publish message: 2
2018-07-03 17:00:15.706 Producer publish message: 3
2018-07-03 17:00:15.706 Producer publish message: 4
2018-07-03 17:00:15.706 Producer publish message: 5
2018-07-03 17:00:20.710 Consumer received message: 1
2018-07-03 17:00:20.710 Consumer received message: 2
2018-07-03 17:00:20.710 Consumer received message: 3
2018-07-03 17:00:20.710 Consumer received message: 4
2018-07-03 17:00:20.710 Consumer received message: 5
方式二测试总结:
在队列上设置消息的TTL不会出现消息与TTL设置不对应的情况,因为每条消息的在队列中的TTL时间都是一样的,RabbitMQ只需要扫描队列头部的消息并检测是否过期即可;消息依旧按照先进先出(FIFO)的规则进行处理。
4.3 延迟重试模式测试结果:采用方式二:为队列设置TTL
延迟重试:重试的缓冲队列采用方式二,即为队列设置TTL为10秒。这里仅贴出延迟重试的测试结果,可以直观地感受一下:1
2
3
4
5
6
7
8
9
10
11
12
132018-07-03 18:19:10.694 Producer publish message: 1
2018-07-03 18:19:10.696 Producer publish message: 2
2018-07-03 18:19:10.696 Producer publish message: 3
2018-07-03 18:19:10.696 Producer publish message: 4
2018-07-03 18:19:10.696 Producer publish message: 5
2018-07-03 18:19:10.698 Consumer received message: 1 -- handle ok.
2018-07-03 18:19:10.698 Consumer received message: 2 -- handle failed, will retry in 10 seconds.
2018-07-03 18:19:10.698 Consumer received message: 3 -- handle ok.
2018-07-03 18:19:10.699 Consumer received message: 4 -- handle ok.
2018-07-03 18:19:10.699 Consumer received message: 5 -- handle failed, will retry in 10 seconds.
2018-07-03 18:19:20.701 Consumer received message: 2 -- handle failed, will retry in 10 seconds.
2018-07-03 18:19:20.702 Consumer received message: 5 -- handle ok.
2018-07-03 18:19:30.704 Consumer received message: 2 -- handle ok.
以上,就是RabbitMQ实现延迟队列的设计。
——————–【参考文章】——————–