Java并发基础-并发工具类(二)

并发工具类 本系列文章主要讲解Java并发相关的内容,包括同步、锁、信号量、阻塞队列、线程池等,整体思维导图如下: 系列文章列表: Java并发基础-并发模型、基础接口以及Thread Java并发基础-同步和锁 Java并发基础-并发工具类(一) 本文主要以实例讲解Semaphore、阻塞队列等内容。 Semaphore 基本概念和用途 Semaphore常称信号量,其维护了一个许可集,可以用来控制线程并发数。线程调用acquire()方法去或者许可证,然后执行相关任务,任务完成后,调用release()方法释放该许可证,让其他阻塞的线程可以运行。 Semaphore可以用于流量控制,尤其是一些公共资源有限的场景,比如数据库连接。假设我们上面的账户余额管理中的账户修改操作涉及到去更改mysql数据库,为了避免数据库并发太大,我们进行相关限制。 常用方法 Semaphore(int permits):构造方法,初始化许可证数量 void acquire():获取许可证 void release():释放许可证 int availablePermits() :返回此信号量中当前可用的许可证数。 int getQueueLength():返回正在等待获取许可证的线程数。 boolean hasQueuedThreads() :是否有线程正在等待获取许可证。 void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。 Collection getQueuedThreads() :返回所有等待获取许可证的线程集合。是个protected方法。 运行示例 虽然在代码中设置了20个线程去运行,但同时设置了许可证的数量为5,因而实际的最大并发数还是5。 package com.aidodoo.java.concurrent; import java.util.concurrent.*; /** * Created by zhangkh on 2018/9/9. */ public class SemaphoreDemo { public static void main(String[] args){ Semaphore semaphore=new Semaphore(5); ExecutorService executorService = Executors.newFixedThreadPool(20); Account account=new Account(); for(int i=0;i<20;i++){ SpenderWithSemaphore spender = new SpenderWithSemaphore(account, semaphore); executorService.submit(spender); } executorService.shutdown(); } } class SpenderWithSemaphore implements Runnable { private final Account account; private final Semaphore semaphore; public SpenderWithSemaphore(Account account, Semaphore semaphore) { this.account = account; this.semaphore = semaphore; } @Override public void run() { try{ semaphore.acquire(); System.out.println(String.format("%s get a premit at time %s,change and save data to mysql",Thread.currentThread().getName(),System.currentTimeMillis()/1000)); Thread.sleep(2000); }catch (InterruptedException e){ e.printStackTrace(); }finally { // System.out.println(String.format("%s release a premit",Thread.currentThread().getName())); semaphore.release(); } } } 获取许可证后,模拟操作mysql,我们让线程睡眠2秒,程序输出如下: pool-1-thread-2 get a premit at time 1536480858,change and save data to mysql pool-1-thread-5 get a premit at time 1536480858,change and save data to mysql pool-1-thread-3 get a premit at time 1536480858,change and save data to mysql pool-1-thread-4 get a premit at time 1536480858,change and save data to mysql pool-1-thread-1 get a premit at time 1536480858,change and save data to mysql pool-1-thread-8 get a premit at time 1536480860,change and save data to mysql pool-1-thread-7 get a premit at time 1536480860,change and save data to mysql pool-1-thread-6 get a premit at time 1536480860,change and save data to mysql pool-1-thread-9 get a premit at time 1536480860,change and save data to mysql pool-1-thread-10 get a premit at time 1536480860,change and save data to mysql pool-1-thread-11 get a premit at time 1536480862,change and save data to mysql pool-1-thread-13 get a premit at time 1536480862,change and save data to mysql pool-1-thread-12 get a premit at time 1536480862,change and save data to mysql pool-1-thread-14 get a premit at time 1536480862,change and save data to mysql pool-1-thread-15 get a premit at time 1536480862,change and save data to mysql pool-1-thread-16 get a premit at time 1536480864,change and save data to mysql pool-1-thread-17 get a premit at time 1536480864,change and save data to mysql pool-1-thread-19 get a premit at time 1536480864,change and save data to mysql pool-1-thread-18 get a premit at time 1536480864,change and save data to mysql pool-1-thread-20 get a premit at time 1536480864,change and save data to mysql 可以看到前面5个线程同一时间1536480858获得许可证,然后执行操作,并不是20个线程一起操作,这样能降低对mysql数据库的影响。 如果把上面Semaphore的构造方法中的许可证数量改为20,大家可以看到20个线程的运行时间基本一致。 源码实现 Semaphore实现直接基于AQS,有公平和非公平两种模式。公平模式即按照调用acquire()的顺序依次获得许可证,遵循FIFO(先进先出),非公平模式是抢占式的,谁先抢到先使用。 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } 获取许可证 acquire()方法最终调用父类AQS中的acquireSharedInterruptibly方法。 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //(1) doAcquireSharedInterruptibly(arg); //(2) } (1):调用tryAcquireShared,尝试去获取许可证 (2):如果获取失败,则调用doAcquireSharedInterruptibly,将线程加入到等待队列中 tryAcquireShared方法由Semaphore的内部类,同时也是AQS的子类去实现,即NonfairSync和FairSync,下面我们以NonfairSync为例说明其实现。 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } 而nonfairTryAcquireShared方法如下: final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); //(1) int remaining = available - acquires; //(2) if (remaining < 0 || compareAndSetState(available, remaining)) (3) return remaining; } } (1):获取state的值,也就是总许可证数量 (2):计算本次申请后,剩余的许可证数量 (3):如果剩余的许可证数量大于0且通过CAS将state的值修改成功后,返回剩余的许可证数量,否则继续循环阻塞。 释放许可证 release()方法的调用最终会调用父类AQS的releaseShared()方法: public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //(1) doReleaseShared(); //(2) return true; } return false; } (1):尝试释放许可证 (2):如果释放许可证成功,则通知阻塞的线程,让其执行 tryReleaseShared方法很简单,基本上是nonfairTryAcquireShared的逆过程,即增加许可证的数量,并通过CAS修改state的值。 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } BlockingQueue 基本概念 阻塞队列主要是解决如何高效安全传输数据的问题,此外能降低程序耦合度,让代码逻辑更加清晰。 其继承了Queue,并在其基础上支持了两个附加的操作: 当队列为空时,获取元素的线程会阻塞,等待队列变为非空 当队列满时,添加元素的线程会阻塞,等待队列可用 比较典型的使用场景是生产者和消费者。 BlockingQueue根据对于不能立即满足但可能在将来某一时刻可以满足的操作,提供了不同的处理方法,进而导致众多的api操作: Throws exception Special value Blocks Times out Insert add(e) offer(e) put(e) offer(e, time, unit) Remove remove() poll() take() poll(time, unit) Examine element() peek()} not applicable not applicable Throws exception:指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 Special value:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null Blocks:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。 Time out:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。 整体架构和类图 Java并发包根据不同的结构和功能提供了不同的阻塞队列,整体类图如下: 其中BlockingQueue有如下子类: ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。 DelayQueue:一个使用优先级队列实现的无界阻塞队列。 PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。 SynchronousQueue:一个不存储元素的阻塞队列。 LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。 其中BlockingDeque有一个子类: LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。 BlockingDeque作为双端队列,针对头部元素,还提供了如下方法: First Element (Head) Throws exception Special value Blocks Times out Insert addFirst(e) offerFirst(e) putFirst(e) offerFirst(e, time, unit) Remove removeFirst() pollFirst() takeFirst() pollFirst(time, unit) Examine getFirst() peekFirst() not applicable not applicable 针对尾部元素 Last Element (Tail) Throws exception Special value Blocks Times out Insert addLast(e) offerLast(e) putLast(e) offerLast(e, time, unit) Remove removeLast() pollLast() takeLast() pollLast(time, unit) Examine getLast() peekLast() not applicable not applicable 使用示例 一个典型的生产者和消费者实例如下,一个BlockingQueue可以安全地与多个生产者和消费者一起使用,Producer线程调用NumerGenerator.getNextNumber()生成自增整数,不断地写入数字,然后Consumer循环消费。 package com.aidodoo.java.concurrent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /** * Created by zhangkh on 2018/7/17. */ public class BlockingQueueDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue queue = new ArrayBlockingQueue(1024,true); ExecutorService executorService = Executors.newFixedThreadPool(20); for (int i = 0; i < 5; i++) { executorService.submit(new Producer(queue)); } for (int i = 0; i < 3; i++) { executorService.submit(new Consumer(queue)); } Thread.sleep(30 * 1000L); executorService.shutdown(); } } class Producer implements Runnable { Logger logger = LoggerFactory.getLogger(Producer.class.getName()); protected BlockingQueue queue = null; public Producer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { for(int i=0;i<3;i++){ int num = NumerGenerator.getNextNumber(); queue.put(num); Thread.sleep(1000); logger.info("{} producer put {}", Thread.currentThread().getName(), num); } } catch (InterruptedException e) { e.printStackTrace(); } } } class Consumer implements Runnable { Logger logger = LoggerFactory.getLogger(Consumer.class.getName()); protected BlockingQueue queue = null; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { while (true) { int ele = (int) queue.take(); logger.info("{} Consumer take {}", Thread.currentThread().getName(), ele); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } } class NumerGenerator{ private static AtomicInteger count = new AtomicInteger(); public static Integer getNextNumber(){ return count.incrementAndGet(); } } 程序输出如下: 18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 1 18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 2 18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 3 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-3 producer put 3 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-2 producer put 2 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-1 producer put 1 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-5 producer put 5 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-4 producer put 4 18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 4 18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 5 18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 6 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-3 producer put 6 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-1 producer put 8 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-2 producer put 7 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-5 producer put 9 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-4 producer put 10 18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 7 18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 8 18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 9 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-1 producer put 12 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-3 producer put 11 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-5 producer put 14 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-4 producer put 15 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-2 producer put 13 18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 10 18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 11 18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 12 18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 13 18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 14 18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 15 其他BlockingQueue子类的使用可参考对应的Java Api。 源码分析 由于BlockingQueue相关的子类众多,我们仅以ArrayBlockingQueue从源码角度分析相关实现。 构造方法 ArrayBlockingQueue中定义的成员变量如下: final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; transient Itrs itrs = null 各变量的解释如下,以便了解后续的代码: items用于存储具体的元素 takeIndex元素索引,用于记录下次获取元素的位置 putIndex元素索引,用于记录下次插入元素的位置 count用于记录当前队列中元素的个数 notEmpty条件变量,此处为获取元素的条件,即队列不能为空,否则线程阻塞 notFull条件变量,此处为插入元素的条件,即队列不能已满,否则线程阻塞 itrs用于维护迭代器相关内容 内部结构如下: 构造方法如下: public ArrayBlockingQueue(int capacity) { this(capacity, false); //(1) } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //(2) lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); //(3) notFull = lock.newCondition(); //(4) } public ArrayBlockingQueue(int capacity, boolean fair, Collection c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { //(5) check
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信