今天介绍一个好用的分布式作业调度框架elastic-job,在工作中有时候可能会遇到需要写定时任务的场景,就拿网上常说的一个例子那就是余额宝的每日计息,这种需要定时自动执行,不需要人为触发的工作就需要用到定时任务来解决,目前java的定时任务框架有几种,如quartz、spring task、elastic job等,前两种笔者没用过,今天主要介绍的也就是笔者用过的第三种,elastic job是可以使用zookeeper做注册中心的一个分布式作业调度框架,能够实现定时任务压力的水平切分,而前两种笔者百度时发现它们在分布式场景中貌似不能很好的应用

一、简介

当当网的elastic job目前主要有1.x和2.x两种版本,两版本的核心类名做了修改,实现原理一样,其中2.x又分为lite版和cloud版,elastic job的定时任务有3种:简单任务、数据流任务、脚本任务,本文只讲解常用的前两种,另外需要自己先准备好zookeeper服务,笔者这里在192.168.1.120已经准备好了zookeeper注册中心


二、准备jar文件

基于maven构建的项目,首先引入需要的jar包

<!-- elastic-job-lite-core -->
<dependency>
  <groupId>com.dangdang</groupId>
  <artifactId>elastic-job-lite-core</artifactId>
  <version>2.1.5</version>
</dependency>
<!-- 使用spring自定义命名空间时引入 -->
<dependency>
  <groupId>com.dangdang</groupId>
  <artifactId>elastic-job-lite-spring</artifactId>
  <version>2.1.5</version>
</dependency>
<!-- spring -->
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>4.3.18.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-web</artifactId>
  <version>4.3.18.RELEASE</version>
</dependency>

三、写一个简单作业(网上找的例子)

/**
 * 简单作业实现SimpleJob接口,未经任何封装,与quartz原生接口类似
 * 任务总分片在spring里面配置为2,即若集群部署项目有2台机器运行时,定时任务可水平切分到2台机器
 */
public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("Thread ID:"+Thread.currentThread().getId()+
                ",任务总片数:"+shardingContext.getShardingTotalCount()+
                ",当前分片项:"+shardingContext.getShardingItem());
    }
}

四、配置spring配置文件(先配置一个简单作业)

这里先简单说明一些配置的含义,cron是配置作业的执行周期,可自行百度cron表达式根据自己的业务进行调整,sharding-total-count是作业分片数,即需要将任务水平切分多少分并行执行,举个栗子:在一次定时任务中执行的任务很耗服务器的性能,如果只有一台机器执行的话,那估计很快就down掉,这时如果能把任务进行分片,例如分成2片(2个线程并行跑),服务器1使用线程1跑分片项1、服务器2使用线程2并行的跑分片项2,这样就很好的实现了压力的水平切分。值得一提的是我们需要根据自己的实际业务进行自己的任务分片,否则有可能出现重复执行的状况

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" 
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.dangdang.com/schema/ddframe/reg
        http://www.dangdang.com/schema/ddframe/reg/reg.xsd 
        http://www.dangdang.com/schema/ddframe/job 
        http://www.dangdang.com/schema/ddframe/job/job.xsd">

    <!--配置作业注册中心-->
    <reg:zookeeper id="regCenter" server-lists="192.168.1.120:2181" namespace="hzk-job" 
    base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3"/>

    <!--配置作业,elastic-job 提供三种类型作业:Simple类型作业、Dataflow类型作业、Script类型作业-->
    <job:simple id="mySimpleJob" class="com.hzk.job.MySimpleJob" registry-center-ref="regCenter" 
    cron="0/5 * * * * ?" sharding-total-count="2"/>

</beans>

五、运行简单任务样例

首先,需要在web.xml配置上spring配置文件的路径,使得web容器启动时加载spring配置

<!--加载spring配置文件-->
<context-param>
  <param-name>contextConfigLocation</param-name>
  <param-value>classpath:spring/applicationContext.xml</param-value>
</context-param>
<listener>
  <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>

成功启动后即可看到效果,笔者这里就不演示了~

六、数据流任务

/**
 * 数据流作业,需要实现DataflowJob接口,该接口提供2个方法
 * fetchData:抓取数据  | processData:处理数据
 * 可通过streaming-process配置是否流式处理,默认为false。
 * 流式处理;每一次作业开始执行:抓取数据-处理数据-抓取数据-处理数据......这样的循环流程,
 *          直到抓取到的数据为空或集合长度=0,本次作业才完成(记为一次作业)
 * 非流式处理:每一次作业执行:抓取数据-处理数据-本次作业完成的流程(记为一次作业)
 */
public class MyDataflowJob implements DataflowJob<User> {

    @Override
    public List<User> fetchData(ShardingContext shardingContext) {
        System.out.println("=====开始抓取数据======");
        //例如去数据库查询数据
        List<User> users = sqlSession.selectList("***");
        return users;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<User> list) {
        System.out.println("抓取到所需数据");
        //处理数据
        System.out.println("本次数据处理完成");
    }
}

同样,将接好的定时任务配置到spring配置文件中(添加如下配置)即可生效

<job:dataflow id="myDataflowJob" class="com.hzk.job.MyDataflowJob" registry-center-ref="regCenter"
              cron="0/5 * * * * ?" sharding-total-count="2" streaming-process="true"/>

七、总结

在实际工作中,如果需要在分布式环境使用定时任务,推荐使用当当网的作业调度框架elastic job,另外关于上面数据流任务的编写,笔者修改了自己的源码仅以注释说明场景,需要看到实际效果的小伙伴可以自行改编模拟实际场景~~

发表评论

您的电子邮箱地址不会被公开。