Concurrent Synchronizer Framework

同步框架AQS

作为同步基础框架,需要提供:维护一个同步状态(表示锁被锁定或释放),包含对状态的更新和检查操作,包含能够阻塞当前线程的方法,其他线程修改同步状态后能恢复。

在AQS中Lock和Condition的组合,约等于Synchronized和wait/notify的组合。

conditiong的作用是维护等待信号队列,并在适当的时机加入到AQS等待队列中以实现唤醒。

AQS需满足的三要素

  • 自动维护同步状态
  • 阻塞和唤醒线程
  • 维护队列

Synchronization State,是32位的int,使用volatile保证state在线程间可见,并通过CAS操作完成更新,保证变量的同步。compareAndSetState与期望值对比,相同才更新。

Blocking,提供独占(ReentrantLock)和共享模式(Semaphore),以及同时包含两种模式的ReentrantReadWriteLock。

Queue,框架的核心就是维护的这个包含阻塞线程的队列。队列的结构可以参考:

Doug Lea, The java.util.concurrent Synchronizer Framework, Science of Computer Programming, 2005, 58(3):293-309

AQS应用

一般是声明私有内部类去继承AQS,并代理AQS的全部或者部分方法。后面将会看到ReentrantLock,闭锁、信号量、屏障、读写锁,都是这样实现的。

Lock

提供和synchronized关键字类似的同步功能,只是使用的时候需要显示地获取和释放锁。但是可以解决synchronized关键字不太容易解决的内置锁交叉的情况。同时,具备尝试非阻塞地获取锁;能被中断地获取锁,超时获取锁。

如:     lock A
    lock B
    unlock A
    lock C
    unlock B
    unlock D
    ......

Lock的使用方式:

Lock lock = new ReentrantLock();
lock.lock();
try {
} finally {
    lock.unlock();  // 要在finally中释放锁,保证出现异常也能释放

}

队列同步器AbstractQueuedSynchronizer(AQS)

使用int成员变量表示同步状态,FIFO队列存储排队的线程。子类通过继承AQS实现它的抽象方法,来管理同步状态,通过getState,setState,compareAndSetState(使用CAS设置当前状态)操作同步状态。

既支持独占式地获取同步状态,也支持共享式地获取同步状态。

同步器提供的模板方法基本分为三类:独占式获取与释放同步状态、共享式获取与释放和查询同步队列中的等待下农村的情况。

同步队列

FIFO双向队列,当线程获取同步状态失败时,会构造node节点并添加到队列尾并阻塞当前线程,当同步状态释放的时候,会把队首节点的线程唤醒,使其再次尝试获取同步状态。

Node包含,获取同步状态失败线程的引用、等待状态以及前驱和后继节点。

加入队列的过程是一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect, Node update)

独占式

获取独占式排他锁,无法中断,获取失败进入同步队列。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&        // 线程安全的获取同步状态,失败后构造同步节点
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 并通过addWaiter添加到队列尾部
        // acquireQueued自旋方式获取同步状态
        selfInterrupt();
}
构造的节点是Node.exclusive这种独占式的,同一时刻只能有一个线程成功获取同步状态

tryAcquire由具体的实现类实现,如ReentrantLock中公平锁和非公平锁的实现。

公平锁

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { // 公平锁需要获取队首节点
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // 重入
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

非公平锁

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { // 非公平锁,直接尝试获取状态
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // 重入
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

节点添加到队列尾部,这里存在疑问compareAndSetTail和下文的enq是否重复,另外compareAndSetTail的主时钟明确注明Used only by enq

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
// enq是死循环,设置成功才能返回,可以保证所有添加节点的线程串行化
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

最后是acquireQueued

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { // 死循环,自旋锁,节点自旋获取同步状态
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { // 前驱是头节点才会获取同步状态
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这里头节点是成功获取到同步状态的节点,头节点释放同步状态后,才能唤醒后继节点。head最初会指向刚刚获取同步状态的节点,后续获取同步状态失败的节点会被添加到队列末尾。

释放同步状态时,同步器调用tryRelease方法释放同步状态,然后唤醒头节点的后继节点。

####共享式
同一时刻有多个线程同时获取到同步状态。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0) // 共享锁就是检查如果分配arg这么多状态后,剩余状态量的值是否还大于0.所以不满足条件,就自旋
        doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

与独占式不同,共享模式释放需要保证线程安全,通过循环和CAS。代码请参考doReleaseShared。

独占式超时/中断获取同步状态

private boolean doAcquireNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    long lastTime = System.nanoTime();
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { // 前驱是头节点并尝试获取同步状态,与普通独占式相同
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            if (nanosTimeout <= 0) // 达到超时时间,自旋结束,退出
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            long now = System.nanoTime();
            // 计算时间,即还应在等待自旋的时间
            nanosTimeout -= now - lastTime;
            lastTime = now;
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

并发容器和框架

ConcurrentHashMap

是现场安全且高效的HashMap。

  1. 在并发环境下,使用HashMap可能导致死循环,HashMap在扩容的时候,会将原来的链表逆序扩容,如果连个线程,一个访问到了逆序扩容后的链表,一个访问的是原先的链表,那么e1->e2,e2->e1,就会造成死循环。
  2. 而HashTable使用Synchronized关键字加锁,粒度太大,效率低下,当一个线程访问同步方法,其他方法均会进入阻塞和轮询状态。
  3. ConcurrentHashMap使用锁分段技术,降低对同一把锁的竞争,提高性能。

ConcurrentLinkedQueue

线程安全的队列有两种方式:阻塞算法,队列维护一把锁或两把锁(入队、出队各一把),因锁竞争无法获取时,阻塞操作;另一种是非阻塞方式,使用循环CAS,ConcurrentLinkedQueue就是这样一种队列。

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,FIFO,队尾添加,队头获取。

http://cdn3.infoqstatic.com/statics_s1_20160614-0102u2/resource/articles/ConcurrentLinkedQueue/zh/resources/11.jpg

类图可以看出,队列包含head和tail节点,每个节点是一个node包含数据元素item和指向下一个node的next引用,通过next指向,所有node被连接成链表形式。默认情况下head为空,tail=head。

入队

入队主要做两件事情:将入队节点设置成为当前队列尾节点的下一个;更新tail节点指向新的尾节点,这里当tail为空的时候,tail节点是不移动的,而是将入队节点设置为tail的next;否则,入队节点设置为tail。因此tail节点不总是指向尾节点。所以tail节点的指向方式会呈现交替的情况,如下图所示。

public boolean offer(E e) {
    // 入队元素null判断
    checkNotNull(e);
    // 入队前,用形参创建节点
    final Node<E> newNode = new Node<E>(e);
    // 死循环
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // tail.next是空,通过cas在监测一次
            if (p.casNext(null, newNode)) {
                // 需要移动tail的情况,不需要则直接返回
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // 
            p = (t != (t = tail)) ? t : head;
        else
            // 
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

1、队列为空,tail=head,head.next->null

循环条件赋值:p=t=tail, q=p.next <=> p.next=null && q=null

所以可以进入代码q==null的条件,将新节点加入到队尾,此时,队列情况如下:

t=tail=head, head.next=e1, p.next=e1, 但p还是等于t,所以无需移动tail

2、加入e2,p=t=tail=head, head.next=e1, 所以q=e1, p!=q

进入else的条件,由于p==t,所以p被赋值为q后,p=q进入第二次循环(p==q只有第一次添加后才会出现):

q=p.next,此时由于p已经等于q等于e1,且e1是最后一个元素,所以p.next==null,所以q==null,进入第一个条件,将e2加入队尾,此时队列情况如下:

t=tail=head->e1->e2, p=e1,因为p!=t,需要移动tail到e2的位置,此时tail.next==null,如再继续追加元素,将重新执行第一步中的情况,导致1,2交替执行。

之所以使用这种交替更新tail的方法,是因为,每次更新tail都需要使用cas,如果能减少cas更新tail的次数会提供性能,所以doug lea才在tail与队尾距离超过一个元素后才移动,也就是添加过程,交替移动tail的情况。

出队

出队操作与入队相似,只是这时候删除的是队首元素,同时移动head。同样为了提高性能,减少cas更新head的次数,当head与真正的队首元素距离超过一个null的时候,才移动head到当前真正的队首,否则,head还是指向null,head.next才是真正的队首。

阻塞队列

BlockingQueue是一个支持两个附加操作的队列,支持阻塞的插入和移除方法。

当队列满时:队列会阻塞插入元素的线程,直到队列不满。
当队列空时:队列会阻塞移除元素的线程,直到队列非空。

常用的场景是生产者消费者,生产者向队列生成添加元素,消费者从队列取出元素,阻塞队列就是生产者和消费者用来生成和消费元素的容器。

关于插入和删除操作的4种处理方法

方法 抛出异常 返回特殊值 一直阻塞 超时
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time, unit)
检查方法 element() peek()

jdk7中的7个阻塞队列

1. ArrayBlockingQueue    数组、有界
2. LinkedBolckingQueue    链表、有界
3. PriorityBlockingQueue    支持优先级排序、无界
4. DelayQueue    使用优先级队列实现、无界
5. SynchronousQueue    不存储元素
6. LinkedTransferQueue    链表、无界
7. LinedBlockingDeque 链表、双向

阻塞队列的实现原理

使用condition的awati和signal方法,通知模式实现。

Fork/Join框架

大任务分解成小任务,小任务执行结果汇总成大任务执行结果。