作者:京东科技 文涛

背景

金融社区优惠文章是基于京东商城优惠商品批量化自动生成的,每日通过不同的渠道获取到待生成的SKU列表,并根据条件生成优惠文章。

但是,生成优惠文章之后续衍生问题:

该商品无优惠了,对应文章需要做取消推荐或下架处理,怎样能更快的知道该商品无优惠了呢?

方案介绍

方案对比

方案1

承接该商品所有变更信息的消息,发生变更后二编文章。

优点:

实时,一旦变更立刻知道并更新文章。

缺点:

1 开销大,是要承接的消息多,可能100台机器也不一定能承接(亿级变更)。

2 耦合高,需要对接的业务方多,全部对接需要很长的周期及人力,同时对方发生业务变更需要通过人员同步更新逻辑。

方案2

通过任务轮训文章,调外部接口判断该商品是否有优惠,之后做相应的处理。

优点:

1 业务模型较简单,只需要判断是否有优惠或优惠变更即可。

2 优惠侧投入较小,只需要投入调度任务的机器即可。

缺点:

不实时,数据量大了,对任务的实时性是个挑战。

方案3

针对方式2的缺点,我们推出了【可伸缩自动任务】 + 【首次曝光监测】的组合模式。

即自己实现分布式调度增强,提高数据处理能力,提高调度鲁棒性、自动化等能力,同时采用首次曝光监测的方式,利用用户访问文章时判断是否有优惠,并做相应取消推荐或文章下线处理

优点:

1 较实时,第一批被推荐推到C端用户的文章有可能会看到无优惠兜底方案,其它人便不再被推送。

2 方式2的优点

缺点:

需要实现可伸缩自动任务组件

至此,如何保证千万量级的优惠文章监测优惠变更不至于周期太长成了难点。

接下来介绍可伸缩任务组件,是如何解决上述问题的:

可伸缩任务组件

关键能力

我们希望组件拥有的能力

•任务自动化,结束自动重新执行

•任务鲁棒性强,意外中断可从断点处重新唤起

•任务可分治,可利用线程池及分布式集群将整体任务拆分成多个子任务执行

•任务可扩展,具备新任务探测能力

•任务可熔断,可以监测连续异常并终止执行

实现

名词解释

任务指令:触发某个任务的一条指令信息

任务开关:控制整体任务执行情况,如:停止执行,分时段执行等

redo指令:当任务执行完成后,发出的重做指令

任务监测:负责监测任务执行情况,根据任务状态处理任务

实现思路

能否复用现有中间件?如:分布式任务,消息队列等

答案是可以,并且个人觉得最好是优先利用中间件能力,并将中间件的能力定义成组件的可扩展能力,方便中间件替换,提高组件的通用性

如果使用现有中间件实现该如何实现?

传统思路:

如何实现千万级优惠文章的优惠信息同步

分布式任务负责查询全量文章,将查询结果发送MQ,消费者消费单条消息,并进行业务处理

那么问题来了,

1 查询一轮任务需要多长时间呢?随着文章量的增加,调度周期设置多少合适呢?

2 MQ的消息将海量

显然这种方式不太适合数据量大的情况

那么我们的思路是:

1 将分布式调度抽象成一个心跳监测模块,用于监测任务状态,以及探测新任务,这样任务执行周期固定10min即可,任务执行时间也不会太长(实际执行时间200ms左右)

2 将MQ抽象成任务指令的载体,用于发送指令,接收指令,利用分布式的能力处理任务

3 将千万级的一次查询,拆分成多个查询,缩小单次指令执行的周期,将千万级文章信息同步至ES,使用ES的滚动查询能力,在执行单次任务时,可滚动查询10-20万的文章

4 将分布式共识组件用作开关能力,用于控制组件执行,在大促或下游压力过高时动态控制任务执行

5 将Redis用于任务信息存储和分布式指令防重

至此,我们使用到了分布式调度、消息队列、Redis、分布式共识、ES等中间件能力。

实现方案

1 指令的定义:

属性 说明
breakPoint 断点标识
rangeBegin 边界起
rangeEnd 边界终
startTime 开始执行时间
endTime 结束时间
lastExeTime 最后一次执行时间
exceptionTimes 发生错误次数
threadKey 分布式任务线程标识

2 工作流程图:

如何实现千万级优惠文章的优惠信息同步

工作流程说明:

1 任务监测模块负责周期性的监测现有任务执行情况及是否有新任务加入到任务列表中。

首先当拿到某个任务时,检验该任务最近活跃时间是否超出10分钟(可配置),如果超出则认为当前任务因某种原因已经终止执行了,此时发送唤起任务执行的指令。

接着执行新任务监测,如果有新任务加入,则将该任务加入到任务列表中。此时不需要发出任务唤起指令,下次任务监测则会根据上述逻辑发出唤起指令

2 任务执行模块收到指令后首先会校验当前任务的合法性,然后再执行任务

合法性校验点包括:

1)任务控制能力监测即相关开关监测

  1. 任务熔断能力监测,异常信息是否超出阈值,如果超出终止执行

  2. 任务防重监测,任务当前指令是否有其它线程在同时执行,如果有终止执行

执行的过程为:

1)任务采用异步线程池模式,收到执行指令(MQ)后立即开启异步线程执行,防止单条指令执行时间太长

2)执行接口调用方法,分批滚动查询待执行列表

3)循环待执行列表,执行相应业务逻辑

4)列表中每执行完一条数据,就会记录一下任务执行情况,用于作于异常中断后(机器重启),从断点处继续执行

5)任务发生异常记录异常信息

6)监测到任务真正执行完成,后发起redo指令,用于唤起下周期任务执行

目前效果

机器使用情况:微服务2台

任务拆分情况:目前任务被拆分成了30个子任务,平均每个扫描30万文章

实时性:1千万文章发生一次监测耗时4小时,下游接口TPS700左右

安全性:大促或下游压力大,可随时停止或分时段执行

鲁棒性:在微服务上线时,或接口调用异常时,任务产生中断,但过10分钟后,又会被从断点处重新唤起,不需要人工干预

中间件压力:复用调度和MQ等中间件但不拖累中间件,每天产生300条左右MQ消息,每条消息消费耗时10ms以内,每次心跳监测模块(分布式任务执行)耗时200ms左右

扩展

该组件,可根据业务逻辑做任何相关业务处理,如监测到已下架或取消推荐的文章,判断优惠存在时,依然可以做重新上架处理,不过此能力依赖业务系统配合

该组件目前缺少两个能力

1 任务出错后,可将错误信息发送告警,可通过接入监控系统实现,提高组件的告警能力

2 如何动态控制任务拆分逻辑,比如觉得4小时监测不够实时或太频繁时,想动态调整任务分治的粒度目前未实现