Java中的一些并发工具类

CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作,类似于Thread.join()方法,但功能比join的功能多。

join的原理是不停检查join线程是否存活,若果join线程存活,则让当前线程永远等待,当join线程中止后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是在JVM中实现,JDK里是看不到的。

对于countDownLatch,其构造函数接收一个int类型的参数作为计数器,使用countDown方法时,计数器减1,直到计数器为0时,被await方法阻塞的线程不被阻塞。

package javaMultiThread;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {
    static CountDownLatch c = new CountDownLatch(2);
    public static void main(String[] args) throws InterruptedException {
        // TODO Auto-generated method stub
        new Thread(new Runnable(){

            @Override
            public void run() {
                // TODO Auto-generated method stub
                System.out.println(1);
                c.countDown();
                System.out.println(2);
                c.countDown();
            }

        }).start();

        c.await();
        System.out.println(3);
    }

}

同步屏障CyclicBarrier

CyclicBarrier是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

CyclicBarrier默认的构造方法时CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier自己已到达屏障,然后阻塞当前线程,若是最后一个线程到达,则使得之前被屏障的所有线程继续运行。

CyclicBarrier用于多线程计算数据,最后合并计算结果的场景。

package javaMultiThread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    static CyclicBarrier c = new CyclicBarrier(2, new A());

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        new Thread(new Runnable(){

            @Override
            public void run() {
                // TODO Auto-generated method stub
                try {
                    c.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                System.out.println(1);
            }

        }).start();
        try {
            c.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println(2);

    }
    static class A implements Runnable{

        @Override
        public void run() {
            // TODO Auto-generated method stub
            System.out.println(3);
        }

    }

}

控制并发线程的Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源

Semaphore可以用于做流量控制,特别是公共资源有限的应用场景

package javaMultiThread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoneTest {
    static final int THREAD_COUNT = 30;
    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore s = new Semaphore(10);

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        for(int i = 0; i < THREAD_COUNT; i++){
            threadPool.execute(new Runnable(){

                @Override
                public void run() {
                    // TODO Auto-generated method stub
                    try {
                        s.acquire();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    System.out.println("save data");
                    s.release();
                }

            });
        }
        threadPool.shutdown();
    }

}

线程间交换数据的Exchanger

Exchanger(交换者)是一个用于线程间协作的工具类,用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

package javaMultiThread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Exchanger {
    private static final java.util.concurrent.Exchanger<String> exgr = new java.util.concurrent.Exchanger<String>();

    private static ExecutorService  threadPool = Executors.newFixedThreadPool(2);
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        threadPool.execute(new Runnable(){

            @Override
            public void run() {
                // TODO Auto-generated method stub
                String A = "银行流水A";
                try {
                    String B = exgr.exchange(A);
                    System.out.println(A);
                    System.out.println(B);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

        });
        threadPool.execute(new Runnable(){

            @Override
            public void run() {
                // TODO Auto-generated method stub
                String B = "银行流水B";
                try {
                    String A = exgr.exchange(B);
                    System.out.println(A.equals(B) + " " + A + " " + B);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }

        });

        threadPool.shutdown();
    }

}

Reference: Java并发编程的艺术

Share