多线程学习笔记六之并发工具类CountDownLatch和CyclicBarrier

目录 简介 CountDownLatch 示例 实现分析 CountDownLatch与Thread.join() CyclicBarrier 实现分析 CountDownLatch和CyclicBarrier区别 简介   在编写多线程程序时,难免需要对并发流程进行控制,Thread类有join()和yield()等方法,JUC提供了更为灵活的并发工具类,下面就学习这些工具类的用法以及实现。 CountDownLatch   latch意思是门闩,countdown指从上往下数,CountDownLatch允许一个或多个线程等待其他任务线程完成操作,就像它的字面意思:从大往小数,数到某个值(0)的时候打开门闩。下面是CountDownLatch的api: //构造器 public CountDownLatch(int count); //调用await()方法的线程会进入等待状态,它会等待直到count值为0才继续执行 public void await(); //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行 public boolean await(long timeout, TimeUnit unit); //计数器减一 public void countDown() 可以看到通过构造器构造一个计数器,通过调用countDown方法计数减小,await在计数器大于0时线程处于等待状态,通过下面例子可以学会CountDownLatch的用法: 示例 public class LatchTest { public static void main(String[] args) { //两个线程,计数器传入2 final CountDownLatch latch = new CountDownLatch(2); //这两个线程执行了latch.countDown(),计数器归0,主线程才被唤醒继续执行 new Thread(() -> { try { System.out.println("子线程1: "+Thread.currentThread().getName()+"正在执行"); Thread.sleep(3000); System.out.println("子线程1: "+Thread.currentThread().getName()+"执行完毕"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { System.out.println("子线程2: "+Thread.currentThread().getName()+"正在执行"); Thread.sleep(3000); System.out.println("子线程2: "+Thread.currentThread().getName()+"执行完毕"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); try { System.out.println("等待2个子线程执行完毕..."); latch.await(); System.out.println("2个子线程已经执行完毕"); System.out.println("继续执行主线程"); } catch (InterruptedException e) { e.printStackTrace(); } } } 运行结果: 实现分析   CountDownLatch是基于共享锁实现的,内部类Sync继承同步器AQS,重点分析CountDownLatch以下三个方法: 构造方法   通过构造函数传入的参数count设置同步状态(count必须大于0,否则抛出异常),同步状态在这里并不表示线程获得锁的重入次数,而是表示一个计数器,计数器的大小与任务线程的数目是一致的, public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); } await()   调用了await的线程会处于等待状态,直到计数器归0才会被唤醒。await方法调用了Sync父类AQS的acquireSharedInterruptibly方法,acquireSharedInterruptibly首先检查线程有中断,然后调用tryAcquireShared尝试获取共享锁,获取成功返回1,失败返回-1,若失败调用doAcquireSharedInterruptibly将当前线程加入同步队列阻塞住,等待计数器为0唤醒。 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } countDown()   countDown方法将计数器减一,调用了AQS的releaseShared方法,当tryReleaseShared方法返回true执行doReleaseShared方法,这个方法在分析读写锁是介绍过了,就是唤醒同步等列等待获取锁的线程,即唤醒调用了await方法等待计数器归0的线程。 public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } tryReleaseShared(int releases)   通过循环+CAS的方式修改同步状态state,当同步状态为0时返回true;同步状态为0,即表示计数器归0,所有调用了countDown的线程都执行完了,可以唤醒调用await等待的线程了。 protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } CountDownLatch与Thread.join()   Thread类的join方法与CountDownLatch作用类似,join方法的实现原理不停检查调用join的线程是否存活,如果存活则让当前线程处于等待状态,当join线程终止后,会唤醒当前线程。CountDownLatch与join相比更灵活,不必非得线程中止只要调用了countDown方法就行了,可以响应中断以及能够设置超时等功能。 CyclicBarrier   CyclicBarrier是指可循环使用的屏障,它可以让一组线程当他们分别达到了同步点(common barrier point)时被阻塞,直到最后一个线程到达了同步点,屏障才会开门,让所有被屏障屏蔽的线程继续运行。 public class BarrierTest { public static void main(String[] args) { int size = 4; CyclicBarrier barrier = new CyclicBarrier(size); for(int i=0;i 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //g == generation && !g.broken说明此时当前这一轮还没结束,并且没有其它线程执行过 //breakBarrier方法。这种情况会执行breakBarrier置generation的broken标识为true并 //唤醒其它线程,之后继续抛出InterruptedException。 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // 如果g != generation,此时这一轮已经结束,后面返回index作为到达barrier的次序; // 如果g.broken说明之前已经有其它线程执行了breakBarrier方法,后面会抛出 //BrokenBarrierException。 Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; //超时 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } breakBarrier()   损坏当前屏障,会唤醒所有在屏障中的线程,当线程被中断或等待超时会调用 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } nextGeneration()   nextGeneration方法在所有线程进入屏障后会被调用,即生成下一个版本,所有线程又可以重新进入到屏障中 private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } CountDownLatch和CyclicBarrier区别   从功能上说,CountDownLatch允许一个或多个线程等待其他线程完成操作,而CyclicBarrier是让一组线程达到一个公共同步点之后再一起放行;CountDownLatch计数器只能使用一次,CyclicBarrier可以使用reset方法重置用以处理某些复杂的业务场景。 https://www.cnblogs.com/rain4j/p/10183118.html 编程改变世界
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信