Java并发:CountDownLatch、CyclicBarrier、Semaphore与Exchanger

CountDownLatch

CountDownLatch 可以实现一些类似于计数器一样的功能。在一组线程执行完毕后,在执行主线程。下面看一个简单的例子。

public class Main{
    public static void main(String[] args) {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        int thread = 1;
        if (cpuCount >= 3) {
            thread = cpuCount - 2;
        }
        CountDownLatch count = new CountDownLatch(thread);
        for (int i = 0; i < thread; i++) {
            IndexThread indexThread = new IndexThread(count);
            indexThread.setName("index-thread-" + i);
            indexThread.start();
        }

        try {
            count.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class IndexThread extends Thread {
    private CountDownLatch count;
    public IndexThread(CountDownLatch count) {
        this.count = count;
    }
    @Override
    public void run() {
        System.out.println("线程运行中");
        count.countDown();
    }

}

CyclicBarrier

通过它可以实现让一组线程等待至某个状态之后再全部同时执行

public class Main{
    public static void main(String[] args) {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        int thread = 1;
        if (cpuCount >= 3) {
            thread = cpuCount - 2;
        }
        CyclicBarrier barrier  = new CyclicBarrier(thread);
        for (int i = 0; i < thread; i++) {
            IndexThread indexThread = new IndexThread(barrier);
            indexThread.setName("index-thread-" + i);
            indexThread.start();
        }
    }
}

class IndexThread extends Thread {
    private CyclicBarrier cyclicBarrier;
    public IndexThread(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }
    @Override
    public void run() {
        System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
        try {
            Thread.sleep(5000);      //以睡眠来模拟写入数据操作
            System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
            cyclicBarrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("所有线程写入完毕,继续处理其他任务...");
    }

}

又如以下例子,是在子线程达到barrier状态后,运行 Runnbale接口的run,然后在继续运行子线程的后续操作:

public class Main{
    public static void main(String[] args) {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        int thread = 1;
        if (cpuCount >= 3) {
            thread = cpuCount - 2;
        }
        CyclicBarrier barrier  = new CyclicBarrier(thread, new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程"+Thread.currentThread().getName());
            }
        });
        for (int i = 0; i < thread; i++) {
            IndexThread indexThread = new IndexThread(barrier);
            indexThread.setName("index-thread-" + i);
            indexThread.start();
        }
    }
}

class IndexThread extends Thread {
    private CyclicBarrier cyclicBarrier;
    public IndexThread(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }
    @Override
    public void run() {
        System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
        try {
            Thread.sleep(5000);      //以睡眠来模拟写入数据操作
            System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
            cyclicBarrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("所有线程写入完毕,继续处理其他任务...");
    }

}

Semaphore

Semaphore可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可

public class Main{
    public static void main(String[] args) {
        int N = 8;            //工人数
        Semaphore semaphore = new Semaphore(5); //机器数目
        for(int i=0;i<N;i++) {
            new IndexThread(i,semaphore).start();
        }
    }
}

class IndexThread extends Thread {
    private int num;
    private Semaphore semaphore;
    public IndexThread(int num,Semaphore semaphore){
        this.num = num;
        this.semaphore = semaphore;
    }
    @Override
    public void run() {
        try {
            semaphore.acquire();
            System.out.println("工人"+this.num+"占用一个机器在生产...");
            Thread.sleep(1000);
            System.out.println("工人"+this.num+"释放出机器");
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

Exchanger

一般用于两个工作线程之间交换数据。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。

public class Main{

    private static final Exchanger<String> exgr = new Exchanger<String>();

    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "银行流水A";// A录入银行流水数据
                    exgr.exchange(A);
                } catch (InterruptedException e) {
                }
            }
        });

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "银行流水B";// B录入银行流水数据
                    String A = exgr.exchange("B");
                    System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"
                            + A + ",B录入是:" + B);
                } catch (InterruptedException e) {
                }
            }
        });

        threadPool.shutdown();
    }
}

参考:http://ifeve.com/concurrency-exchanger/
http://www.importnew.com/21889.html

发表评论

This site uses Akismet to reduce spam. Learn how your comment data is processed.