一文带你搞懂xxl-job(分布式任务调度平台)
本篇文章主要记录项目中遇到的 xxl-job 的实战,希望能通过这篇文章告诉读者们什么是 xxl-job 以及怎么使用 xxl-job 并分享一个实战案例。
xxl-job 是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、VR免费地方门户源码易扩展。设计思想是将调度行为抽象形成 调度中心 平台,平台本身不承担业务逻辑,而是负责发起 调度请求 后,由 执行器 接收调度请求并执行 任务,这里的 任务 抽象为 分散的 JobHandler。通过这种方式即可实现 调度 与 任务 相互解耦,灵光指标公式源码从而提高系统整体的稳定性和拓展性。
任务调度指的是系统在约定的指定时间自动去执行指定的任务的过程。在开发项目时大家可能遇到过类似的场景问题,如系统需要定时在每天0点进行数据备份、活动开始前几小时预热执行一些前置业务、定时对 MQ 消息表的发送装填等。这些场景问题都可以通过任务调度 来解决。
单体系统 中有许多实现 任务调度 的方式,如多线程方式、Timer 类、Spring Tasks 等等。这里比较常用的北电离中戏源码是 Spring Tasks(通过 @EnableScheduling + @Scheduled 的注解可以自定义定时任务,有兴趣的可以去了解一下)。
在分布式系统下,每个服务都可以搭建为集群,这样的好处是可以将任务切片分给每一个服务从而实现并行执行,提高任务调度的处理效率。那么为什么分布式系统 不能使用 单体系统 的任务调度实现方式呢?在集群服务下,如果还是使用每台机器按照单体系统的任务调度实现方式实现的话,会出现下面这四个问题:怎么做到对任务的控制(如何避免任务重复执行)、如果某台机器宕机了,会不会存在任务丢失、如果要增加服务实例,怎么做到弹性扩容、爱德森源码如何做到对任务调度的执行情况统一监测。通过上面的问题可以了解到分布式系统下需要一个满足高可用、容错管理、负载均衡等功能的任务调度平台来实现任务调度。
xxl-job 分布式任务调度系统是一个开源软件,可以在 github 或 gitee 上查看和下载 xxl-job 的源码。在 docker 下安装 xxl-job、创建映射容器的文件目录、在/mydata/xxl-job 的目录下创建 application.properties 文件、导入 tables_xxl-job.sql 文件到指定的数据库、配置参数如数据库位置、访问口令等。绿线指标源码
在 Spring Boot 项目中,导入 xxl-job 的 maven 依赖,配置application.yml 文件指定调度中心地址、访问口令、执行器名称和端口等属性,编写配置类配置自定义任务和执行器,完成 SpringBoot 集成 xxl-job 实现分布式任务调度的全过程。
实战案例:当前项目需要对上传到分布式文件系统 minio 中的视频文件进行统一格式的视频转码操作。利用 xxl-job 的方式以任务调度的方式定时处理视频转码操作,以任务调度的方式,可以使得视频转码操作不会阻塞主线程,避免影响主要业务的吞吐量;以集群服务分片接收任务的方式,可以将任务均分给每个机器使得任务调度可以并行执行,提高总任务处理时间以及降低单台机器 CPU 的开销。
xxl-job 执行流程图:在集群部署时,配置路由策略中选择分片广播的方式,可以使一次任务调度会广播触发集群中所有的执行器执行一次任务,并且可以向系统传递分片参数。利用这一特性可以根据当前执行器的分片序号和分片总数来获取对应的任务记录。通过 Bean 模式(基于方法)获取分片序号和分片总数,编写 sql 获取任务记录,实现对集群服务均分任务的操作。
确保任务不会被重复消费:通过幂等性实现,依靠任务的状态(未处理1;处理中2;处理失败3;处理成功4)通过比较和设置的方式只有在状态为未处理或处理失败时才能设置为处理中,避免多个执行器同时处理该任务。设置调度过期策略和阻塞处理策略保证真正的幂等性。
编写完成所有任务:分片视频转码处理,通过分片广播拿到的参数以取模的方式获取当前执行器所属的任务记录集合,遍历集合并发执行任务,使用乐观锁抢占当前任务,执行任务过程包含分布式文件系统下载、视频转码、上传转码后的视频、更新任务状态(处理成功),使用 JUC 工具类 CountDownLatch 实现所有任务执行完后才退出方法,中间使用 xxl-job 的日志记录错误信息和执行结果。
清理任务表中转码成功的任务的记录并将其插入任务历史表,视频补偿机制处理任务超时情况下的任务,做出补偿,处理失败次数大于3次的任务,做出补偿。测试并查看日志,准备好的任务表记录,启动三台媒资服务器,并开启任务,可以单独查看每个任务的日志,通过日志中的执行日志查看具体日志信息,可以看到直接为了测试改错的路径导致下载视频出错,查看数据库表的变化,核心的视频转码任务执行成功,并且逻辑正确,能够起到分布式任务调度的作用。
Springboot项目整合xxl -job
搭建并启动xxl-job服务:
前往github下载源码,选择与springboot版本匹配的分支,执行相关SQL至数据库。若在创建xxl_job_registry表时遇到长度限制错误,需调整索引长度或替换。
配置数据库连接信息至application.properties文件,确保指定服务端口与上下文名称。
启动项目,或打包成jar文件。
访问后台管理页面,地址为/post/
JobSchedulerç使ç¨ååç
JobScheduler主è¦ç¨äºå¨æªæ¥æ个æ¶é´ä¸æ»¡è¶³ä¸å®æ¡ä»¶æ¶è§¦åæ§è¡æ项任å¡çæ åµï¼æ¶åçæ¡ä»¶å¯ä»¥æ¯ç½ç»ãçµéãæ¶é´çï¼ä¾å¦æ§è¡ç¹å®çç½ç»ãæ¯å¦åªå¨å çµæ¶æ§è¡ä»»å¡çãJobSchedulerç±»è´è´£å°åºç¨éè¦æ§è¡çä»»å¡åéç»æ¡æ¶ï¼ä»¥å¤å¯¹è¯¥åºç¨Jobçè°åº¦ï¼æ¯ä¸ä¸ªç³»ç»æå¡ï¼å¯ä»¥éè¿å¦ä¸æ¹å¼è·åï¼
JobInfoæ¯ä¼ éç»JobSchedulerç±»çæ°æ®å®¹å¨ï¼å®å°è£ äºé对è°ç¨åºç¨ç¨åºè°åº¦ä»»å¡æéçåç§çº¦æï¼ä¹å¯ä»¥è®¤ä¸ºä¸ä¸ªJobInfo对象对åºä¸ä¸ªä»»å¡ï¼JobInfo对象éè¿JobInfo.Builderå建ãå®å°ä½ä¸ºåæ°ä¼ éç»JobSchedulerï¼
JobInfo.Builderæ¯JobInfoçä¸ä¸ªå é¨ç±»ï¼ç¨æ¥å建JobInfoçBuilderç±»ã
JobServiceæ¯JobScheduleræç»åè°ç端ç¹ï¼JobSchedulerå°ä¼åè°è¯¥ç±»ä¸çonStartJob()å¼å§æ§è¡å¼æ¥ä»»å¡ãå®æ¯ä¸ä¸ªç»§æ¿äºJobServiceçæ½è±¡ç±»ï¼å为系ç»åè°æ§è¡ä»»å¡å 容çç»ç«¯ï¼JobScheduleræ¡æ¶å°éè¿bindService()æ¹å¼æ¥å¯å¨è¯¥æå¡ãå æ¤ï¼ç¨æ·å¿ é¡»å¨åºç¨ç¨åºä¸å建ä¸ä¸ªJobServiceçåç±»ï¼å¹¶å®ç°å ¶onStartJob()çåè°æ¹æ³ï¼ä»¥åå¨AndroidManifest.xmlä¸å¯¹å®æäºå¦ä¸æéï¼
注æå¨AndroidManifest.xmlä¸æ·»å æé
å½ä»»å¡å¼å§æ¶ä¼æ§è¡onStartJob(JobParameters params)æ¹æ³ï¼å¦æè¿åå¼æ¯falseï¼åç³»ç»è®¤ä¸ºè¿ä¸ªæ¹æ³è¿åæ¶ï¼ä»»å¡å·²ç»æ§è¡å®æ¯ãå¦æè¿åå¼æ¯trueï¼é£ä¹ç³»ç»è®¤ä¸ºè¿ä¸ªä»»å¡æ£è¦è¢«æ§è¡ï¼æ§è¡ä»»å¡çéæ å°±è½å¨äºä½ çè©ä¸ãå½ä»»å¡æ§è¡å®æ¯æ¶ä½ éè¦è°ç¨jobFinished(JobParameters params, boolean needsRescheduled)æ¥éç¥ç³»ç»ã
å½ç³»ç»æ¥æ¶å°ä¸ä¸ªåæ¶è¯·æ±æ¶ï¼ç³»ç»ä¼è°ç¨onStopJob(JobParameters params)æ¹æ³åæ¶æ£å¨çå¾ æ§è¡çä»»å¡ãå¾éè¦çä¸ç¹æ¯å¦æonStartJob(JobParameters params)è¿åfalseï¼é£ä¹ç³»ç»åå®å¨æ¥æ¶å°ä¸ä¸ªåæ¶è¯·æ±æ¶å·²ç»æ²¡ææ£å¨è¿è¡çä»»å¡ãæ¢å¥è¯è¯´ï¼onStopJob(JobParameters params)å¨è¿ç§æ åµä¸ä¸ä¼è¢«è°ç¨ã
éè¦æ³¨æçæ¯è¿ä¸ªJob Serviceè¿è¡å¨ä¸»çº¿ç¨ï¼è¿æå³çä½ éè¦ä½¿ç¨å线ç¨ï¼handlerï¼æè ä¸ä¸ªå¼æ¥ä»»å¡æ¥è¿è¡èæ¶çæä½ä»¥é²æ¢é»å¡ä¸»çº¿ç¨ã
Googleå®æ¹çSampleï¼ /googlearchive/android-JobScheduler
JobScheduleræ¯ä¸ä¸ªæ½è±¡ç±»ï¼å®å¨ç³»ç»æ¡æ¶çå®ç°ç±»æ¯android.app.JobSchedulerImpl
æ§è¡çå ¥å£æ¯JobScheduler.schedulerï¼å ¶å®æ¯è°äºJobSchedulerImplä¸çscheduleæ¹æ³ï¼ç¶ååè°äºmBinder.schedule(job)ãè¿ä¸ªmBinderå°±æ¯JobSchedulerServiceï¼éè¿Binderè·¨è¿ç¨è°ç¨JobSchedulerServiceã
æåè°ç¨å°JobSchedulerServiceä¸çscheduleæ¹æ³:
æ¥çåéMSG_CHECK_JOBæ¶æ¯ï¼æ¶æ¯å¤ççå°æ¹æ¯
æ¥çæ§è¡JobHandlerä¸ç maybeRunPendingJobsH æ¹æ³ï¼å¤çç¸åºçä»»å¡
availableContextæ¯JobServiceContextï¼å³ServiceConnectionï¼è¿ä¸ªæ¯è¿ç¨é´é讯ServiceConnectionï¼éè¿è°ç¨availableContext.executeRunnableJob(nextPending)æ¹æ³ï¼ä¼è§¦åè°ç¨onServiceConnectedï¼çå°è¿éåºè¯¥æç½äºï¼onServiceConnectedæ¹æ³ä¸çserviceå°±æ¯Jobserviceï¼éé¢è¿ç¨äºWakeLockéï¼é²æ¢ææºä¼ç ã
æ¥çï¼éè¿Handleråæ¶æ¯ï¼è°ç¨äºhandleServiceBoundH()æ¹æ³ã
ä»ä¸é¢æºç å¯ä»¥çåºï¼æç»æ¯è§¦åè°ç¨äºJobServiceä¸çstartJobæ¹æ³ã
ä»æºç çï¼è®¾ç½®çå 容åºç¨äº JobStatus ï¼ä¾å¦ç½ç»éå¶
èå¨JobSchedulerServiceç±»ï¼ç¸å ³çç¶ææ§å¶å¨å ¶æé å½æ°é:
ä¾å¦ç½ç»æ§å¶ç±»ConnectivityControllerç±»
å½ç½ç»åçæ¹åæ¶ï¼ä¼è°ç¨updateTrackedJobs(userid)æ¹æ³ï¼å¨updateTrackedJobsæ¹æ³ä¸ï¼ä¼å¤æç½ç»æ¯å¦ææ¹åï¼ææ¹åçä¼è°mStateChangedListener.onControllerStateChanged()æ¹æ³ï¼ç¶åè°ç¨äºJobSchedulerServiceç±»ä¸onControllerStateChangedæ¹æ³ï¼
æ¥çä¹æ¯å¤çMSG_CHECK_JOB æ¶æ¯ï¼åä¸æä¸æ ·ï¼æç»è§¦åè°ç¨äºJobServiceä¸çstartJobæ¹æ³ã
JobSchedulerServiceæ¯ä¸ä¸ªç³»ç»æå¡ï¼å³åºè¯¥å¨SystemServerå¯å¨çãé 读SystemServerçæºç ï¼
run æ¹æ³å¦ä¸ï¼
æ¥çç startOtherServices()
å æ¤ï¼å¨è¿éå°±å¯å¨äºJobSchedulerServiceæå¡ã
1. android æ§è½ä¼åJobScheduler使ç¨åæºç åæ
2. Android 9.0 JobScheduler(ä¸) JobSchedulerç使ç¨
3. Android 9.0 JobScheduler(äº) JobScheduleræ¡æ¶ç»æç®è¿°åJobSchedulerServiceçå¯å¨
4. Android 9.0 JobScheduler(ä¸) ä»Jobçå建å°æ§è¡
5. Android 9.0 JobScheduler(å) Job约ææ¡ä»¶çæ§å¶
6. ç解JobScheduleræºå¶
2024-12-23 06:34
2024-12-23 06:26
2024-12-23 06:03
2024-12-23 05:56
2024-12-23 04:52