对于分布式的延迟队列,我们可以使用RabbitMQ、Redis等实现;对于进程内的延迟队列,Java本身就提供了比较方便使用的DelayQueue
。DelayQueue
是java.util.concurrent
包下面提供的一个类,它是一个阻塞队列(BlockingQueue
),内部其实是对优先级队列(PriorityQueue
)的封装实现;可以根据消息的TTL时间的大小来进行优先排序,DelayQueue
能保证TTL时间越小的消息就会越优先被消费。可以说,DelayQueue
是一个基于优先队列(PriorityQueue
)实现的阻塞队列(BlockingQueue
),队列中的消息的优先级是根据消息的TTL来决定的。
DelayQueue
已经为我们解决了并发的线程安全问题,所以我们可以直接在多线程环境并发操作DelayQueue
。
本文使用的代码已经上传至GitHub:delay-queue-example
DelayQueue的使用
先说两个接口:java.util.concurrent.Delayed
接口和java.lang.Comparable
接口:
Delayed
接口中定义了getDelay
方法,用于计算消息延迟的剩余时间;
Delayed
接口同时还继承了Comparable
接口,所以Delayed
接口的实现类可以使用Comparable
接口中的compareTo
方法来实现队列中的消息优先级排序。
放进DelayQueue
中的消息实体类必须要实现Delayed
接口,覆盖getDelay
和compareTo
方法,并分别提供计算剩余时间和比较优先级排序的实现。
1 延迟消息实体类
1 | package io.biteenew.java.delay.queue; |
2 延迟队列的消费者代码
1 | package io.biteenew.java.delay.queue; |
3 测试代码 & 测试结果
1 | package io.biteenew.java.delay.queue; |
可以看出,DelayQueue
可以严格按照消息的TTL时间来进行优先级排序,TTL小的时间会优先出列。由于DelayQueue
是进程内的延迟队列实现,所以实际应用中应该考虑程序挂掉的时候,消息丢失对实际生产的影响。
——————–【参考文章】——————–