场景

延时任务的需求是非常常见的,在我们的开发生涯中,我相信大部分研发人员是会有接触到这一块的。其中,我们在现实生活中比较常接触到的场景有

  • 下单后未付款通知
  • 订单延迟生成短信通知
  • 红包未领取退款通知
  • 用户行程将近通知
  • 微信公众号的文章延时发布推送等

常规模块

  • 存储
    主要负责存储任务的相关信息,比如任务执行所需要的相关信息,执行时间,重试次数等

  • 调度
    负责时间的调度,判断任务是否到达可执行

  • 执行
    负责任务的实际执行

在非分布式的情况下,通常这三者是出现在同一个应用内部的;但如果是在分布式系统架构中,那么存储、执行和调度通常是分开的,并且一个任务还可能分割成多个 子任务分别在不同的服务上运行。分布式情况下,每一个模块可选的方案都不少,多样且复杂;但是最终都需要单体内实现的支持,我们这里就只讨论单体模式下的延 时任务方案。

常规解决方案

  • 进程内的

    • ScheduledThreadPoolExecutor
    • 时间轮(HashedWheelTimer)
  • 进程外的

    • 通过定时任务框架(Quartz/Elastic-Job/Elastic-Job/QSchedule 等)
    • 通过延时消息(Kafka/RocketMQ/QMQ 等)
    • 依托于外部存储的自定义实现(Redis/MySQL 等)

下面主要讲讲进程内的两种方案,以及基于 QMQ 延时消息实现方案中 QMQ 延时消息的实现。

ScheduledThreadPoolExecutor vs. HashedWheelTimer

添加任务 删除任务 获取 精度 线程模型
ScheduledThreadPoolExecutor O(logN) O(logN) O(1) per tick 多线程
HashedWheelTimer O(1) O(1) O(1) 取决于每个 tick 的 duration 单线程

ScheduledThreadPoolExecutor

  • 核心功能

    • schedule(Runnable command, long delay, TimeUnit unit)
    • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
    • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
  • 核心实现点

    • 继承 ThreadPoolExecutor
    • 自定义的 DelayedWorkQueue, 堆结构的无界阻塞队列
    • Leader-follower 线程模型

Leader-follower 线程模型

Leader/Follower设计避免了动态线程创建和销毁的额外开销。将线程放在一个自组织的池中,而且无需交换数据,这种方式将上下文切换、同步、数据移动和 动态内存管理的开销都降到了最低

上图就是L/F多线程模型的状态变迁图,共6个关键点

  • 线程有3种状态:领导 leader,处理 processor,追随 follower
  • 假设共N个线程,其中只有1个 leader 线程(等待任务),x 个 processor 线程(处理),余下有 N-1-x 个 follower 线程(空闲)
  • 有一把锁,谁抢到就是 leader
  • 事件/任务来到时,leader 线程会对其进行处理,从而转化为 processor 状态,处理完成之后,又转变为 follower
  • 丢失 leader 后,follower 会尝试抢锁,抢到则变为 leader,否则保持 follower
  • follower 不干事,就是抢锁,力图成为 leader

通俗解释: Explain “Leader/Follower” Pattern

添加任务

其和 ThreadPoolExecutor 的不同主要在它会按照过期时间的长短构造一个类小顶堆

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// DelayedWorkQueue
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
// 扩容
grow();
size = i + 1;
// 第一个的话直接就设置成头结点
if (i == 0) {
queue[0] = e;
// set index into delay queue, to support faster cancellation
setIndex(e, 0);
} else {
// 构建堆
siftUp(i, e);
}
// 堆顶有变动则触发重新选 leader
if (queue[0] == e) {
leader = null;
// 发信号可争 leader
available.signal();
}
} finally {
lock.unlock();
}
return true;
}

取出任务

已有 leader,则无限等待;否则做 leader 等待 delay 时间后执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
// 没有任务则无限等待
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
// 延时任务已到时则取出并重新维护堆
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
// 不是 leader 也无限等待
available.await();
else {
// 做 leader,等待对应的 delay 时间
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}

问题:ScheduledThreadPoolExecutor 的延时任务是基于堆结构的,其添加和删除任务所需时间复杂度都为 O(logN), 对于超大量延时任务的情况来说,会 不会有更好的解决方案呢? Netty 中的 delay 重试机制?

时间轮

大量的调度任务如果每一个都使用自己的调度器来管理任务的生命周期的话,浪费 cpu 的资源并且很低效。时间轮是一种高效来利用线程资源来进行批量化调度的一种 调度模型。把大批量的调度任务全部都绑定到同一个的调度器上面,使用这一个调度器来进行所有任务的管理、触发、以及运行。能够高效的管理各种延时任务,周期 任务,通知任务等等。但是,时间轮调度器的时间精度可能不是很高,对于精度要求特别高的调度任务可能不太适合。因为时间轮算法的精度取决于,时间段“指针”单元 的最小粒度大小,比如时间轮的格子是一秒跳一次,那么调度精度小于一秒的任务就无法被时间轮所调度。而且时间轮算法没有做宕机备份,因此无法再宕机之后恢复 任务重新调度。

简易时间轮

概念:

  • tickDuration:时间轮由多个时间格组成,每个时间格就是 tickDuration,它代表当前时间轮的基本时间跨度。
  • wheelSize:代表每一层时间轮的格数
  • interval:当前时间轮的总体时间跨度,interval=tickMs × wheelSize

总的来说就是由数组 + 链表的形式来存储延时任务,但是需要将数组想象成是一个首尾相连的环形结构,有点类似一致性 hash 中结构的意思。那么添加任务的时候 就根据任务的延迟时间对数组取模运算这样来确定放在哪一个槽位,槽位中存储的是任务链表。那么不管是添加任务,还是移除任务,其时间复杂度都是 O(1),相对于 ScheduledThreadPoolExecutor 要更加效率。

问题:如果需要的 interval 很大,那么槽位不断增加,会急剧增加内存的消耗 (eg. tickDuration=1ms, interval=1d -> wheelSize = 1000 * 60 * 60 * 24 = 86400000),如何解决?

含轮次的时间轮

和简易时间轮相比,其引入了“轮次”的概念,那么在指针指向到对应的槽位的时候,我们就需要判断槽位中的延时任务列表,哪些任务的的轮次是属于当前轮次的,只有 当前轮次的任务才是真的到达了延时时间需要执行的任务。那么在 take 任务的时候,其复杂度就变成了 O(n) 了。或者说,每个槽位的存储结构由链表调整成堆, 这样的话,take 任务就是 O(1) 的复杂度,但是相应的插入复杂度就变成了 O(logN)。

Netty 的 HashedWheelTimer

问题:添加 or 获取任务的时间增加了,有没有更好的解决方案?

层级时间轮

该设计其实类似于手表,我们将使用多个轮盘,每个轮盘的 tickDuration 为上一个轮盘的 interval。就好比手表上的秒针,分针,时针。秒针转一圈,分针才 移动一个槽位。就是这么一种进阶关系。那么对应上面那个 tickDuration=1ms, interval=1d 的问题,新的层级时间轮的设计方式总共就只需要 1000 + 60 + 60 + 24 = 1144 个槽位即可。

Kafka 的 TimingWheel

如果需要一个星期、一个月,甚至一年的更长跨度的延时任务,如何解决?

携程 QMQ 的延时实现

了解 QMQ 的延时消息实现之前,需要先大致了解下 QMQ 的基本构成组件。如图

  • Meta Server:提供集群管理和集群发现的作用
  • Server:提供实时消息服务
  • Delay Server:提供延时/定时消息服务,延时消息先在delay server排队,时间到之后再发送给server
  • Producer:消息生产者
  • Consumer:消息消费者

我们这次主要关注 Delay Server 的实现。 如果想更多的了解 QMQ,可以自行去看其 Github 上的文档

相比于 RocketMQ 的只支持固定 level 的延时消息来说,QMQ 更加灵活,支持的延时范围更大,默认能支持两年的时间跨度,精确度为秒级。因为其内部是基于时间轮 来实现的,而其时间轮的 tickDuration=500ms,如图

  • message log:和实时消息里的 message log 类似,收到消息后 append 到该 log 就返回给 producer,相当于 WAL。
  • schedule log:按照投递时间组织,每个小时一个。该 log 是回放 message log 后根据延时时间放置对应的log上,这是上面描述的两层 hash wheel 的 第一层,位于磁盘上。schedule log 里是包含完整的消息内容的,因为消息内容从 message log 同步到了 schedule log,所以历史 message log 都 可以删除(所以 message log 只需要占用极小的存储空间,所以我们可以使用低容量高性能的 ssd 来获取极高的吞吐量)。另外,schedule log 是按照延时 时间组织的,所以延时时间已过的 schedule log 文件也可以删除。
  • dispatch log:延时/定时消息投递成功后写入,主要用于在应用重启后能够确定哪些消息已经投递,dispatch log 里写入的是消息的 offset,不包含消息 内容。当延时 server 中途重启时,我们需要判断出当前这个刻度(比如一个小时)里的消息有哪些已经投递了则不重复投递。

QMQ HashedWheelTimer

大体流程如上图,大体说明如下,主要主要了解下列几个任务即可。

  1. Delay Server 中包含的几个周期定时任务
  • messageLogFlushService:负责 delay server 接受消息后,将 messagelog 刷盘
  • dispatchLogFlushService:delay message 到期发送后,写 offset 到 dispatchlog,其主要负责将 dispatchlog 刷盘
  • iterateOffsetFlushService:主要负责回放 messagelog,并管理回放进度,进度保存在 message_log_iterate_checkpoint.json
  1. WheelTickManager 主要工作
  • start timer: 初始化并启动时间轮
  • recover:根据 dispatchlog 和 回放进度恢复时间轮数据
  • load schedulelog:周期加载 schedulelog 数据来填充时间轮数据
  • 监听 messagelog 的回放事件,回放添加 schedulelog 的时候判断(改延时消息是否属于当前延迟刻度,eg. 1h 内)是否需要将其添加到时间轮中

note: QMQ 中的时间轮和 Netty 大体是一致的,都是先将任务添加到 Queue timeouts 中,然后周期从这个列表中获取 100000 个来添加到 HashedWheel 中对应的 HashedWheelBucket 中。