Elastic-Job的基础使用
发布日期:2021-07-27 04:56:33 浏览次数:5 分类:技术文章

本文共 9343 字,大约阅读时间需要 31 分钟。

Quartz官方分布式方案,这种方式比较重,需要根据官方文档新建数据表,并不推荐。

我们更常用的是使用【ElasticJob】实现分布式任务。

Elastic job是当当网基于Zookepper、Quartz开发并开源的一个Java分布式定时任务,解决了Quartz不支持分布式的弊端。Elastic job主要的功能有支持弹性扩容,通过Zookepper集中管理和监控job,支持失效转移等。

一、使用场景分析

大多数情况下,定时任务我们一般使用quartz开源框架就能满足应用场景。

但如果考虑到健壮性等其它一些因素,就需要自己下点工夫,比如:要避免单点故障,至少得部署2个节点吧,但是部署多个节点,又有其它问题,有些数据在某一个时刻只能处理一次,比如 i = i+1 这些无法保证幂等的操作,run多次跟run一次,完全是不同的效果。

对于上面的问题,可以使用quartz+zk或redis分布式锁的解决方案

  1. 每类定时job,可以分配一个独立的标识(比如:xxx_job)
  2. 这类job的实例,部署在多个节点上时,每个节点启动前,向zk申请一个分布式锁(在xxx_job节点下)
  3. 拿到锁的实例,才允许启动定时任务(通过代码控制quartz的schedule),没拿到锁的,处于standby状态,一直监听锁的变化
  4. 如果某个节点挂了,分布式锁自动释放,其它节点这时会抢到锁,按上面的逻辑,就会从standby状态,转为激活状态,小三正式上位,继续执行定时job。

这个方案,基本上解决了HA(High Availability))和业务正确性的问题,但是美中不足的地方有2点

  1. 无法充分利用机器性能,处于standby的节点,实际上只是一个备胎,平时啥也不干
  2. 性能不方便扩展,比如:某个job一次要处理上千万的数据,仅1个激活节点,要处理很久

elastic-job相当于quartz+zk的加强版,它允许对定时任务分片,可以集群部署(每个job的"分片"会分散到各个节点上),如果某个节点挂了,该节点上的分片,会调度到其它节点上。

一般情况下,使用SimpleJob这种就可以了

二、概述

官网地址:

Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。

2.1 分片

任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直接的结果就是服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。

2.2 分片项与业务处理解耦

Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。

2.3 个性化参数的适用场景

个性化参数即 shardingItemParameter ,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。

例如:按照地区水平拆分数据库,数据库A是北京的数据;数据库B是上海的数据;数据库C是广州的数据。 如果仅按照分片项配置,开发者需要了解0表示北京;1表示上海;2表示广州。

合理使用个性化参数可以让代码更可读,如果配置为0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。

三、核心理念

3.1 分布式调度

Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。

注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。

3.2 作业高可用

Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1 主n从的方式执行。

一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。

3.3 最大限度利用资源

Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。

例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。

3.4 整体框架图
  • 第一台服务器上线触发主服务器选举。主服务器一旦下线,则重新触发选举,选举过程中阻塞,只有主服务器选举完成,才会执行其他任务
  • 某作业服务器上线时会自动将服务器信息注册到注册中心,下线时会自动更新服务器状态
  • 主节点选举,服务器上下线,分片总数变更均更新重新分片标记。
  • 定时任务触发时,如需重新分片,则通过主服务器分片,分片过程中阻塞,分片结束后才可执行任务。如分片过程中主服务器下线,则先选举主服务器,再分片。
  • 通过上一项说明可知,为了维持作业运行时的稳定性,运行过程中只会标记分片状态,不会重新分片。分片仅可能发生在下次任务触发前。
  • 每次分片都会按服务器 IP 排序,保证分片结果不会产生较大波动
  • 实现失效转移功能,在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。
框架图
在这里插入图片描述

四、使用实现springBoot+ElasticJob

模拟场景

假设有三个定单数据库(我们用三张表: pop_order_data1 , `pop_order_data2 , pop_order_data3 来模拟三个数据库),每天晚上2点会定时把已完成订单移到HBASE,然后删除数据库中的已完成订单,这里我们只模拟删除订单,而且改为每10秒执行一次进行模拟

使用方式

elastic-Job分为SimpleJobDataFlowJob两种:

  • SimpleJob意为简单实现,未经任何封装的类型。需实现SimpleJob接口,该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。
  • DataFlowJob用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。会先进行一轮数据获取,然后在处理,然后再次获取的循环过程。

两种都是多线程的处理机制。

4.1 导入依赖
com.dangdang
elastic-job-lite-core
2.1.5
com.dangdang
elastic-job-lite-spring
2.1.5
4.2 配置文件

application.properties

#端口号server.port=7777#配置数据源spring.datasource.url=jdbc:mysql://localhost:3306/mydb10?serverTimezone=UTC&useSSL=true&characterEncoding=utf8spring.datasource.username=rootspring.datasource.password=123456spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driverspring.datasource.type=com.alibaba.druid.pool.DruidDataSource#配置mybatis的xml文件mybatis.mapper-locations=classpath:mappers/*.xml

bean.xml 这是spring的xml文件,里面包含了分布式事务的处理,这里三个分片对应三个表操作

注册中心配置,用于注册和协调作业分布式行为的组件,目前仅支持Zookeeper

属性名 类型 构造器注入 缺省值 描述
serverLists String 连接Zookeeper服务器的列表 包括IP地址和端口号 多个地址用逗号分隔 如:host1:2181,host2:2181
namespace String Zookeeper的命名空间
baseSleepTimeMilliseconds int 1000 等待重试的间隔时间的初始值 单位:毫秒
maxSleepTimeMilliseconds String 3000 等待重试的间隔时间的最大值 单位:毫秒
maxRetries String 3 最大重试次数
sessionTimeoutMilliseconds boolean 60000 会话超时时间 单位:毫秒
connectionTimeoutMilliseconds boolean 15000 连接超时时间 单位:毫秒
digest String 连接Zookeeper的权限令牌 缺省为不需要权限验证
4.3 pojo层、dao层和service层
/*** pojo*/@Data@NoArgsConstructor@AllArgsConstructorpublic class Order implements Serializable {
private Integer id; private Integer order_id; private Integer vender_id; private Integer order_state; //5表示订单完成}/*** dao*/public interface OrderDao {
//添加数据 void add_data1(Order order); void add_data2(Order order); void add_data3(Order order); //删除已完成订单(状态5)数据 void delete_data1(Integer id); void delete_data2(Integer id); void delete_data3(Integer id); //每次获取10条订单状态为已完成的数据 List
findByTen_data1(); List
findByTen_data2(); List
findByTen_data3();}/*** service*/public interface OrderService {
//添加数据 void add_data1(Order order); void add_data2(Order order); void add_data3(Order order); //删除已完成订单(状态5)数据,这里根据job_id决定操作哪个表,id表示删除哪个数据 void delete_data(Integer job_id, Integer id); //每次获取10条订单状态为已完成的数据 List
findByTen(Integer job_id);}
4.4 配置类
/** * 这个类是为了让springBoot将bean.xml文件解析 */@Configuration@ImportResource(locations = {
"classpath:bean.xml"})public class BeanConfiguration {
}
4.5 MyJob类

这个类是Job具体要执行的操作,必须注入到前面bean.xml对应的任务中

/** * 使用SimpleJob必须遵从SimpleJob接口,并实现方法 */public class MyJob implements SimpleJob {
@Autowired private OrderService orderService; @Override public void execute(ShardingContext shardingContext) {
//格式打印 //getShardingItem()获取的是bean.xml文件中的分片代号 //getShardingParameter()是自定义的个性化参数名称 System.out.println(String.format("Item: %s |ShardingParameter: %s | Time: %s | Thread: %s | %s", shardingContext.getShardingItem(),shardingContext.getShardingParameter() , new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "SIMPLE")); //执行删除操作 //我们这里从数据库每次获取10条状态为已完成订单的数据,然后执行删除 List
orders = orderService.findByTen(shardingContext.getShardingItem()); for (Order order : orders) {
orderService.delete_data(shardingContext.getShardingItem(), order.getId()); } System.out.println("Thread:" + Thread.currentThread().getId() +" ,删除了表格"+ shardingContext.getShardingParameter() +"数据条数:" + orders.size()); }}

运行结果如下:

Item: 0 |ShardingParameter: pop_order_data1 | Time: 20:23:40 | Thread: 67 | SIMPLEItem: 1 |ShardingParameter: pop_order_data2 | Time: 20:23:40 | Thread: 68 | SIMPLEItem: 2 |ShardingParameter: pop_order_data3 | Time: 20:23:40 | Thread: 69 | SIMPLEThread:68 ,删除了表格pop_order_data2数据条数:10Thread:67 ,删除了表格pop_order_data1数据条数:10Thread:69 ,删除了表格pop_order_data3数据条数:10Item: 2 |ShardingParameter: pop_order_data3 | Time: 20:23:50 | Thread: 72 | SIMPLEItem: 0 |ShardingParameter: pop_order_data1 | Time: 20:23:50 | Thread: 70 | SIMPLEItem: 1 |ShardingParameter: pop_order_data2 | Time: 20:23:50 | Thread: 71 | SIMPLEThread:70 ,删除了表格pop_order_data1数据条数:10Thread:71 ,删除了表格pop_order_data2数据条数:10Thread:72 ,删除了表格pop_order_data3数据条数:10Item: 0 |ShardingParameter: pop_order_data1 | Time: 20:24:00 | Thread: 73 | SIMPLEItem: 1 |ShardingParameter: pop_order_data2 | Time: 20:24:00 | Thread: 74 | SIMPLEItem: 2 |ShardingParameter: pop_order_data3 | Time: 20:24:00 | Thread: 75 | SIMPLEThread:74 ,删除了表格pop_order_data2数据条数:10Thread:73 ,删除了表格pop_order_data1数据条数:10Thread:75 ,删除了表格pop_order_data3数据条数:10

可以看出,实现了分布式任务,同时操作三个表进行删除操作,且是多线程操作,速度很快。

五、单表多分片操作

除了以上的方式以外,我们还可以分片9个,每个表3个分片,这样进行操作,但是实际应用很少。

我们同样需要写dao和service,这里我们可以将表的名称以【${}】字段形式拼接进去。

insert into ${dataName}(id,order_id,vender_id,order_state) values (#{id},#{order_id},#{vender_id},#{order_state})
delete from ${dataName} where id=#{id}

bean.xml 配置仅仅修改数量和对应的名称即可

然后就是在Job任务配置,注意【取余】操作

@Overridepublic void execute(ShardingContext shardingContext) {
//执行删除操作 //我们这里从数据库每次获取10条状态为已完成订单的数据,然后执行删除 List
orders = orderService.findByTen_data(shardingContext.getShardingParameter()); for (Order order : orders) {
if (order.getId()%3==shardingContext.getShardingItem() || (order.getId()%3+3)==shardingContext.getShardingItem() || (order.getId()%3+6)==shardingContext.getShardingItem()) {
orderService.delete_data(shardingContext.getShardingParameter(), order.getId()); System.out.println("分片 " + shardingContext.getShardingParameter() + "删除的数据id是:" + order.getId()); } }}

执行结果如下:

分片 pop_order_data1删除的数据id是:237分片 pop_order_data2删除的数据id是:156分片 pop_order_data1删除的数据id是:238分片 pop_order_data3删除的数据id是:156分片 pop_order_data3删除的数据id是:157分片 pop_order_data2删除的数据id是:158分片 pop_order_data3删除的数据id是:155分片 pop_order_data1删除的数据id是:236分片 pop_order_data2删除的数据id是:157分片 pop_order_data3删除的数据id是:160分片 pop_order_data2删除的数据id是:159分片 pop_order_data3删除的数据id是:158分片 pop_order_data1删除的数据id是:240分片 pop_order_data1删除的数据id是:241分片 pop_order_data3删除的数据id是:159分片 pop_order_data2删除的数据id是:161分片 pop_order_data1删除的数据id是:239分片 pop_order_data2删除的数据id是:160

六、properties.yml配置方式以及DataFlow的job实现

这个部分实现请看【GitHub】上的代码,模拟了数据源。

转载地址:https://blog.csdn.net/qq_45337431/article/details/103403944 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:策略过滤器的灵活性分析
下一篇:架构的演进

发表评论

最新留言

不错!
[***.144.177.141]2024年09月12日 22时09分34秒