1.编程「锁」事|详解乐观锁 CAS 的技术原理
2.Java 中的 CAS
3.java并发原子类AtomicBoolean解析
4.线程池执行过程中遇到异常会发生什么?怎样处理?
编程「锁」事|详解乐观锁 CAS 的技术原理
本文深入探讨乐观锁的核心实现方式——CAS(Compare And Swap)技术原理。CAS是一种在多线程环境下实现同步功能的机制,相较于悲观锁的加锁操作,CAS允许在不使用锁的情况下实现多线程间的变量同步。Java的并发包中的原子类正是利用CAS实现乐观锁。
CAS操作包含三个操作数:需要更新的java 工作流源码内存值V、进行比较的预期数值A和要写入的值B。其逻辑是将内存值V与预期值A进行比较,当且仅当V值等于A时,通过原子方式用新值B更新V值(“比较+更新”整体是一个原子操作),否则不执行任何操作。一般情况下,更新操作会不断重试直至成功。
以Java.util.concurrent.atomic并发包下的AtomicInteger原子整型类为例,分析其CAS底层实现机制。方法`atomicData.incrementAndGet()`内部通过Unsafe类实现。Unsafe类是底层硬件CPU指令复制工具类,关键在于compareAndSet()方法的返回结果。
`unsafe.compareAndSwapInt(this, valueOffset, expect, update)`
此方法中,参数`this`是Unsafe对象本身,用于获取value的集合jdk源码分析内存偏移地址。`valueOffset`是value变量的内存偏移地址,`expect`是期望更新的值,`update`是要更新的最新值。如果原子变量中的value值等于`expect`,则使用`update`值更新该值并返回true,否则返回false。
至于`valueOffset`的来源,这里提到value实际上是volatile关键字修饰的变量,以保证在多线程环境下的内存可见性。
CAS的底层是Unsafe类。如何通过`Unsafe.getUnsafe()`方法获得Unsafe类的实例?这是因为AtomicInteger类在rt.jar包下,因此通过Bootstrap根类加载器加载。Unsafe类的具体实现可以在hotspot源码中找到,而unsafe.cpp中的C++代码不在本文详细分析范围内。对CAS实现感兴趣的读者可以自行查阅。
CAS底层的Unsafe类在多处理器上运行时,为cmpxchg指令添加lock前缀(lock cmpxchg),在单处理器上则无需此步骤(单处理器自身维护单处理器内的顺序一致性)。这一机制确保了CAS操作的原子性。
最后,dll静态注入源码同学们会发现CAS的操作与原子性密切相关。CPU如何实现原子性操作是一个深入的话题,有机会可以继续探索。欢迎在评论区讨论,避免出现BUG!点赞转发不脱发!
Java 中的 CAS
正文内容:CAS 在 Java 中的应用与实现
一、CAS 原理
CAS,全称 Compare And Swap,中文译为“比较并交换”。其核心操作涉及三个步骤:比较内存中的原数据V与旧的预期值A,如果相等,则将新值B写入V,同时返回操作成功信号。在并发环境下,多个线程同时操作,仅允许一个线程成功操作,但不会阻塞其他线程,实现了一种乐观锁机制。
二、RDD.map源码CAS 实现
在 Java 中,CAS 操作主要通过 Unsafe 类实现。Unsafe 类基于 Java 类与包可见性的漏洞,提供了一种不安全的实现方式,以实现高速操作。通过 Native 方法 compareAndSwapInt,CAS 操作被调用,最终实现于 JVM 的底层代码中。
三、CAS 应用
1. 自旋锁:在锁操作中,线程不断循环等待,直到 CAS 操作成功,实现非阻塞锁机制。
2. AtomicInteger 的 incrementAndGet():在原子整数操作中,通过不断循环执行 CAS 操作,实现原子递增。
3. 令牌桶限流器:通过 CAS 保证多线程环境下对 token 的安全增加和分发,防止并发冲突。
四、总结与应用
CAS 思想贯穿计算机底层实现和编程语言设计,控制的真人源码从微观层面实现原子操作,到宏观层面应用于分布式系统中的锁机制。其应用广泛,不仅在多线程编程中用于实现非阻塞操作,还能在分布式系统中利用 Redis 等外部存储实现分布式锁。
五、CAS 缺点与局限性
尽管 CAS 提供了高效的并发控制机制,但也存在一些局限性,如频繁的自旋操作可能导致 CPU 使用率过高,以及在极端情况下可能出现无限循环问题。因此,在实际应用中,开发者需根据具体场景和需求,合理选择并发控制策略。
六、学习建议与思考
深入理解 CAS 的原理与实现,有助于提升多线程编程能力,理解并发控制的底层机制。同时,结合具体应用场景分析 CAS 的优缺点,能够更好地选择合适的并发控制策略,避免过度依赖新技术。在追求技术进步的同时,反思基础与经典的价值,往往能获得更大的技术提升。
java并发原子类AtomicBoolean解析
本文针对Java并发包下的原子类AtomicBoolean进行深入解析。在多线程环境中,传统的布尔变量`boolean`并非线程安全,容易导致数据竞争问题。为解决这一问题,引入了AtomicBoolean类,该类提供了一种线程安全的布尔值封装。
使用`AtomicBoolean`的主要原因在于其提供的原子操作保证了多线程环境下的线程安全。在`AtomicBoolean`内部实现中,主要依赖于`compareAndSet`方法和CAS(Compare and Swap)机制。通过CAS操作,`AtomicBoolean`能够在多线程环境下实现原子的更新操作,有效避免了数据竞争和并发问题。
在`AtomicBoolean`的源码中,`compareAndSet`方法使用了`Unsafe`类的`compareAndSwapInt`方法进行底层操作。CAS机制的核心思想是:在不进行锁操作的情况下,检查指定内存位置的预期值是否与当前值相等,若相等,则更新该位置的值为预期值;若不相等,则操作失败,返回原值。
为了理解这一机制,我们可以通过一个简单例子进行说明。假设我们希望在多线程环境下实现一个“先来后到”的规则,例如:一个人完成起床、上班和下班三件事后,另一个人才能开始。在单线程下,这一逻辑自然无问题,但在多线程环境下,`AtomicBoolean`可以确保这一顺序得到实现。
在实际应用中,`AtomicBoolean`类提供了丰富的原子操作方法,包括但不限于`compareAndSet`、`getAndSet`、`compareAndExchange`等。这些方法允许开发人员在多线程环境下安全地执行原子操作,简化了多线程编程的复杂性。
总结而言,`AtomicBoolean`是一个在Java并发编程中非常实用的工具类,它通过原子操作保证了多线程环境下的线程安全。对于开发者而言,掌握`AtomicBoolean`的使用方法和原理,可以有效避免数据竞争问题,提升程序的并发性能和稳定性。
线程池执行过程中遇到异常会发生什么?怎样处理?
线程遇到未处理的异常就结束了
这个好理解,当线程出现未捕获异常的时候就执行不下去了,留给它的就是垃圾回收了。
线程池中线程频繁出现未捕获异常当线程池中线程频繁出现未捕获的异常,那线程的复用率就大大降低了,需要不断地创建新线程。
做个实验:
publicclassThreadExecutor{ privateThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,1,,TimeUnit.SECONDS,newArrayBlockingQueue<>(),newThreadFactoryBuilder().setNameFormat("customThread%d").build());@Testpublicvoidtest(){ IntStream.rangeClosed(1,5).forEach(i->{ try{ Thread.sleep();}catch(InterruptedExceptione){ e.printStackTrace();}threadPoolExecutor.execute(()->{ intj=1/0;});});}}新建一个只有一个线程的线程池,每隔0.1s提交一个任务,任务中是一个1/0的计算。
Exceptioninthread"customThread0"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread1"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread2"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread3"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread4"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread5"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)可见每次执行的线程都不一样,之前的线程都没有复用。原因是因为出现了未捕获的异常。
我们把异常捕获试试:
publicclassThreadExecutor{ privateThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,1,,TimeUnit.SECONDS,newArrayBlockingQueue<>(),newThreadFactoryBuilder().setNameFormat("customThread%d").build());@Testpublicvoidtest(){ IntStream.rangeClosed(1,5).forEach(i->{ try{ Thread.sleep();}catch(InterruptedExceptione){ e.printStackTrace();}threadPoolExecutor.execute(()->{ try{ intj=1/0;}catch(Exceptione){ System.out.println(Thread.currentThread().getName()+""+e.getMessage());}});});}}customThread0/byzerocustomThread0/byzerocustomThread0/byzerocustomThread0/byzerocustomThread0/byzero可见当异常捕获了,线程就可以复用了。
问题来了,我们的代码中异常不可能全部捕获如果要捕获那些没被业务代码捕获的异常,可以设置Thread类的uncaughtExceptionHandler属性。这时使用ThreadFactoryBuilder会比较方便,ThreadFactoryBuilder是guava提供的ThreadFactory生成器。
newThreadFactoryBuilder().setNameFormat("customThread%d").setUncaughtExceptionHandler((t,e)->System.out.println(t.getName()+"发生异常"+e.getCause())).build()修改之后:
publicclassThreadExecutor{ privatestaticThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,1,,TimeUnit.SECONDS,newArrayBlockingQueue<>(),newThreadFactoryBuilder().setNameFormat("customThread%d").setUncaughtExceptionHandler((t,e)->System.out.println("UncaughtExceptionHandler捕获到:"+t.getName()+"发生异常"+e.getMessage())).build());@Testpublicvoidtest(){ IntStream.rangeClosed(1,5).forEach(i->{ try{ Thread.sleep();}catch(InterruptedExceptione){ e.printStackTrace();}threadPoolExecutor.execute(()->{ System.out.println("线程"+Thread.currentThread().getName()+"执行");intj=1/0;});});}}线程customThread0执行UncaughtExceptionHandler捕获到:customThread0发生异常/byzero线程customThread1执行UncaughtExceptionHandler捕获到:customThread1发生异常/byzero线程customThread2执行UncaughtExceptionHandler捕获到:customThread2发生异常/byzero线程customThread3执行UncaughtExceptionHandler捕获到:customThread3发生异常/byzero线程customThread4执行UncaughtExceptionHandler捕获到:customThread4发生异常/byzero可见,结果并不是我们想象的那样,线程池中原有的线程没有复用!所以通过UncaughtExceptionHandler想将异常吞掉使线程复用这招貌似行不通。它只是做了一层异常的保底处理。
将excute改成submit试试
publicclassThreadExecutor{ privatestaticThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,1,,TimeUnit.SECONDS,newArrayBlockingQueue<>(),newThreadFactoryBuilder().setNameFormat("customThread%d").setUncaughtExceptionHandler((t,e)->System.out.println("UncaughtExceptionHandler捕获到:"+t.getName()+"发生异常"+e.getMessage())).build());@Testpublicvoidtest(){ IntStream.rangeClosed(1,5).forEach(i->{ try{ Thread.sleep();}catch(InterruptedExceptione){ e.printStackTrace();}Future<?>future=threadPoolExecutor.submit(()->{ System.out.println("线程"+Thread.currentThread().getName()+"执行");intj=1/0;});try{ future.get();}catch(InterruptedExceptione){ e.printStackTrace();}catch(ExecutionExceptione){ e.printStackTrace();}});}}线程customThread0执行java.util.concurrent.ExecutionException:java.lang.ArithmeticException:/byzero线程customThread0执行java.util.concurrent.ExecutionException:java.lang.ArithmeticException:/byzero线程customThread0执行java.util.concurrent.ExecutionException:java.lang.ArithmeticException:/byzero线程customThread0执行java.util.concurrent.ExecutionException:java.lang.ArithmeticException:/byzero线程customThread0执行java.util.concurrent.ExecutionException:java.lang.ArithmeticException:/byzero通过submit提交线程可以屏蔽线程中产生的异常,达到线程复用。当get()执行结果时异常才会抛出。
原因是通过submit提交的线程,当发生异常时,会将异常保存,待future.get();时才会抛出。
这是Futuretask的部分run()方法,看setException:
publicvoidrun(){ try{ Callable<V>c=callable;if(c!=null&&state==NEW){ Vresult;booleanran;try{ result=c.call();ran=true;}catch(Throwableex){ result=null;ran=false;setException(ex);}if(ran)set(result);}}}protectedvoidsetException(Throwablet){ if(UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){ outcome=t;UNSAFE.putOrderedInt(this,stateOffset,EXCEPTIONAL);//finalstatefinishCompletion();}}将异常存在outcome对象中,没有抛出,再看get方法:
Exceptioninthread"customThread0"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread1"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread2"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread3"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread4"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread5"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)0当outcome是异常时才抛出。
总结1、线程池中线程中异常尽量手动捕获
2、通过设置ThreadFactory的UncaughtExceptionHandler可以对未捕获的异常做保底处理,通过execute提交任务,线程依然会中断,而通过submit提交任务,可以获取线程执行结果,线程异常会在get执行结果时抛出。
原文链接:/weixin_/article/details/