同步框架AQS
作为同步基础框架,需要提供:维护一个同步状态(表示锁被锁定或释放),包含对状态的更新和检查操作,包含能够阻塞当前线程的方法,其他线程修改同步状态后能恢复。
在AQS中Lock和Condition的组合,约等于Synchronized和wait/notify的组合。
conditiong的作用是维护等待信号队列,并在适当的时机加入到AQS等待队列中以实现唤醒。
AQS需满足的三要素
- 自动维护同步状态
- 阻塞和唤醒线程
- 维护队列
Synchronization State,是32位的int,使用volatile保证state在线程间可见,并通过CAS操作完成更新,保证变量的同步。compareAndSetState与期望值对比,相同才更新。
Blocking,提供独占(ReentrantLock)和共享模式(Semaphore),以及同时包含两种模式的ReentrantReadWriteLock。
Queue,框架的核心就是维护的这个包含阻塞线程的队列。队列的结构可以参考:
Doug Lea, The java.util.concurrent Synchronizer Framework, Science of Computer Programming, 2005, 58(3):293-309
AQS应用
一般是声明私有内部类去继承AQS,并代理AQS的全部或者部分方法。后面将会看到ReentrantLock,闭锁、信号量、屏障、读写锁,都是这样实现的。
锁
Lock
提供和synchronized关键字类似的同步功能,只是使用的时候需要显示地获取和释放锁。但是可以解决synchronized关键字不太容易解决的内置锁交叉的情况。同时,具备尝试非阻塞地获取锁;能被中断地获取锁,超时获取锁。
如: lock A
lock B
unlock A
lock C
unlock B
unlock D
......
Lock的使用方式:
Lock lock = new ReentrantLock();
lock.lock();
try {
} finally {
lock.unlock(); // 要在finally中释放锁,保证出现异常也能释放
}
队列同步器AbstractQueuedSynchronizer(AQS)
使用int成员变量表示同步状态,FIFO队列存储排队的线程。子类通过继承AQS实现它的抽象方法,来管理同步状态,通过getState,setState,compareAndSetState(使用CAS设置当前状态)操作同步状态。
既支持独占式地获取同步状态,也支持共享式地获取同步状态。
同步器提供的模板方法基本分为三类:独占式获取与释放同步状态、共享式获取与释放和查询同步队列中的等待下农村的情况。
同步队列
FIFO双向队列,当线程获取同步状态失败时,会构造node节点并添加到队列尾并阻塞当前线程,当同步状态释放的时候,会把队首节点的线程唤醒,使其再次尝试获取同步状态。
Node包含,获取同步状态失败线程的引用、等待状态以及前驱和后继节点。
加入队列的过程是一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect, Node update)
独占式
获取独占式排他锁,无法中断,获取失败进入同步队列。
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 线程安全的获取同步状态,失败后构造同步节点
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 并通过addWaiter添加到队列尾部
// acquireQueued自旋方式获取同步状态
selfInterrupt();
}
构造的节点是Node.exclusive这种独占式的,同一时刻只能有一个线程成功获取同步状态
tryAcquire由具体的实现类实现,如ReentrantLock中公平锁和非公平锁的实现。
公平锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 公平锁需要获取队首节点
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 非公平锁,直接尝试获取状态
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
节点添加到队列尾部,这里存在疑问compareAndSetTail和下文的enq是否重复,另外compareAndSetTail的主时钟明确注明Used only by enq
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// enq是死循环,设置成功才能返回,可以保证所有添加节点的线程串行化
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
最后是acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 死循环,自旋锁,节点自旋获取同步状态
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 前驱是头节点才会获取同步状态
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里头节点是成功获取到同步状态的节点,头节点释放同步状态后,才能唤醒后继节点。head最初会指向刚刚获取同步状态的节点,后续获取同步状态失败的节点会被添加到队列末尾。
释放同步状态时,同步器调用tryRelease方法释放同步状态,然后唤醒头节点的后继节点。
####共享式
同一时刻有多个线程同时获取到同步状态。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // 共享锁就是检查如果分配arg这么多状态后,剩余状态量的值是否还大于0.所以不满足条件,就自旋
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
与独占式不同,共享模式释放需要保证线程安全,通过循环和CAS。代码请参考doReleaseShared。
独占式超时/中断获取同步状态
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 前驱是头节点并尝试获取同步状态,与普通独占式相同
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
if (nanosTimeout <= 0) // 达到超时时间,自旋结束,退出
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
// 计算时间,即还应在等待自旋的时间
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
并发容器和框架
ConcurrentHashMap
是现场安全且高效的HashMap。
- 在并发环境下,使用HashMap可能导致死循环,HashMap在扩容的时候,会将原来的链表逆序扩容,如果连个线程,一个访问到了逆序扩容后的链表,一个访问的是原先的链表,那么e1->e2,e2->e1,就会造成死循环。
- 而HashTable使用Synchronized关键字加锁,粒度太大,效率低下,当一个线程访问同步方法,其他方法均会进入阻塞和轮询状态。
- ConcurrentHashMap使用锁分段技术,降低对同一把锁的竞争,提高性能。
ConcurrentLinkedQueue
线程安全的队列有两种方式:阻塞算法,队列维护一把锁或两把锁(入队、出队各一把),因锁竞争无法获取时,阻塞操作;另一种是非阻塞方式,使用循环CAS,ConcurrentLinkedQueue就是这样一种队列。
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,FIFO,队尾添加,队头获取。
类图可以看出,队列包含head和tail节点,每个节点是一个node包含数据元素item和指向下一个node的next引用,通过next指向,所有node被连接成链表形式。默认情况下head为空,tail=head。
入队
入队主要做两件事情:将入队节点设置成为当前队列尾节点的下一个;更新tail节点指向新的尾节点,这里当tail为空的时候,tail节点是不移动的,而是将入队节点设置为tail的next;否则,入队节点设置为tail。因此tail节点不总是指向尾节点。所以tail节点的指向方式会呈现交替的情况,如下图所示。
public boolean offer(E e) {
// 入队元素null判断
checkNotNull(e);
// 入队前,用形参创建节点
final Node<E> newNode = new Node<E>(e);
// 死循环
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// tail.next是空,通过cas在监测一次
if (p.casNext(null, newNode)) {
// 需要移动tail的情况,不需要则直接返回
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
//
p = (t != (t = tail)) ? t : head;
else
//
p = (p != t && t != (t = tail)) ? t : q;
}
}
1、队列为空,tail=head,head.next->null
循环条件赋值:p=t=tail, q=p.next <=> p.next=null && q=null
所以可以进入代码q==null的条件,将新节点加入到队尾,此时,队列情况如下:
t=tail=head, head.next=e1, p.next=e1, 但p还是等于t,所以无需移动tail
2、加入e2,p=t=tail=head, head.next=e1, 所以q=e1, p!=q
进入else的条件,由于p==t,所以p被赋值为q后,p=q进入第二次循环(p==q只有第一次添加后才会出现):
q=p.next,此时由于p已经等于q等于e1,且e1是最后一个元素,所以p.next==null,所以q==null,进入第一个条件,将e2加入队尾,此时队列情况如下:
t=tail=head->e1->e2, p=e1,因为p!=t,需要移动tail到e2的位置,此时tail.next==null,如再继续追加元素,将重新执行第一步中的情况,导致1,2交替执行。
之所以使用这种交替更新tail的方法,是因为,每次更新tail都需要使用cas,如果能减少cas更新tail的次数会提供性能,所以doug lea才在tail与队尾距离超过一个元素后才移动,也就是添加过程,交替移动tail的情况。
出队
出队操作与入队相似,只是这时候删除的是队首元素,同时移动head。同样为了提高性能,减少cas更新head的次数,当head与真正的队首元素距离超过一个null的时候,才移动head到当前真正的队首,否则,head还是指向null,head.next才是真正的队首。
阻塞队列
BlockingQueue是一个支持两个附加操作的队列,支持阻塞的插入和移除方法。
当队列满时:队列会阻塞插入元素的线程,直到队列不满。
当队列空时:队列会阻塞移除元素的线程,直到队列非空。
常用的场景是生产者消费者,生产者向队列生成添加元素,消费者从队列取出元素,阻塞队列就是生产者和消费者用来生成和消费元素的容器。
关于插入和删除操作的4种处理方法
方法 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time, unit) |
检查方法 | element() | peek() | 无 | 无 |
jdk7中的7个阻塞队列
1. ArrayBlockingQueue 数组、有界
2. LinkedBolckingQueue 链表、有界
3. PriorityBlockingQueue 支持优先级排序、无界
4. DelayQueue 使用优先级队列实现、无界
5. SynchronousQueue 不存储元素
6. LinkedTransferQueue 链表、无界
7. LinedBlockingDeque 链表、双向
阻塞队列的实现原理
使用condition的awati和signal方法,通知模式实现。
Fork/Join框架
大任务分解成小任务,小任务执行结果汇总成大任务执行结果。