深入分析同步工具类之AbstractQueuedSynchronizer

时间:2021-04-28 17:36:40
 

 

概览:

   AQS(简称)依赖内部维护的一个FIFO(先进先出)队列,可以很好的实现阻塞、同步;volatile修饰的属性state,哪个线程先改变这个状态值,那么这个线程就获得了优先权,可以做任何事(当然这些事肯定是我们预先写好的需要执行的业务代码咯[坏笑]),而其他线程则会被挂起,直到之前的线程执行完才会轮到下一个。 类库中同步工具类(CountDownLatch[闭锁]、Semaphore[信号量])就是依靠内部维护一个类来更改这个state来实现其自身的特性(说到底还是执行某些线程、阻塞一些线程)

 深入分析同步工具类之AbstractQueuedSynchronizer

代码分析前:

   节点Node几个关键的属性:

              1、volatile Thread thread;当前节点代表的线程

              2、volatile Node next;当前结点的后继节点

              3、volatile Node prev;当前结点的前驱节点

              4、volatile int waitStatus;当前结点的等待状态  

                             1 CANCELLED(线程取消)

                             -1 SIGNAL(表示当前节点的后继节点需要运行)

                             -2 CONDITION(表示当前节点在等待condition,也就是在condition队列中)

                             -3 PROPAGATE (表示当前场景下后续的acquireShared能够得以执行)  

                             0(表示当前节点在sync队列中,等待着获取锁)

代码分析:

   

//获取状态
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
//该方法由子类去实现(改变state的值)
protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
//为snyc队列添加节点
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
           //CAS更新尾节点
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
//判断有没有释放状态的线程并且尝试获取状态,或者将线程挂起
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //当前线程所在节点的前驱节点为头节点,说明前驱节点所代表的线程正在执行,这个时候尝试获取状态来验证前驱节点的线程有没有执行完,
                if (p == head && tryAcquire(arg)) {
                    setHead(node);//当前线程已获得状态,执行当前线程。将当前线程设置为头节点
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;//退出当前循环
                }
                //将当前线程挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
//线程执行完后释放持有的状态
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
//子类实现(改变状态的值)
protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
//将头节点的后继节点的线程恢复,这样后继节点就可以继续执行之前的循环,获取状态。
private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

 参考:

    http://ifeve.com/introduce-abstractqueuedsynchronizer/

   http://www.jianshu.com/p/d8eeb31bee5c