1.ReentrantLock源码详细解析
2.AbstractQueuedSynchronizer详解
3.kfifo(linux kernel 无锁队列)
4.java中的源码非公平锁不怕有的线程一直得不到执行吗
5.老生常谈线程基础的几个问题
6.fsIO调度算法之NOOP
ReentrantLock源码详细解析
在深入解析ReentrantLock源码之前,我们先了解ReentrantLock与同步机制的源码关系。ReentrantLock作为Java中引入的源码并发工具类,由Doug Lea编写,源码相较于synchronized关键字,源码它提供了更为灵活的源码源码木马扫描锁管理策略,支持公平与非公平锁两种模式。源码AQS(AbstractQueuedSynchronizer)作为实现锁和同步器的源码核心框架,由AQS类的源码独占线程、同步状态state、源码FIFO等待队列和UnSafe对象组成。源码AQS类的源码内部结构图显示了其组件的构成。在AQS框架下,源码等待队列采用双向链表实现,源码头结点存在但无线程,源码T1和T2节点中的线程可能在自旋获取锁后进入阻塞状态。
Node节点作为等待队列的基本单元,分为共享模式和独占模式,值得关注的是waitStatus成员变量,它包含五种状态:-3、-2、-1、0、1。本文重点讨论-1、0、1状态,-3状态将不涉及。非公平锁与公平锁的差异在于,非公平锁模式下新线程可直接尝试获取锁,而公平锁模式下新线程需排队等待。
ReentrantLock内部采用非公平同步器作为其同步器实现,构造函数中根据需要选择非公平同步器或公平同步器。ReentrantLock默认采用非公平锁策略。非公平锁与公平锁的区别在于获取锁的顺序,非公平锁允许新线程跳过等待队列,而公平锁严格遵循队列顺序。
在非公平同步器的实例中,我们以T1线程首次获取锁为例。T1成功获取锁后,将exclusiveOwnerThread设置为自身,state设置为1。紧接着,T2线程尝试获取锁,但由于state为1,获取失败。调用acquire方法尝试获得锁,尝试通过tryAcquire方法实现,非公平同步器的实现调用具体逻辑。
在非公平锁获取逻辑中,通过CAS操作尝试交换状态。tpshop 分销 源码交换成功后,设置独占线程。当当前线程为自身时,执行重入操作,叠加state状态。若获取锁失败,则T2和T3线程进入等待队列,调用addWaiter方法。队列初始化通过enq方法实现,enq方法中的循环逻辑确保线程被正确加入队尾。新线程T3调用addWaiter方法入队,队列初始化完成。
在此过程中,T2和T3线程开始自旋尝试获取锁。若失败,则调用parkAndCheckInterrupt()方法进入阻塞状态。在shouldParkAfterFailedAcquire方法中,当前驱节点等待状态为CANCELLED时,方法会找到第一个非取消状态的节点,并断开取消状态的前驱节点与该节点的连接。若T5线程加入等待队列,T3和T4线程因为自旋获取锁失败进入finally块调用取消方法,找到等待状态不为1的节点(即T2),断开连接。
理解了shouldParkAfterFailedAcquire方法后,我们关注acquireQueued方法的实现。该方法确保线程在队列中正确释放,如果队列的节点前驱为head节点,成功获取锁后,调用setHead方法释放线程。setHead方法通过CAS操作更新head节点,释放线程。acquire方法中的阻塞是为防止线程在唤醒后重新尝试获取锁而进行的额外阻断。
锁的释放过程相对简单,将state减至0,将exclusiveOwnerThread设置为null,完成锁的释放。通过上述解析,我们深入理解了ReentrantLock的锁获取、等待、释放等核心机制,为并发编程提供了强大的工具支持。
AbstractQueuedSynchronizer详解
AbstractQueuedSynchronizer,简称AQS,究竟是什么呢?
它提供了一个框架,用于实现基于先进先出(FIFO)等待队列的阻塞锁及相关同步器(如信号量、事件等)。这个类的设计目的是为了成为那些依赖单一原子整数值来表示状态的同步器的有效基础。
从JDK的注释来看,AQS是仙剑源码 移植一个实现阻塞锁和相关同步器的框架,它基于先进先出的等待队列。这个类适用于大多数使用整数来表示状态的同步器。这里提到的框架实际上是一个类。
那么,在什么情况下我们会使用AQS呢?
在编程中,我们经常遇到并发问题。有了并发,就涉及资源共享;而资源共享又需要处理资源的同步访问。在处理同步时,我们需要解决竞争发生时的等待问题和竞争解除后的唤醒问题。AQS就是这样一个便于实现这种同步机制的框架。我们日常使用的ReentrantLock、ReentrantReadWriteLock以及ArrayBlockingQueue等都是基于AQS实现的。
如果没有AQS,又会如何呢?
在没有AQS的情况下,要实现锁需要怎么做呢?这些问题都是我们在实现锁时需要考虑的。
在AQS的情况下,情况又是怎样的呢?
AQS作为基础类,主要解决了在锁不可用时的等待,以及锁释放后的唤醒。锁状态的设定、如何获取锁以及如何释放锁,都是需要相应的同步机制自己实现的。因此,在使用AQS时,需要实现以下方法:
类似于ReentrantLock,使用state来标识锁的状态,state = 0表示锁未被获取,当state > 0表示锁已被获取。此外,实现了AQS的tryAcquire()和tryRelease()来处理state的状态,以处理锁的状态。这样就可以实现基础的ReentrantLock。因此,在AQS的支持下,实现类似的阻塞锁非常方便。
以上我们介绍了AQS是什么,以及AQS的具体使用场景,下面我们详细介绍AQS的具体实现机制。
我们先简单看一下AQS的核心流程。
接下来,我们将结合ReentrantLock来详细查看基于AQS如何实现排他锁。
首先,我们来看一下ReentrantLock是如何使用int类型的state来表示锁的状态的。state = 0表示锁未被获取。state > 0表示锁已被持有。由于是可重入锁,state表示了这个锁被同一个线程获取了多少次。
上面的代码片段是ReentrantLock的核心代码,为了便于解释,iapp导出源码省略了其他代码。首先我们可以看到,在使用AQS时,通常不是直接实现AQS,而是创建一个内部的辅助类。
Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class
然后加锁和释放锁对应的lock和unlock,直接调用辅助类的lock和unlock。
下面我们来看一下Sync类的代码。
上面我们来看一下非公平锁NonfairSync的实现。
下面我们来看一下AQS中的acquire方法。
上面我们看了获取锁的方法acquire,下面看下释放锁的release。
总结一下,AQS通过tryAcquire以及tryRelease两个方法,来进行锁的获取以及释放,两个都是非阻塞的方法。如果获取成功则返回,如果获取失败,则添加队列。在队列中获取锁,获取失败则挂起,等待锁被释放后被重新唤醒。唤醒后还是会去尝试获取锁。释放锁的时候如果检查到已经全部释放,则会唤醒被挂起的线程。这样通过tryAcquire和tryRelease,实现了锁的获取以及等待,以及锁的释放。具体锁状态的控制,子类则是通过tryAcquire和tryRelease进行控制的。
在介绍ReentrantLock时,我们简化了很多代码。了解基本原理后,再去读源码会事半功倍。
kfifo(linux kernel 无锁队列)
队列作为常见数据结构,其主要特点是先进先出(FIFO)。FIFO用于缓冲通信速度不匹配场景,例如生产者快速生成数据,消费者则难以及时处理。在通信接口驱动中,数据被暂存于队列中,驱动程序可以立即返回接收新数据或等待,而解析程序仅需从队列中获取数据。FIFO也可解决“多生产者-单消费者”问题。
kfifo是Linux内核中实现队列功能的一种无锁环形队列。无锁意味着在单生产者单消费者场景下无需加锁操作。它通过使用in和out两个变量作为入队和出队索引来实现无锁操作,避免了在多个生产者或消费者场景下的加锁需求。通过将in/out与fifo的空间大小减一进行“与”操作,kfifo实现了比取余操作更快的wireshark 查看源码队列循环。这意味着当in和out超过队列大小时,它们会继续向前加,而不是简单地减去队列大小后重新开始。
kfifo实现“无锁”特性仅针对“单生产者-单消费者”场景。对于“多生产者”或“多消费者”情况,需对入队或出队操作进行加锁以避免数据竞争。
使用kfifo有两种方式:动态申请和静态定义。动态申请包括包含头文件、定义结构体、申请内存、执行入队和出队操作,最后释放内存。静态定义则在定义fifo变量时使用宏,操作函数更加简洁,无需内存管理步骤。
kfifo结构体包含用于管理队列的变量,如in、out、mask等。内存申请过程确保了用mask大小掩码与in/out索引进行“与”操作,实现队列循环,避免了取余运算的开销。使用kfifo_alloc动态申请内存时,最终分配的内存空间是向上取2的次方,以支持mask大小掩码操作。这可能导致使用者在不了解规则的情况下踩坑,例如,申请字节内存时实际上分配了字节,可能导致数据错位。
入队操作涉及确定内存地址、拷贝数据并确保内存屏障以避免乱序异常。整个过程高效且简洁。对于更深入的了解,可查阅kfifo的源码。
java中的非公平锁不怕有的线程一直得不到执行吗
首先来看公平锁和非公平锁,我们默认使用的锁是非公平锁,只有当我们显示设置为公平锁的情况下,才会使用公平锁,下面我们简单看一下公平锁的源码,如果等待队列中没有节点在等待,则占有锁,如果已经存在等待节点,则返回失败,由后面的程序去将此线程加入等待队列通过上面的代码,我们可以推断,当使用公平锁的情况下,并且同一个线程的执行时间较长时,线程内部进行了多次的锁的获取和释放,效率非常低下,可以参加Lesson8中的demo:
demo Lesson8LockIntPerform:在使用ReentrantLock加非公平锁的情况下个线程循环下单数为:
demo Lesson8LockIntPerform:在使用ReentrantLock加非公平锁的情况下个线程循环下单数为:
demo Lesson8LockFairIntPerform:在使用ReentrantLock加公平锁的情况下个线程循环下单数为:
demo Lesson8LockFairIntPerform:在使用ReentrantLock加公平锁的情况下个线程循环下单数为:
上面的demo中,在使用公平锁的情况下性能明显降低,非公平锁的性能是公平锁性能的几十倍以上,这和公平锁每次试图占有锁时,都必须先要进等待队列,按照FIFO的顺序去获取锁,因此在我们的实验情景下,使用公平锁的线程进行了频繁切换,而频繁切换线程,性能必然会下降的厉害,这也告诫了我们在实际的开发过程中,在需要使用公平锁的情景下,务必要考虑线程的切换频率。
接下来我们来看一下读写锁,通过看读写锁的实现源码,我们可以发现,读锁和写锁共用同一个等待队列,那么在采用非公平锁的情况下,如果读锁的线程执行时间比较长,并且读锁的并发比较高,那么写锁的线程便永远都拿不到锁,那么实际的情况会不会是这样呢?
demo Lesson3WriteReadLock:此demo的读线程在不断的占用读锁,按照推论,写锁的线程是没有机会获取到锁的,但是实际情况是写锁的线程可以正常的获取到锁,那么是什么原因使得写锁的线程可以获取到锁的了?通过查看源代码,会发现有这样的一个方法:
上面的方法,实现了一个新的读线程获取锁的中断,它会读取等待队列中下一个等待锁的线程,如果它是获取写锁的线程,那么此方法返回为真,调用它的程序会把这个试图获取读锁的线程加入到等待队列,从而终止了读线程一直都在占有锁的情况。
老生常谈线程基础的几个问题
实现线程只有一种方式
我们知道启动线程至少可以通过以下四种方式:
实现Runnable接口
继承Thread类
线程池创建线程
带返回值的Callable创建线程
但是看它们的底层就一种方式,就是通过newThread()实现,其他的只不过在它的上面做了层封装。
实现Runnable接口要比继承Thread类的更好:
结构上分工更明确,线程本身属性和任务逻辑解耦。
某些情况下性能更好,直接把任务交给线程池执行,无需再次newThread()。
可拓展性更好:实现接口可以多个,而继承只能单继承。
有的时候可能会问到启动线程为什么是start()方法,而不是run()方法,这个问题很简单,执行run()方法其实就是在执行一个类的普通方法,并没有启动一个线程,而start()方法点进去看是一个native方法。
当我们在执行java中的start()方法的时候,它的底层会调JVM由c++编写的代码Thread::start,然后c++代码再调操作系统的create_thread创建线程,创建完线程以后并不会马上运行,要等待CPU的调度。CPU的调度算法有很多,比如先来先服务调度算法(FIFO),最短优先(就是对短作业的优先调度)、时间片轮转调度等。如下图所示:
线程的状态在Java中线程的生命周期中一共有6种状态。
NEW:初始状态,线程被构建,但是还没有调用start方法
RUNNABLE:运行状态,JAVA线程把操作系统中的就绪和运行两种状态统一称为运行中
BLOCKED:阻塞状态,表示线程进入等待状态,也就是线程因为某种原因放弃了CPU使用权
WAITING:等待状态
TIMED_WAITING:超时等待状态,超时以后自动返回
TERMINATED:终止状态,表示当前线程执行完毕
当然这也不是我说的,源码中就是这么定义的:
publicenumState{ /***Threadstateforathreadwhichhasnotyetstarted.*/NEW,/***Threadstateforarunnablethread.Athreadintherunnable*stateisexecutingintheJavavirtualmachinebutitmay*bewaitingforotherresourcesfromtheoperatingsystem*suchasprocessor.*/RUNNABLE,/***Threadstateforathreadblockedwaitingforamonitorlock.*Athreadintheblockedstateiswaitingforamonitorlock*toenterasynchronizedblock/methodor*reenterasynchronizedblock/methodaftercalling*{ @linkObject#wait()Object.wait}.*/BLOCKED,/***Threadstateforawaitingthread.*Athreadisinthewaitingstateduetocallingoneofthe*followingmethods:*<ul>*<li>{ @linkObject#wait()Object.wait}withnotimeout</li>*<li>{ @link#join()Thread.join}withnotimeout</li>*<li>{ @linkLockSupport#park()LockSupport.park}</li>*</ul>**<p>Athreadinthewaitingstateiswaitingforanotherthreadto*performaparticularaction.**Forexample,athreadthathascalled<tt>Object.wait()</tt>*onanobjectiswaitingforanotherthreadtocall*<tt>Object.notify()</tt>or<tt>Object.notifyAll()</tt>on*thatobject.Athreadthathascalled<tt>Thread.join()</tt>*iswaitingforaspecifiedthreadtoterminate.*/WAITING,/***Threadstateforawaitingthreadwithaspecifiedwaitingtime.*Athreadisinthetimedwaitingstateduetocallingoneof*thefollowingmethodswithaspecifiedpositivewaitingtime:*<ul>*<li>{ @link#sleepThread.sleep}</li>*<li>{ @linkObject#wait(long)Object.wait}withtimeout</li>*<li>{ @link#join(long)Thread.join}withtimeout</li>*<li>{ @linkLockSupport#parkNanosLockSupport.parkNanos}</li>*<li>{ @linkLockSupport#parkUntilLockSupport.parkUntil}</li>*</ul>*/TIMED_WAITING,/***Threadstateforaterminatedthread.*Thethreadhascompletedexecution.*/TERMINATED;}下面是这六种状态的转换:
New新创建New表示线程被创建但尚未启动的状态:当我们用newThread()新建一个线程时,如果线程没有开始调用start()方法,那么此时它的状态就是New。而一旦线程调用了start(),它的状态就会从New变成Runnable。
Runnable运行状态Java中的Runable状态对应操作系统线程状态中的两种状态,分别是Running和Ready,也就是说,Java中处于Runnable状态的线程有可能正在执行,也有可能没有正在执行,正在等待被分配CPU资源。
如果一个正在运行的线程是Runnable状态,当它运行到任务的一半时,执行该线程的CPU被调度去做其他事情,导致该线程暂时不运行,它的状态依然不变,还是Runnable,因为它有可能随时被调度回来继续执行任务。
在Java中Blocked、Waiting、TimedWaiting,这三种状态统称为阻塞状态,下面分别来看下。
Blocked从上图可以看出,从Runnable状态进入Blocked状态只有一种可能,就是进入synchronized保护的代码时没有抢到monitor锁,jvm会把当前的线程放入到锁池中。当处于Blocked的线程抢到monitor锁,就会从Blocked状态回到Runnable状态。
Waiting状态我们看上图,线程进入Waiting状态有三种可能。
没有设置Timeout参数的Object.wait()方法,jvm会把当前线程放入到等待队列。
没有设置Timeout参数的Thread.join()方法。
LockSupport.park()方法。
Blocked与Waiting的区别是Blocked在等待其他线程释放monitor锁,而Waiting则是在等待某个条件,比如join的线程执行完毕,或者是notify()/notifyAll()。
当执行了LockSupport.unpark(),或者join的线程运行结束,或者被中断时可以进入Runnable状态。当调用notify()或notifyAll()来唤醒它,它会直接进入Blocked状态,因为唤醒Waiting状态的线程能够调用notify()或notifyAll(),肯定是已经持有了monitor锁,这时候处于Waiting状态的线程没有拿到monitor锁,就会进入Blocked状态,直到执行了notify()/notifyAll()唤醒它的线程执行完毕并释放monitor锁,才可能轮到它去抢夺这把锁,如果它能抢到,就会从Blocked状态回到Runnable状态。
TimedWaiting状态在Waiting上面是TimedWaiting状态,这两个状态是非常相似的,区别仅在于有没有时间限制,TimedWaiting会等待超时,由系统自动唤醒,或者在超时前被唤醒信号唤醒。
以下情况会让线程进入TimedWaiting状态。
设置了时间参数的Thread.sleep(longmillis)方法。
设置了时间参数的Object.wait(longtimeout)方法。
设置了时间参数的Thread.join(longmillis)方法。
设置了时间参数的LockSupport.parkNanos(longnanos)。
LockSupport.parkUntil(longdeadline)方法。
在TimedWaiting中执行notify()和notifyAll()也是一样的道理,它们会先进入Blocked状态,然后抢夺锁成功后,再回到Runnable状态。当然,如果它的超时时间到了且能直接获取到锁/join的线程运行结束/被中断/调用了LockSupport.unpark(),会直接恢复到Runnable状态,而无需经历Blocked状态。
Terminated终止Terminated终止状态,要想进入这个状态有两种可能。
run()方法执行完毕,线程正常退出。
出现一个没有捕获的异常,终止了run()方法,最终导致意外终止。
线程的停止interrupt我们知道Thread提供了线程的一些操作方法,比如stop(),suspend()和resume(),这些方法已经被Java直接标记为@Deprecated,这就说明这些方法是不建议大家使用的。
因为stop()会直接把线程停止,这样就没有给线程足够的时间来处理想要在停止前保存数据的逻辑,任务戛然而止,会导致出现数据完整性等问题。这种行为类似于在linux系统中执行kill-9类似,它是一种不安全的操作。
而对于suspend()和resume()而言,它们的问题在于如果线程调用suspend(),它并不会释放锁,就开始进入休眠,但此时有可能仍持有锁,这样就容易导致死锁问题,因为这把锁在线程被resume()之前,是不会被释放的。
interrupt最正确的停止线程的方式是使用interrupt,但interrupt仅仅起到通知被停止线程的作用。而对于被停止的线程而言,它拥有完全的自主权,它既可以选择立即停止,也可以选择一段时间后停止,也可以选择压根不停止。
下面我们来看下例子:
publicclassInterruptExampleimplementsRunnable{ //interrupt相当于定义一个volatile的变量//volatilebooleanflag=false;publicstaticvoidmain(String[]args)throwsInterruptedException{ Threadt1=newThread(newInterruptExample());t1.start();Thread.sleep(5);//Main线程来决定t1线程的停止,发送一个中断信号,中断标记变为truet1.interrupt();}@Overridepublicvoidrun(){ while(!Thread.currentThread().isInterrupted()){ System.out.println(Thread.currentThread().getName()+"--");}}}执行一下,运行了一会就停止了
主线程在调用t1的interrupt()之后,这个线程的中断标记位就会被设置成true。每个线程都有这样的标记位,当线程执行时,会定期检查这个标记位,如果标记位被设置成true,就说明有程序想终止该线程。在while循环体判断语句中,通过Thread.currentThread().isInterrupt()判断线程是否被中断,如果被置为true了,则跳出循环,线程就结束了,这个就是interrupt的简单用法。
阻塞状态下的线程中断下面来看第二个例子,在循环中加了Thread.sleep秒。
publicclassInterruptSleepExampleimplementsRunnable{ //interrupt相当于定义一个volatile的变量//volatilebooleanflag=false;publicstaticvoidmain(String[]args)throwsInterruptedException{ Threadt1=newThread(newInterruptSleepExample());t1.start();Thread.sleep(5);//Main线程来决定t1线程的停止,发送一个中断信号,中断标记变为truet1.interrupt();}@Overridepublicvoidrun(){ while(!Thread.currentThread().isInterrupted()){ try{ Thread.sleep();}catch(InterruptedExceptione){ //中断标记变为falsee.printStackTrace();}System.out.println(Thread.currentThread().getName()+"--");}}}再来看下运行结果,卡主了,并没有停止。这是因为main线程调用了t1.interrupt(),此时t1正在sleep中,这时候是接收不到中断信号的,要sleep结束以后才能收到。这样的中断太不及时了,我让你中断了,你缺还在傻傻的sleep中。
Java开发的设计者已经考虑到了这一点,sleep、wait等方法可以让线程进入阻塞的方法使线程休眠了,而处于休眠中的线程被中断,那么线程是可以感受到中断信号的,并且会抛出一个InterruptedException异常,同时清除中断信号,将中断标记位设置成false。
这时候有几种做法:
直接捕获异常,不做处理,e.printStackTrace();打印下信息
将异常往外抛出,即在方法上throwsInterruptedException
再次中断,代码如下,加上Thread.currentThread().interrupt();
@Overridepublicvoidrun(){ while(!Thread.currentThread().isInterrupted()){ try{ Thread.sleep();}catch(InterruptedExceptione){ //中断标记变为falsee.printStackTrace();//把中断标记修改为trueThread.currentThread().interrupt();}System.out.println(Thread.currentThread().getName()+"--");}}这时候线程感受到了,我们人为的再把中断标记修改为true,线程就能停止了。一般情况下我们操作线程很少会用到interrupt,因为大多数情况下我们用的是线程池,线程池已经帮我封装好了,但是这方面的知识还是需要掌握的。感谢收看,多多点赞~
作者:小杰博士
fsIO调度算法之NOOP
NOOP,全称为No Operation,即电梯式调度算法。在Linux2.4或更早版本的系统中,它是唯一的I/O调度算法。NOOP实现了一个简单的FIFO队列,其运作原理类似于电梯的工作方式,将新来的I/O请求合并到最近的请求之后,从而保证了请求在同一介质上的连续性。NOOP倾向于优先处理写请求,对读请求较为不利。在闪存设备、RAM和嵌入式系统中,NOOP表现最佳。
电梯算法导致读请求“饿死”的原因在于,写请求比读请求更容易处理。写请求通过文件系统缓存,无需等待一次写操作完成即可开始下一次写操作。写请求可以通过合并和堆积在I/O队列中。而读请求需要在前面的所有读操作完成后才能进行下一次读操作。在读操作之间存在几毫秒的等待时间,而在此期间,新的写请求到来,导致后续的读请求“饿死”。
在Linux 3.0版本中,对NOOP调度器进行了优化和改进。
在I/O调度器NOOP中,请求的处理流程如下:
1. 向前合并请求:`noop_merged_requests`。
参考资料:IO调度器NOOP与deadline源码级分析 - hiyachen - ChinaUnix博客