CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作,类似于Thread.join()方法,但功能比join的功能多。
join的原理是不停检查join线程是否存活,若果join线程存活,则让当前线程永远等待,当join线程中止后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是在JVM中实现,JDK里是看不到的。
对于countDownLatch,其构造函数接收一个int类型的参数作为计数器,使用countDown方法时,计数器减1,直到计数器为0时,被await方法阻塞的线程不被阻塞。
package javaMultiThread;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest {
static CountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
new Thread(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();
}
}).start();
c.await();
System.out.println(3);
}
}
同步屏障CyclicBarrier
CyclicBarrier是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier默认的构造方法时CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier自己已到达屏障,然后阻塞当前线程,若是最后一个线程到达,则使得之前被屏障的所有线程继续运行。
CyclicBarrier用于多线程计算数据,最后合并计算结果的场景。
package javaMultiThread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) {
// TODO Auto-generated method stub
new Thread(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
try {
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(2);
}
static class A implements Runnable{
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println(3);
}
}
}
控制并发线程的Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源
Semaphore可以用于做流量控制,特别是公共资源有限的应用场景
package javaMultiThread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoneTest {
static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
// TODO Auto-generated method stub
for(int i = 0; i < THREAD_COUNT; i++){
threadPool.execute(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
try {
s.acquire();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("save data");
s.release();
}
});
}
threadPool.shutdown();
}
}
线程间交换数据的Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类,用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
package javaMultiThread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Exchanger {
private static final java.util.concurrent.Exchanger<String> exgr = new java.util.concurrent.Exchanger<String>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
// TODO Auto-generated method stub
threadPool.execute(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
String A = "银行流水A";
try {
String B = exgr.exchange(A);
System.out.println(A);
System.out.println(B);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
threadPool.execute(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
String B = "银行流水B";
try {
String A = exgr.exchange(B);
System.out.println(A.equals(B) + " " + A + " " + B);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
threadPool.shutdown();
}
}
Reference: Java并发编程的艺术