1.一文带你搞懂xxl-job(分布式任务调度平台)
2.七天杀上GitHub榜首!Java并发编程深度解析实战,JUC底层原理揭秘
3.JUC架构-ReentrantLock的公平与非公平
4.美团动态线程池思路框架(DynamicTp)之动态调整Tomcat、Jetty、Undertow线程池参数篇
5.Go 并发实战--sync Cond
6.深入学习CAS底层原理
一文带你搞懂xxl-job(分布式任务调度平台)
本篇文章主要记录项目中遇到的 xxl-job 的实战,希望能通过这篇文章告诉读者们什么是p2p 借贷 源码 xxl-job 以及怎么使用 xxl-job 并分享一个实战案例。
xxl-job 是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。设计思想是将调度行为抽象形成 调度中心 平台,平台本身不承担业务逻辑,而是负责发起 调度请求 后,由 执行器 接收调度请求并执行 任务,这里的 任务 抽象为 分散的 JobHandler。通过这种方式即可实现 调度 与 任务 相互解耦,从而提高系统整体的稳定性和拓展性。
任务调度指的是系统在约定的指定时间自动去执行指定的任务的过程。在开发项目时大家可能遇到过类似的场景问题,如系统需要定时在每天0点进行数据备份、活动开始前几小时预热执行一些前置业务、定时对 MQ 消息表的发送装填等。这些场景问题都可以通过任务调度 来解决。
单体系统 中有许多实现 任务调度 的方式,如多线程方式、Timer 类、Spring Tasks 等等。这里比较常用的是 Spring Tasks(通过 @EnableScheduling + @Scheduled 的注解可以自定义定时任务,有兴趣的可以去了解一下)。
在分布式系统下,每个服务都可以搭建为集群,这样的好处是可以将任务切片分给每一个服务从而实现并行执行,提高任务调度的处理效率。那么为什么分布式系统 不能使用 单体系统 的任务调度实现方式呢?在集群服务下,如果还是时钟源码分析使用每台机器按照单体系统的任务调度实现方式实现的话,会出现下面这四个问题:怎么做到对任务的控制(如何避免任务重复执行)、如果某台机器宕机了,会不会存在任务丢失、如果要增加服务实例,怎么做到弹性扩容、如何做到对任务调度的执行情况统一监测。通过上面的问题可以了解到分布式系统下需要一个满足高可用、容错管理、负载均衡等功能的任务调度平台来实现任务调度。
xxl-job 分布式任务调度系统是一个开源软件,可以在 github 或 gitee 上查看和下载 xxl-job 的源码。在 docker 下安装 xxl-job、创建映射容器的文件目录、在/mydata/xxl-job 的目录下创建 application.properties 文件、导入 tables_xxl-job.sql 文件到指定的数据库、配置参数如数据库位置、访问口令等。
在 Spring Boot 项目中,导入 xxl-job 的 maven 依赖,配置application.yml 文件指定调度中心地址、访问口令、执行器名称和端口等属性,编写配置类配置自定义任务和执行器,完成 SpringBoot 集成 xxl-job 实现分布式任务调度的全过程。
实战案例:当前项目需要对上传到分布式文件系统 minio 中的视频文件进行统一格式的视频转码操作。利用 xxl-job 的方式以任务调度的方式定时处理视频转码操作,以任务调度的方式,可以使得视频转码操作不会阻塞主线程,避免影响主要业务的吞吐量;以集群服务分片接收任务的方式,可以将任务均分给每个机器使得任务调度可以并行执行,提高总任务处理时间以及降低单台机器 CPU 的开销。
xxl-job 执行流程图:在集群部署时,配置路由策略中选择分片广播的方式,可以使一次任务调度会广播触发集群中所有的执行器执行一次任务,并且可以向系统传递分片参数。溯源码lamer利用这一特性可以根据当前执行器的分片序号和分片总数来获取对应的任务记录。通过 Bean 模式(基于方法)获取分片序号和分片总数,编写 sql 获取任务记录,实现对集群服务均分任务的操作。
确保任务不会被重复消费:通过幂等性实现,依靠任务的状态(未处理1;处理中2;处理失败3;处理成功4)通过比较和设置的方式只有在状态为未处理或处理失败时才能设置为处理中,避免多个执行器同时处理该任务。设置调度过期策略和阻塞处理策略保证真正的幂等性。
编写完成所有任务:分片视频转码处理,通过分片广播拿到的参数以取模的方式获取当前执行器所属的任务记录集合,遍历集合并发执行任务,使用乐观锁抢占当前任务,执行任务过程包含分布式文件系统下载、视频转码、上传转码后的视频、更新任务状态(处理成功),使用 JUC 工具类 CountDownLatch 实现所有任务执行完后才退出方法,中间使用 xxl-job 的日志记录错误信息和执行结果。
清理任务表中转码成功的任务的记录并将其插入任务历史表,视频补偿机制处理任务超时情况下的任务,做出补偿,处理失败次数大于3次的任务,做出补偿。测试并查看日志,准备好的任务表记录,启动三台媒资服务器,并开启任务,可以单独查看每个任务的日志,通过日志中的执行日志查看具体日志信息,可以看到直接为了测试改错的路径导致下载视频出错,查看数据库表的变化,核心的视频转码任务执行成功,并且逻辑正确,能够起到分布式任务调度的作用。
七天杀上GitHub榜首!跳茅坑源码Java并发编程深度解析实战,JUC底层原理揭秘
在多核CPU和多线程技术普及的当今,我们面对的不再是多年前对于线程开启时机的问题。如今,无论是开发人员还是技术开发者,都需要深入了解多线程技术的方方面面。本文将从操作系统原理的角度,全面解析多线程技术,涵盖基础知识到高级进阶,分享作者多年的工作经验和踩坑后的教训。
多线程编程技术已经成为现代软件开发不可或缺的部分。然而,对于很多开发者来说,尽管有各种库和运行环境对操作系统多线程接口的封装,他们仍然面对着复杂的多线程逻辑,甚至只是简单调用库的“业务”程序员。本文旨在从基础出发,深入浅出地讲解多线程技术的各个层面。
本文分为章,从Java线程的实践及原理揭秘开始,逐步深入到synchronized实现原理、volatile解决可见性和有序性问题、J.U.C中的重入锁和读写锁、线程通信中的条件等待机制、J.U.C并发工具集实战、并发编程必备工具、阻塞队列设计原理及实现、并发安全集合原理及源码、线程池设计原理、以及Java并发编程中的异步编程特性。每一章节都基于作者的经验总结和踩坑后的教训,为读者提供全面而深入的指导。
如果您对这份手册感兴趣并希望深入学习,欢迎您点赞并关注。获取完整内容的山姆溯源码方式非常简单,只需点击下方链接即可。让我们一起探索多线程技术的奥秘,提升编程技能,迈向技术的高峰。
JUC架构-ReentrantLock的公平与非公平
ReentrantLock是Java中一种强大的可重入锁,提供了与synchronized不同的线程同步机制。它的重要特性包括:
1. 可重入性:允许线程多次获取同一锁,但需要精确管理锁的获取和释放,以防止死锁。
2. 公平与非公平选择:ReentrantLock有两种模式:公平锁和非公平锁。公平锁按申请顺序分配锁,确保每个线程按等待时间获取,可能增加性能开销;非公平锁则优先给当前持有锁的线程,提高效率,但可能导致线程饥饿。
3. 显式控制:与synchronized不同,ReentrantLock需要显式调用lock()获取锁和unlock()释放锁,适合更精细的同步控制。
4. 条件变量:ReentrantLock支持Condition接口,实现复杂的线程协作和等待通知机制。
5. 可中断的获取:lockInterruptibly()方法允许在获取锁时响应中断,避免阻塞过久。
6. 保证可见性:与synchronized一样,ReentrantLock确保线程可见性,避免数据同步问题。
在选择ReentrantLock时,需注意平衡公平性、效率和线程饥饿的风险。默认情况下,它采用非公平模式,但可以为特定场景调整为公平锁。深入了解其公平与非公平的实现细节,可以参考AQS源码和Condition的讲解。
美团动态线程池思路框架(DynamicTp)之动态调整Tomcat、Jetty、Undertow线程池参数篇
动态线程池框架(DynamicTp)的adapter模块,作为第三方组件线程池管理的适配器,旨在使如Tomcat、Jetty和Undertow等Web服务器内置的线程池具备动态参数调整、监控告警等增强功能。通过该模块,用户可利用Spring的事件机制监听并管理这些第三方组件的线程池,实现与核心模块的解耦。
adapter模块已成功接入SpringBoot内置的三大WebServer,包括Tomcat、Jetty和Undertow的线程池管理。通过监听机制,动态Tp框架能够及时响应这些组件的线程池变化,提供实时监控和灵活调整策略。
具体实现上,针对Tomcat、Jetty和Undertow的线程池管理,需要深入理解其内部处理流程。这些组件并未直接使用Java Util Concurrency(JUC)提供的线程池实现,而是自定义了线程池或扩展了JUC的实现,如Tomcat就采用了自定义的ThreadPoolExecutor类,通过继承或扩展JUC的抽象类来定制线程池行为。
以Tomcat为例,其内部线程池的实现中,继承自JUC原生ThreadPoolExecutor或其抽象类AbstractExecutorService。在执行任务时,Tomcat首先调用父类方法处理,然后根据任务队列类型(如TaskQueue)和线程池当前状态(如线程数、提交任务数、队列状态)进行一系列复杂判断,以决定是否创建新线程、添加任务至队列或执行拒绝策略。这种设计使得Tomcat能够高效管理请求,同时优化资源利用,避免过度创建线程导致的性能下降。
Jetty和Undertow的内部线程池实现原理与Tomcat类似,均基于JUC框架进行定制,以满足其特定的性能优化和扩展需求。通过分析这些组件的源码,可以深入了解其线程池管理策略,为后续性能调优提供宝贵信息。
动态线程池框架(DynamicTp)的引入,为Web服务器性能调优提供了强大的工具,允许用户动态调整线程池参数,提升系统响应速度和资源利用率。使用DynamicTp框架,用户可以更灵活地管理第三方组件的线程池,实现业务与开源贡献的双赢。
欢迎使用DynamicTp框架,探索更多性能优化的可能性。下期将分享在使用过程中遇到的Tomcat版本不一致导致的监控线程停滞问题,通过这一案例深入理解ScheduledExecutorService的运行机制。敬请期待。
如需交流或合作,请联系我,期待与您一起成长:
微信:yanhom
公众号:CodeFox
Go 并发实战--sync Cond
在 Go 语言中,sync.Cond 是一个条件同步变量,它与 Java 中的 Object 的 wait、notify、notifyAll 方法或 Condition 类功能相似。掌握 Java 的 JUC 包实现后,学习其他语言的并发特性及工具变得相对简单。
sync.Cond 提供了阻塞和唤醒函数:Wait()、Signal() 和 Broadcast()。使用时,需要指定一把锁,如 Mutex、RWLock 或自定义实现的锁。下面展示一个简单的使用示例:
输出:
注:在启动 Go 协程后,它并不会立即执行,需要等待分配过程和一定时间。因此,建议在调用 Wait() 之前稍作等待,以免唤醒通知在 wait 之前发生而变得无效。上述示例展示了 Cond 的基础使用,生产环境中可能更为复杂,但大致原理相同。
Cond 的实现相对简单,核心依赖于创建的锁操作和 runtime 提供的阻塞和唤醒 Go 协程的函数。在 Wait 函数中,锁的使用方式可能不够直观,其主要目的是使外部条件判断与添加阻塞队列的操作变为原子操作。这种锁的使用方式需要谨慎,容易引发问题。
在醒操作部分,同步通知机制确保了当条件满足时,等待的协程能够被正确唤醒。关于 Cond 的使用和源码实现的详细内容,这里不再赘述。
深入学习CAS底层原理
什么是CAS
CAS是Compare-And-Swap的缩写,意思为比较并交换。以AtomicInteger为例,其提供了compareAndSet(intexpect,intupdate)方法,expect为期望值(被修改的值在主内存中的期望值),update为修改后的值。compareAndSet方法返回值类型为布尔类型,修改成功则返回true,修改失败返回false。
举个compareAndSet方法的例子:
publicclassAtomticIntegerTest{ publicstaticvoidmain(String[]args){ AtomicIntegeratomicInteger=newAtomicInteger(0);booleanresult=atomicInteger.compareAndSet(0,1);System.out.println(result);System.out.println(atomicInteger.get());}}上面例子中,通过AtomicInteger(intinitialValue)构造方法指定了AtomicInteger类成员变量value的初始值为0:
publicclassAtomicIntegerextendsNumberimplementsjava.io.Serializable{ ......privatevolatileintvalue;/***CreatesanewAtomicIntegerwiththegiveninitialvalue.**@paraminitialValuetheinitialvalue*/publicAtomicInteger(intinitialValue){ value=initialValue;}......}接着执行compareAndSet方法,main线程从主内存中拷贝了value的副本到工作线程,值为0,并将这个值修改为1。如果此时主内存中value的值还是为0的话(言外之意就是没有被其他线程修改过),则将修改后的副本值刷回主内存更新value的值。所以上面的例子运行结果应该是true和1:
将上面的例子修改为:
publicclassAtomticIntegerTest{ publicstaticvoidmain(String[]args){ AtomicIntegeratomicInteger=newAtomicInteger(0);booleanfirstResult=atomicInteger.compareAndSet(0,1);booleansecondResult=atomicInteger.compareAndSet(0,1);System.out.println(firstResult);System.out.println(secondResult);System.out.println(atomicInteger.get());}}上面例子中,main线程第二次调用compareAndSet方法的时候,value的值已经被修改为1了,不符合其expect的值,所以修改将失败。上面例子输出如下:
CAS底层原理查看compareAndSet方法源码:
/***Atomicallysetsthevalueto{ @codenewValue}*ifthecurrentvalue{ @code==expectedValue},*withmemoryeffectsasspecifiedby{ @linkVarHandle#compareAndSet}.**@paramexpectedValuetheexpectedvalue*@paramnewValuethenewvalue*@return{ @codetrue}ifsuccessful.Falsereturnindicatesthat*theactualvaluewasnotequaltotheexpectedvalue.*/publicfinalbooleancompareAndSet(intexpectedValue,intnewValue){ returnU.compareAndSetInt(this,VALUE,expectedValue,newValue);}该方法通过调用unsafe类的compareAndSwapInt方法实现相关功能。compareAndSwapInt方法包含四个参数:
this,当前对象;
valueOffset,value成员变量的内存偏移量(也就是内存地址):
privatestaticfinallongvalueOffset;static{ try{ valueOffset=unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));}catch(Exceptionex){ thrownewError(ex);}}expect,期待值;
update,更新值。
所以这个方法的含义为:获取当前对象value成员变量在主内存中的值,和传入的期待值相比,如果相等则说明这个值没有被别的线程修改过,然后将其修改为更新值。
那么unsafe又是什么?它的compareAndSwapInt方法是原子性的么?查看该方法的源码:
/***AtomicallyupdatesJavavariableto{ @codex}ifitiscurrently*holding{ @codeexpected}.**<p>Thisoperationhasmemorysemanticsofa{ @codevolatile}read*andwrite.CorrespondstoCatomic_compare_exchange_strong.**@return{ @codetrue}ifsuccessful*/@HotSpotIntrinsicCandidatepublicfinalnativebooleancompareAndSetInt(Objecto,longoffset,intexpected,intx);该方法并没有具体Java代码实现,方法通过native关键字修饰。由于Java方法无法直接访问底层系统,Unsafe类相当于一个后门,可以通过该类的方法直接操作特定内存的数据。Unsafe类存在于sun.msic包中,JVM会帮我们实现出相应的汇编指令。Unsafe类中的CAS方法是一条CPU并发原语,由若干条指令组成,用于完成某个功能的一个过程。原语的执行必须是连续的,在执行过程中不允许被中断,不会存在数据不一致的问题。
getAndIncrement方法剖析了解了CAS原理后,我们回头看下AtomicInteger的getAndIncrement方法源码:
/***Atomicallyincrementsthecurrentvalue,*withmemoryeffectsasspecifiedby{ @linkVarHandle#getAndAdd}.**<p>Equivalentto{ @codegetAndAdd(1)}.**@returnthepreviousvalue*/publicfinalintgetAndIncrement(){ returnU.getAndAddInt(this,VALUE,1);}该方法通过调用unsafe类的getAndAddInt方法实现相关功能。继续查看getAndAddInt方法的源码:
/***Atomicallyaddsthegivenvaluetothecurrentvalueofafield*orarrayelementwithinthegivenobject{ @codeo}*atthegiven{ @codeoffset}.**@paramoobject/arraytoupdatethefield/elementin*@paramoffsetfield/elementoffset*@paramdeltathevaluetoadd*@returnthepreviousvalue*@since1.8*/@HotSpotIntrinsicCandidatepublicfinalintgetAndAddInt(Objecto,longoffset,intdelta){ intv;do{ v=getIntVolatile(o,offset);}while(!weakCompareAndSetInt(o,offset,v,v+delta));returnv;}结合源码,我们便可以很直观地看出为什么AtomicInteger的getAndIncrement方法是线程安全的了:
o是AtomicInteger对象本身;offset是AtomicInteger对象的成员变量value的内存地址;delta是需要变更的数量;v是通过unsafe的getIntVolatile方法获得AtomicInteger对象的成员变量value在主内存中的值。dowhile循环中的逻辑为:用当前对象的值和var5比较,如果相同,说明该值没有被别的线程修改过,更新为v+delta,并返回true(CAS);否则继续获取值并比较,直到更新完成。
CAS的缺点CAS并不是完美的,其存在以下这些缺点:
如果刚好while里的CAS操作一直不成功,那么对CPU的开销大;
只能确保一个共享变量的原子操作;
存在ABA问题。
CAS实现的一个重要前提是需要取出某一时刻的数据并在当下时刻比较交换,这之间的时间差会导致数据的变化。比如:thread1线程从主内存中取出了变量a的值为A,thread2页从主内存中取出了变量a的值为A。由于线程调度的不确定性,这时候thread1可能被短暂挂起了,thread2进行了一些操作将值修改为了B,然后又进行了一些操作将值修改回了A,这时候当thread1重新获取CPU时间片重新执行CAS操作时,会发现变量a在主内存中的值仍然是A,所以CAS操作成功。
解决ABA问题那么如何解决CAS的ABA问题呢?由上面的阐述课件,光通过判断值是否相等并不能确保在一定时间差内值没有变更过,所以我们需要一个额外的指标来辅助判断,类似于时间戳,版本号等。
JUC为我们提供了一个AtomicStampedReference类,通过查看它的构造方法就可以看出,除了指定初始值外,还需指定一个版本号(戳):
/***Createsanew{ @codeAtomicStampedReference}withthegiven*initialvalues.**@paraminitialReftheinitialreference*@paraminitialStamptheinitialstamp*/publicAtomicStampedReference(VinitialRef,intinitialStamp){ pair=Pair.of(initialRef,initialStamp);}我们就用这个类来解决ABA问题,首先模拟一个ABA问题场景:
publicclassAtomticIntegerTest{ publicstaticvoidmain(String[]args){ AtomicReference<String>atomicReference=newAtomicReference<>("A");newThread(()->{ //模拟一次ABA操作atomicReference.compareAndSet("A","B");atomicReference.compareAndSet("B","A");System.out.println(Thread.currentThread().getName()+"线程完成了一次ABA操作");},"thread1").start();newThread(()->{ //让thread2先睡眠2秒钟,确保thread1的ABA操作完成try{ TimeUnit.SECONDS.sleep(2);}catch(InterruptedExceptione){ e.printStackTrace();}booleanresult=atomicReference.compareAndSet("A","B");if(result){ System.out.println(Thread.currentThread().getName()+"线程修改值成功,当前值为:"+atomicReference.get());}},"thread2").start();}}运行程序,输出如下:
使用AtomicStampedReference解决ABA问题:
publicclassAtomicIntegerextendsNumberimplementsjava.io.Serializable{ ......privatevolatileintvalue;/***CreatesanewAtomicIntegerwiththegiveninitialvalue.**@paraminitialValuetheinitialvalue*/publicAtomicInteger(intinitialValue){ value=initialValue;}......}0程序输出如下: