多线程
volatile
//TODO
Synchronized
//TODO
ReentrantLock
ReentrantLock是API级别的锁,实现了Lock接口。通过方法lock()获取锁,unlock()释放锁。先通过一个简单的例子看看ReentrantLock的用法。
public class LockExp1 {
private static int count=0;
private Lock lock=new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
LockExp1 lockexp=new LockExp1();
ExecutorService executor= Executors.newCachedThreadPool();
for (int i=0;i<1000;i++){
executor.execute(()->lockexp.add());
}
Thread.sleep(2000);
System.out.println(count);
executor.shutdown();
}
public void add(){
lock.lock();
try{
count++;
}finally{
lock.unlock();
}
}
}lock的实现
上面代码中lock是通过ReentrantLock类向上转型创建的实例对象,所以lock对象调用lock方法要去ReentrantLock中看源码,如下:
public void lock() {
sync.lock();
}sync是Sync类创建的对象,Sync是ReentrantLock类中的一个内部类,继承自AbstractQueuedSynchronizer。
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}Sync类中的lock方法如下:
abstract void lock();
此方法在Sync中是抽象方法,所以继续找实现了Sync中lock方法的子类(先看非公平锁的实现):NonfairSync
static final class NonfairSync extends Sync {
...
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
...
}if语句中compareAndSetState方法其实就是调用了cas将AbstractQueuedSynchronizer类中的state从0改为1.
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}修改成功则将当前线程为独占线程。我们主要看else中的acquire方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}tryAcquire方法在NonfairSync中实现了,实际上是调用了Sync中的nonfairTryAcquire方法:
//tryAcquire在NonfairSync中
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
//nonfairTryAcquire在Sync中
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程和状态
final Thread current = Thread.currentThread();
int c = getState();
//若状态为0,则尝试将状态设置为1,设置成功则将当前线程设为独占线程,返回true
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//若当前线程就是独占线程,状态值再次+1(acquires为1),返回true。其实就是实现可重入。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//其他情况返回false
return false;
}回过头看acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,先看其中的addWaiter(Node.EXCLUSIVE),
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。
private Node addWaiter(Node mode) {
//创建一个Node记录当前线程,把Node的模式设置为EXCLUSIVE(独占)。
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//队列不为空时,将node节点尾接至队列中,返回node
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//队列为空则调用enq初始化队列.
enq(node);
return node;
}
//初始化队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}再回到acquireQueued(addWaiter(Node.EXCLUSIVE), arg)中,看看acquireQueued做了什么:
final boolean acquireQueued(final Node node, int arg) {
//标记是否成功拿到资源
boolean failed = true;
try {
//标记等待过程中是否被中断过
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//拿到前驱
//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
//将此节点设置为head,并将原head的next设为null,方便GC。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;//返回等待过程中是否被中断
}
//如果自己可以休息了,就进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}现在看看shouldParkAfterFailedAcquire(p, node)做了什么:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驱的状态
//如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后
* 就会被保安大叔赶走了(GC回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下(保证下次返回true)。有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}回到if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())中,如果线程找好安全休息点后,那就可以安心去休息了。调用parkAndCheckInterrupt:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//调用park()使线程进入waiting状态
return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
}park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:
1)被unpark();
2)被interrupt()。
OK,看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),现在让我们再回到
acquireQueued(),总结下该函数的具体流程:
- 结点进入队尾后,检查状态,找到安全休息点;
- 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
- 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。
acquireQueued()分析完之后,我们接下来再回到acquire(),总结下它的流程吧:
- 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
以上就是ReentrantLock中lock方法的实现,接下来看看unlock。
unlock的实现
首先还是看ReentrantLock中unlock函数源码
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到头结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待队列里的下一个线程
return true;
}
return false;
}它调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;//新状态值c设置为原状态值-1(releases=1)
//如果当前线程不是独占线程,抛出异常IllegalMonitorStateException
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;//返回值free标记是否完全释放,即c=0
if (c == 0) {
free = true
setExclusiveOwnerThread(null);//独占线程设置为null
}
setState(c);//将c设为新状态值
return free;//返回free
}再回到release方法的unparkSuccessor:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;//拿到node的状态
if (ws < 0)
//状态有效则尝试把node状态设为0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;//取node后续节点s
//得到node的有效后续节点
if (s == null || s.waitStatus > 0) {
s = null;
//从尾部向前遍历,直到node,当t的状态有效,则赋值给s,最终得到的s节点时离node最近的有效节点。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//调用unpark唤醒
}这个函数并不复杂。一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head &&tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了!
//TODO
查看19道真题和解析