Java并发-线程协作机制

本文最后更新于:2 年前

wait、notify

属于 Object 类中。

当条件不成立时,线程调用 wait 进入条件等待队列。

另一个线程修改了条件变量后调用 notify,调用 wait 的线程唤醒后需要重新检查条件变量。

显示条件

sychronized:wait,notify

reentranlock:显式条件-condition 接口:await,signal

线程中断

协作工具类

读写锁 reentrantreadwritelock

读读是并行的。

实现

image-20210803205028306

信号量 semaphore

信号量,用来控制同时访问特定资源的线程数量,它通过协调各个线程以保证合理的使用公共资源,可以用做流量控制,譬如数据库连接场景控制等;

Semaphore 的构造方法 Semaphore(int permits) 接收一个整型参数,表示可用的许可证数量,即最大并发数量,使用方法就是在线程里面首先调用 acquire 方法获取一个许可,使用完后接着调用 release 归还一个许可,还可以使用 tryAcquire 尝试获取许可

倒计时门栓 countdownlatch

CountDownLatch 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。

CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。

线程使用 countDown() 方法时,使用了 tryReleaseShared 方法以 CAS 的操作来减少 state,直至 state 为 0 。

当调用 await() 方法的时候,如果 state 不为 0,那就证明任务还没有执行完毕,await() 方法就会一直阻塞,也就是说 await() 方法之后的语句不会被执行。然后,CountDownLatch 会自旋 CAS 判断 state == 0,如果 state == 0 的话,就会释放所有等待的线程,await() 方法之后的语句得到执行。

CountDownLatch 的两种典型用法

1、某一线程在开始运行前等待 n 个线程执行完毕。

将 CountDownLatch 的计数器初始化为 n (new CountDownLatch(n)),每当一个任务线程执行完毕,就将计数器减 1 (countdownlatch.countDown()),当计数器的值变为 0 时,在 CountDownLatch 上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

2、实现多个线程开始执行任务的最大并行性。

注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 (new CountDownLatch(1)),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为 0,多个线程同时被唤醒。

CountDownLatch 的不足

CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

循环栅栏 cyclicbarrier

CountDownLatch 的实现是基于 AQS 的,而 CycliBarrier 是基于 ReentrantLock(ReentrantLock 也属于 AQS 同步器)和 Condition 的。

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用 await() 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier 内部通过一个 count 变量作为计数器,count 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减一。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。

CyclicBarrier 的应用场景

CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。

CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。

区别

image-20210803210950957

CountDownLatch 是计数器,只能使用一次,而 CyclicBarrier 的计数器提供 reset 功能,可以多次使用

CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;

CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。

对于 CountDownLatch 来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。

而对于 CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。

CountDownLatch 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 CyclicBarrier 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。

阻塞队列

image-20210805141923139

线程 1 往阻塞队列中添加元素,而线程 2 从阻塞队列中移除元素

当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。

当阻塞队列是满时,从队列中添加元素的操作将会被阻塞。

试图从空的阻塞队列中获取元素的线程将会阻塞,直到其他的线程往空的队列插入新的元素,同样,试图往已满的阻塞队列添加新元素的线程同样也会阻塞,直到其他的线程从列中移除一个或多个元素或者完全清空队列后继续新增。

为什么要用阻塞队列,有什么好处吗

在多线程领域:所谓阻塞,是指在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这些 BlockingQueue 都包办了。

在 concurrent 包发布以前,多线程环境下,我们每个程序员都必须自己去实现这些细节,尤其还要兼顾效率和线程安全,这会给我们的程序带来不小的复杂性。

JDK 提供了 7 个阻塞队列。分别是

ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列
DelayQueue:一个使用优先级队列实现的无界阻塞队列
SynchronousQueue:一个不存储元素的阻塞队列
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列(实现了继承于 BlockingQueue 的 TransferQueue)
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列

最佳实践

image-20210805142222849

future/futuretask