JUC并发工具 并发工具 线程池 线程是一种系统资源,每个线程都会占用一定的内存资源,过多的线程可能会导致内存溢出的情况。同时,过多的线程会导致频繁的上下文切换而降低性能。因此不能过多的创建线程,在此问题的基础上,出现了线程池。
线程池是用于管理和复用线程的机制,减少过多的内存占用和线程数量导致频繁的上下文切换,以提升性能。
自定义线程池
ThreadPool - 线程池:用于管理可以被复用的线程
BlockingQueue - 阻塞队列:用于平衡生产者生产和消费者消费速度差异
消费者:相当于线程池中的线程,获取阻塞队列中的任务并执行
生产者:相当于请求或者其他线程产生任务
当消费者线程消费速度高于生产者线程生产时,线程池中线程需要等待 阻塞队列
当生产者线程生产速度高于消费者线程消费,阻塞队列用于存储来不及消费的任务
简单时间一个最大线程数和线程工厂的自定义线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 public class SimpleTest { private static final Logger logger = LoggerFactory.getLogger(PoolTest.class); public static void main (String[] args) { MyThreadPool myThreadPool = new MyThreadPool (2 , 5 , TimeUnit.SECONDS, 5 , (task) -> ((MyThreadPool.Worker) task).getTarget().run()); List<Runnable> runnables = new ArrayList <>(100 ); for (int i = 1 ; i <= 100 ; i++) { final int a = i; runnables.add(() -> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } logger.info("task" + a); }); } for (Runnable r : runnables) { myThreadPool.exec(r); } } } @FunctionalInterface interface RejectedHandler <T> { void reject (T r) ; } class MyThreadPool { private static final Logger logger = LoggerFactory.getLogger(MyThreadPool.class); private MyBlockingQueue<Worker> myBlockingQueue; private HashSet<Worker> workers = new HashSet <>(); private int coreSize; private long timeout; private ReentrantLock workerLock = new ReentrantLock (); private TimeUnit timeUnit; private RejectedHandler rejectedHandler; public MyThreadPool (int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectedHandler rejectedHandler) { myBlockingQueue = new MyBlockingQueue <>(queueCapacity); this .coreSize = coreSize; this .timeout = timeout; this .timeUnit = timeUnit; this .rejectedHandler = rejectedHandler; } public void exec (Runnable runnable) { workerLock.lock(); try { if (workers.size() < coreSize) { Worker worker = new Worker (runnable); worker.start(); logger.info("add worker:{} pre task:{}" , worker, runnable); workers.add(worker); } else { logger.info("task put queue:{} current queue size:{}" , runnable, myBlockingQueue.size()); myBlockingQueue.offer(new Worker (runnable), rejectedHandler); } } finally { workerLock.unlock(); } } public class Worker extends Thread { private Runnable target; public Worker (Runnable target) { this .target = target; } public Runnable getTarget () { return target; } @Override public void run () { while (target != null ) { logger.info("exec task:{}" , target); target.run(); target = myBlockingQueue.poll(timeout, timeUnit); } workerLock.lock(); try { logger.info("remove worker:{}" , this ); workers.remove(this ); } finally { workerLock.unlock(); } } } } class MyBlockingQueue <T> { private Deque<T> deque = new ArrayDeque <T>(); private volatile int capacity = 10 ; private ReentrantLock lock = new ReentrantLock (); private Condition consumer_condition = lock.newCondition(); public MyBlockingQueue (int capacity) { this .capacity = capacity; } public int size () { return deque.size(); } public T poll (final long timeout, final TimeUnit unit) { lock.lock(); try { long nanos = unit.toNanos(timeout); while (deque.isEmpty()) { try { if (nanos <= 0 ) { return null ; } nanos = consumer_condition.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T pop = deque.pop(); return pop; } finally { lock.unlock(); } } public void offer (T t, RejectedHandler rejectedHandler) { lock.lock(); try { if (deque.size() >= capacity) { rejectedHandler.reject(t); } else { deque.offer(t); consumer_condition.signalAll(); } } finally { lock.unlock(); } } }
ThreadPoolExecutor ThreadPoolExecutor是jdk提供的线程池的实现
Executor:是一个执行器接口,提供了提交的 Runnable 任务的对象
ExecutorService:是执行器服务接口,是线程池最基础的接口,扩展了 Executor 并用于管理任务和执行器本身的生命周期的附加方法。
引入了 submit(Callable task) 方法,返回挂起结果 Future
AbstractExecutorService:是抽象执行器服务,实现了Executor的基本方法
ThreadPoolExecutor:ThreadPoolExecutor 是 ExecutorService 接口的具体实现,提供了一个灵活可配置的线程池实现,开发者可以使用 ThreadPoolExecutor 实例来管理和执行任务。
ScheduledExecutorService:ScheduledExecutorService在ExecutorService线程池基础接口的基础上提供了定时或延迟执行的抽象方法
ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor 是 ScheduledExecutorService 的具体实现。
线程池状态 ThreadPoolExecutor使用原子整数 ctl - control state 存储线程池的状态,ctl 包含了两个部分:高3位表示 线程池的运行状态,低29位表示 工作线程的数量,这样设计的好处是,保证了操作状态和工作线程数量两个值通过一次CAS操作同时进行赋值,实现了高效的状态管理。
1 2 3 4 5 6 7 8 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;private static int ctlOf (int rs, int wc) { return rs | wc; }
状态名
高3位
说明
是否接受任务
是否处理阻塞队列任务
RUNNING
111
线程池处于运行状态(初始状态)
Y
Y
SHUTDOWN
000
线程池处于关闭状态,不再接受新的任务,但会继续执行已经提交的任务即阻塞队列的任务。
N
Y
STOP
001
线程池处于停止状态,不再接受新的任务,也不会执行阻塞队列中的任务,并中断正在执行的任务。
N
N
TIDYING
010
所有的任务都已经终止,工作线程数量为0,线程池即将转换到 TERMINATED 状态。
TERMINATED
011
线程池终止,不再处于任何活动状态。
构造方法 1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectExecutionHandler) ;
corePoolSize:核心线程数 - 线程池的基本大小,即在没有任务需要执行时,线程池的大小始终保持在这个数量(注意刚开始是0,用到才会创建)。即使线程是空闲的,它们也不会被销毁,除非设置了 allowCoreThreadTimeOut
。
maximumPoolSize:最大线程数 - 线程池允许创建的最大线程数。
如果在阻塞队列中有任务等待执行,且已有的线程数小于 corePoolSize
,则会创建新的线程作为 核心线程。
如果已有的线程数等于 corePoolSize
,新的任务会加入阻塞队列中等待执行。
当阻塞队列已满,有新的任务的来时,创建新的线程作为救急线程直到线程数达到maximumPoolSize
,这些救急线程会在不需要时被销毁。
当阻塞队列已满,线程数已经达到 maximumPoolSize
,新的任务无法执行,会触发拒绝策略。
keepAliveTime:线程空闲时间 - 当线程池中的线程数超过 corePoolSize
时,多余的空闲救急线程在被终止之前等待新任务的最长时间即从线程空闲开始算起的最大空闲时间。
unit:时间单位 - 用于指定 keepAliveTime
参数的时间单位。
workQueue:阻塞队列 - 用于保存等待执行的任务的阻塞队列。可以使用不同的阻塞队列实现。
threadFactory:线程工厂 - 用于创建新线程的工厂。可以通过自定义线程工厂来指定新线程的属性,如线程名称、优先级等。
rejectExecutionHandler:拒绝策略 - 当任务添加到线程池中被拒绝时的处理策略。
注意:只有当阻塞队列为有容量限制的队列时,才会在超过其队列容量大小时,创建最多 maxnumPoolSize - corePoolSize 的数据线程来救急。
拒绝策略
自定义线程池的拒绝策略是指当线程池的任务队列已满并且最大线程数被耗尽的情况下,线程池应对任务处理的一种策略。
AbortPolicy(默认策略):会直接抛出RejectedExecutionException
异常,阻止新任务的提交。
CallerRunsPolicy:当线程池无法接受新任务时,会将任务交给调用者线程来执行。
保证了任务一定会被执行,但是可能会导致调用者线程的负担增加
AbortPolicy:丢弃无法处理的任务,不提供任何反馈。
DiscardPolicy:丢弃阻塞队列等待时间最长的任务,然后尝试将新任务加入队列。
其他框架对于拒绝策略有很多不同的扩展:
dubbo 框架有拒绝策略实现 抛出异常并记录日志,并且dump线程栈帧的信息,方便问题的定位
netty 框架有拒绝策略实现 创建一个新的线程来执行任务
activemq 框架有拒绝策略实现 设置60秒超时等待时间 期间不断尝试将任务放入到队列中
常用方法 提交任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void execute (Runnable command) ;<T> Future<T> submit (Callable<T> task) ; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
关闭线程池
shutdown
调用 shutdown 方法将线程池的状态更改为 SHUTDOWN,不会接受新的任务,但会将已提交的任务和在阻塞队列中的任务执行完毕。但是shutdown 方法不会阻塞调用者线程的执行,调用者线程会继续执行,而不会等待线程池中的任务执行完毕。
shutdown 方法是一个优雅关闭线程池的方式,让线程池在执行完已提交的任务后安全地终止。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public void shutdown () { ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { this .checkShutdownAccess(); this .advanceRunState(SHUTDOWN); this .interruptIdleWorkers(); this .onShutdown(); } finally { mainLock.unlock(); } this .tryTerminate(); }
shutdownNow
调用 shutdownNow 方法将线程池的状态更改为 STOP,不会接受新的任务的同时,也会将尝试中断所有工作线程,包括正在执行任务的线程,正在执行任务的线程使用 interrupt 打断,当前阻塞队列中的任务会作为方法的返回结果。
1 List<Runnable> shutdownNow () ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public List<Runnable> shutdownNow () { ReentrantLock mainLock = this .mainLock; mainLock.lock(); List tasks; try { this .checkShutdownAccess(); this .advanceRunState(STOP); this .interruptWorkers(); tasks = this .drainQueue(); } finally { mainLock.unlock(); } this .tryTerminate(); return tasks; }
Executors
Executors 是 基于ThreadPoolExecutor构造 的一个工厂类,提供了很多方法来创建不同用途的线程池。
配置项相对于ThreadPoolExecutor较少,但是某些场景的简化,可能无法满足特定的需求。
newFixedThreadPool
newFixedThreadPool 创建一个固定大小的线程池,当有新的任务提交时,如果线程池中的线程数小于nThreads
,则创建新线程来处理任务,否则将任务加入队列等待执行,最多为 nThreads 个核心线程数,没有救急线程。
注意:该构造方法的阻塞队列采用了 LinkedBlockingQueue ,属于无界队列。
使用场景:适用于需要控制并发线程数量或者执行固定数量的任务的场景
1 2 3 4 5 6 public static ExecutorService newFixedThreadPool (int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>(), threadFactory); }
newCachedThreadPool
newCachedThreadPool可缓存的线程池,该线程池没有核心线程,最大线程数为 Integer.MAX_VALUE,线程池中的线程数量根据任务的数量动态调整,这些线程都是救急线程,超过60秒会被回收。当有新任务提交时,如果有空闲线程,则重用现有线程,否则创建新线程。
该构造方法的阻塞队列采用了 SynchronousQueue 特殊类型的队列,内部并不存储元素,而是将任务直接交给等待的线程。它的容量为0,意味着每个插入操作必须等待另一个线程的对应移除操作,它这种特性是专门应用于 newCachedThreadPool 的。
使用场景: 适用于任务数量不确定但每个任务执行时间较短、可能有爆发性增长的场景,自动调整线程数量。
1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
newSingleThreadExecutor
newSingleThreadExecutor 创建一个只有单个线程的线程池。所有提交的任务将按顺序执行,并且保证不会有并发执行的情况。
该构造方法的阻塞队列采用 LinkedBlockingQueue 链式无解队列。
这种只有一个线程的线程池和一个线程串行执行的区别在于:线程池有拒绝策略进行补偿,同时线程池会维护一个线程并保证其的政策工作(即使有线程发生意外终止了,线程池会创建一个新的线程来顶替,保证线程池中始终有且只有一个可用的线程)。
使用场景: 顺序执行的任务
注意:newSingleThreadExecutor方法底层通过FinalizableDelegatedExecutorService装饰,只暴露了 ExecutorService 接口,不能调用 ThreadPoolExecutor 特有的方法,例如 setCorePoolSize 等方法。
1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); }
异步模式之工作线程 线程池让有限的工作线程(worker thread)来轮流地异步处理无限多的任务,采用了工作线程的模式,也有分工模式和享元模式的思想在里面。
在实际的应用场景中,由于不同类型的任务可能有不同的执行特性和优先级,因此不同的任务类型应该使用不同的线程池,这样可以避免饥饿现象的同时,也可以提升运行效率。
饥饿现象 线程池的饥饿现象是指由于线程不足,导致很多任务无法继续执行的现象。在这种情况下,这些任务可能会一直等待,无法得到执行的机会,因为线程池中的线程资源已经被占满,无法满足所有任务的并发执行需求。
线程池推荐配置 过小的线程数不能充分利用多核CPU的系统资源,而且会导致饥饿问题的产生;过多的线程数会导致频繁的线程上下文的切换,占用更多的内存。
CPU密集型任务
CPU密集型任务指的是 任务执行过程中需要大量的CPU计算资源,这种情况下,可以使用 固定大小的线程池,这样可以确保CPU的资源高效的利用,不会出现频繁的上下文切换而带来的性能上的开销。固定大小的线程池一般采用 cpu核数 + 1 (+1 目的是 保证当有线程由于页缺失故障或其他原因导致暂停,额外的线程可以顶上,保证CPU时钟周期不被浪费)。
IO密集型任务
IO密集型任务指的是 任务执行过程中会涉及到大量的I0操作(如读写文件、网络通信等),IO操作会让线程等待,不会占用CPU资源,此时可以提高线程数来提升CPU的利用率。
1 线程数 = CPU核心数 * 期望CPU利用率 * 总时间(CPU计算时间 + IO等待时间)/ CPU计算时间
任务调度线程池
在任务调度线程池加入之前,jdk提供了 java.util.timer来实现定时功能,但timer的缺点在于所有的任务都是由同一个线程来调度执行,因此所有的任务都是串行执行,在单线程串行处理的情况下,同一时间只有一个任务可以被执行,无法并行,前一个任务的延迟会导致后一个任务的执行时间延迟,前一个任务的抛出的异常会导致线程消失,而是先回收异常线程,再创建一个新的线程。
引入 任务调度线程池 ScheduledThreadPoolExecutor ,它可以延迟执行任务,也可以定时执行任务,通过设置核心线程数可以防止任务延迟,同时及时执行异常时,线程也不会消失,而是先回收异常线程,再创建一个新的线程。
1 2 3 4 5 6 7 8 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1 );scheduledExecutorService.schedule(Runnable command, long delay, TimeUnit unit); scheduledExecutorService.scheduleWithFixedDelay(Runnable command, long initialDelay,long delay, TimeUnit unit); scheduledExecutorService.scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
任务调度应用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void startTaskEveryFriday () { LocalDateTime now = LocalDateTime.now(); LocalDateTime time = now.with(DayOfWeek.FRIDAY).withHour(18 ).withMinute(0 ).withSecond(0 ).withNano(0 ); if (time.isBefore(now)) { time = time.plusWeeks(1 ); } long initialDelay = Duration.between(now, time).toMillis(); logger.info("time:{}, initialDelay:{}" , time, initialDelay); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3 ); scheduledExecutorService.scheduleAtFixedRate(() -> { System.out.println("hello world!" ); }, initialDelay, 1000 * 60 * 60 * 24 * 7 , TimeUnit.MILLISECONDS); }
线程池的异常处理
线程池的异常不会导致线程池中的线程数量减少,线程池会回收掉异常线程,并创建新的线程补充,因此其他的线程不会收到影响。
当调用 execute(Runnable r) 方法时,异常会信息会打印在控制台,当调用 submit(Callable c) 方法时,异常信息不会被打印在控制台,而是封装到返回结果 Future 中,使用 future.get 方法可以获取异常并捕获。
Tomcat线程池 tomcat 核心组成部分主要包括 connector(连接器)和 container(容器)。connector 负责和客户端连接通信;container 负责管理和执行 Servlet。其中 connector 连接器部分使用了 线程池。connector 的架构如下图:
Tomcat架构
LimitLatch:基于semaphore实现的限流,可以控制最大的连接个数
Acceptor:不断执行负责接收新的socket连接
Poller:负责监听 socket channel 是否有 可读的I/O事件,一旦有可读的事件,会封装备为一个任务对象提交给 Executor 线程池执行
Executor 线程池负责将 处理请求
Tomcat Executor线程池流程 如果任务数量超过核心线程数,但没有超过最大线程数,线程池会创建新的线程来处理额外的任务(不会超过核心线程数,这些线程在任务完成后可能会被保留一段时间以备重用,或者在空闲一定时间后被销毁),如果任务数量超过核心线程数和最大线程数的总和,多余的任务才会被放入任务队列中。
Tomcat Executor线程池扩展 Tomcat线程池对于jdk提供的ThreadPoolExecutor线程池进行了扩展:
当总任务到达 maximumPoolSize 并且阻塞队列也满了的时候,它不会抛出 RejectedExecutionException 异常,而是再次尝试将任务放入队列中,如果再不成功则抛出异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 # org.apache.catalina.core.StandardThreadExecutor public void execute (Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super .execute(command); } catch (RejectedExecutionException rx) { if (super .getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super .getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException ("Queue capacity is full." ); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); Thread.interrupted(); throw new RejectedExecutionException (x); }} else { submittedCount.decrementAndGet(); throw rx; } } }
Tomcat Executor配置
配置项
默认值
说明
threadPriority
5
线程优先级
daemon
true
是否为守护线程
minSpareThreads
25
核心线程数 相当于 corePoolSize
maxThreads
200
最大线程数 相当于 maximumPoolSize
maxIdleTime
60000
线程生存时间,单位是毫秒,默认值即 1 分钟
maxQueueSize
Integer.MAX_VALUE
队列长度
prestartminSpareThreads
false
核心线程的创建方式:false - 懒加载 true - 预加载
Fork/Join线程池
Fork/Join 线程池是 一种并行处理的线程池,使用了分治的思想,加入多线程,将每个任务进行拆分交给不同的线程来处理,进一步提升运行效率。
分而治之的思想是指:将问题分解成更小子问题并并行处理的思想。
适用于可以进行任务拆分的cpu密集型运算。
Fork/Join 线程池默认创建与cpu核心数大小相同的线程,这是由于需要拆分的任务大多是cpu密集型任务,线程数和cpu核心数相同即可。
使用 创建任务对象
任务类继承抽象类 RecursiveTask - 有返回结果 或者 RecursiveAction - 无返回结果,重写抽象方法 compute
1 2 3 4 5 6 7 8 9 10 class MyTask extends RecursiveTask <Integer> { @Override protected Integer compute () { ... } } class MyTask extends RecursiveAction { ... }
创建线程池 1 2 3 4 ForkJoinPool forkJoinPool = new ForkJoinPool ();forkJoinPool.invoke(ForkJoinTask<T> task);
实例 模拟 n + … + 5 + 4 + 3 + 2 + 1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class ForkJoinDemo { public static void main (String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool (); Integer invoke = forkJoinPool.invoke(new MyTask (5 )); System.out.println(invoke); } } class MyTask extends RecursiveTask <Integer> { private static final Logger logger = LoggerFactory.getLogger(MyTask.class); int n; public MyTask (int n) { this .n = n; } @Override protected Integer compute () { if (n == 1 ) { return 1 ; } MyTask myTask = new MyTask (n - 1 ); myTask.fork(); Integer t = myTask.join(); int result = n + t; logger.info("join {} + {} = {}" , t, n, result); return result; } } # 日志输出: 21 :02 :26 [ForkJoinPool-1 -worker-1 ] com.example.demo.threadpool.MyTask - join 1 + 2 = 3 21 :02 :26 [ForkJoinPool-1 -worker-2 ] com.example.demo.threadpool.MyTask - join 3 + 3 = 6 21 :02 :26 [ForkJoinPool-1 -worker-1 ] com.example.demo.threadpool.MyTask - join 6 + 4 = 10 21 :02 :26 [ForkJoinPool-1 -worker-1 ] com.example.demo.threadpool.MyTask - join 10 + 5 = 15 15
优化 上面的方法 发现任务之间都是相互依赖 MyTask(5) 依赖 MyTask(4) 的结果,这并不能发挥分代并行的优势,基于此改进:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 class MyProTask extends RecursiveTask <Integer> { private static final Logger logger = LoggerFactory.getLogger(MyProTask.class); private int begin; private int end; public MyProTask (int begin, int end) { this .begin = begin; this .end = end; } @Override protected Integer compute () { if (begin == end) { return begin; } if (end - begin == 1 ) { return end + begin; } int mid = (end + begin) / 2 ; MyProTask t1 = new MyProTask (begin, mid); MyProTask t2 = new MyProTask (mid + 1 , end); t1.fork(); t2.fork(); Integer join1 = t1.join(); Integer join2 = t2.join(); int result = join1 + join2; logger.info("t1 {} t2 {} result {}" , join1, join2, result); return result; } }
总结
Fork/Join 线程池 的重点在于 任务的拆分,尽量保证发挥分代并行的优势
JUC AQS JUC包下的很多工具都是依赖于AQS
AQS - AbstractQueuedSynchronized 翻译则是 抽象的基于队列的同步器,是阻塞式锁和相关同步器工具的基类。
AQS 的核心思想是使用一个状态变量来表示共享资源的状态,并通过原子性的状态修改和线程阻塞/唤醒等操作来实现对共享资源的安全访问。
特点 AQS 的主要特点如下:
state - 状态:表示 共享资源的状态。它是AQS维护的一个原子性的状态变量,状态可以被多个线程共享,通过 CAS(Compare and Swap)等原子操作进行修改。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private volatile int state; private static final long stateOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state" )); } catch (Exception ex) { throw new Error (ex); } } protected final boolean compareAndSetState (int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); }
state 支持两种模式:共享模式 - shared mode 和 独占模式 - execlusive mode
共享模式:在独占模式下,同一时刻只有一个线程能够获得锁,其他线程必须等待。
state 值为 0 时表示没有线程持有锁,大于0表示有线程持有锁
常见应用:ReentrantLock
独占模式:在共享模式下,允许多个线程访问共享资源
state的值为计数器,表示被共享资源的线程数量
常见应用:Semaphore
子类需要定义和维护这个状态,实现不同模式下的应用
queue - 等待队列:AQS 使用一个先进先出(FIFO)的等待队【通过节点(Node)来实现的一个双向链表(Double-Linked Queue)】来管理阻塞的线程。当线程请求访问资源但无法获得时,会被加入到等待队列中等待。
condition - 条件变量: AQS 支持条件变量,允许线程按照特定的条件等待或唤醒,实现了更加灵活的线程通信。
exclusiveOwnerThread - 独占锁的线程引用,表示当前锁持有独占锁的线程。
使用
开发者可以创建自定义的同步器继承 AQS 并实现其抽象方法,从而构建更复杂的同步工具。
继承AQS需要实现下面的模版方法:
tryAcquire:用于尝试获取独占锁
tryRelease:用于尝试释放独占锁
tryAcquireShard:用于尝试释放独占锁
tryReleaseShard:用于尝试释放共享锁
isHeldExclusively:用于判断当前线程是否持有独占锁
简单实现一个不可重入的独占锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 public class AqsTest { private static final Logger logger = LoggerFactory.getLogger(AqsTest.class); public static void main (String[] args) { MyLock myLock = new MyLock (); new Thread (() -> { myLock.lock(); logger.info("t1 lock..." ); try { try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { throw new RuntimeException (e); } } finally { logger.info("t1 unlock..." ); myLock.unlock(); } }).start(); new Thread (() -> { myLock.lock(); logger.info("t2 lock..." ); try { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } } finally { logger.info("t2 unlock..." ); myLock.unlock(); } }).start(); } } class MyLock implements Lock { private MySnyc snyc = new MySnyc (); class MySnyc extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } @Override protected boolean tryRelease (int arg) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } @Override protected boolean isHeldExclusively () { return getState() == 1 ; } private Condition newCondition () { return new ConditionObject (); } } @Override public void lock () { snyc.acquire(1 ); } @Override public void lockInterruptibly () throws InterruptedException { snyc.acquireInterruptibly(1 ); } @Override public boolean tryLock () { return snyc.tryAcquire(1 ); } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return snyc.tryAcquireNanos(1 , unit.toNanos(time)); } @Override public void unlock () { snyc.release(1 ); } @Override public Condition newCondition () { return snyc.newCondition(); } }
Lock ReentrantLock实现原理
ReentrantLock内部维护Sync同步器,同步器继承自 aqs,针对Sync抽象同步器,内部提供了两种不同实现:NonfairSync 和 FairSync,分别对应了非公平锁和公平锁。
非公平锁实现 从无参构造器来看,默认使用的非公平锁的同步器实现
1 2 3 4 # reentrantlock.java public ReentrantLock () { sync = new NonfairSync (); }
加锁成功 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 # aqs.java public void lock () { sync.lock(); } # reentrantlock.java static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L ; final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } }
加锁失败 上面加锁失败后,执行了同步器默认实现的 acquire 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
再次尝试获取独占锁失败后,会执行逻辑与之后的逻辑,将阻塞线程添加到队列里
AbstractQueuedSynchronizer 的队列通过节点(Node)实现的一个双向链表(Double-Linked Queue),它是由一个虚拟的头结点(head)和一个虚拟的尾结点(tail)组成的。
尝试添加一个节点,首次添加节点时,AQS会创建一个虚拟头结点(head),并将这个头结点作为队列的头部。然后再创建一个表示当前等待线程的真正节点,该节点是独占模式的等待节点,将它添加到队列尾部中,并设置它为尾结点(tail)。虚拟头节点的作用在于高效地进行队列的入队和出队操作。
在将当前线程加入到等待队列后,使线程进入自旋状态等待获取锁,自旋获取锁失败则被 park
释放锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 # ReentrantLock.java public void unlock () { sync.release(1 ); } # AbstractQueuedSynchonizer.java public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } # ReentrantLock#Sync.java protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
ReentranctLock 执行 unlock 方法时,会调用 同步器的tryRelease 的方法
方法内部调用 tryRelease 方法,设置 当前同步器的状态为 0 并且设置 exclusiveOwnerThread 为 null 表示当前锁没有被占有
当前队列不为空,并且 waitStatus 为-1 时 进入unparkSuccessor流程
在 unpark 唤醒头节点的后继节点,让后继节点的所属线程从被park的地方(即自旋的地方)继续执行
可重入锁原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 static final class NonfairSync extends Sync { protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } } abstract static class Sync extends AbstractQueuedSynchronizer { final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); / setState(nextc); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } }
可打断原理 reentrantlock 分为两种模式:可打断模式 和 不可打断模式
1 2 3 4 5 6 7 8 9 public void lock () { sync.lock(); } public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); }
不可打断模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); } public static boolean interrupted () { return currentThread().isInterrupted(true ); }
loksupport.park 可以被 thread.interrupt() 打断继续执行,但是继续执行后,它还是在自旋获取锁的代码中,重新被park掉(park被调用interrupt线程无法再次被park掉,需要调用Thread.interrupted()返回返回中断标识并会清除中断标识,以便再次被 park 住),因此,即使有线程调用打断被阻塞的线程,线程被唤醒还是在自旋获取锁的代码中执行,需要获取锁以后,才跳出循环,打断后正常继续执行。
1 2 3 4 5 6 7 8 9 10 11 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } static void selfInterrupt () { Thread.currentThread().interrupt(); }
可打断模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }
公平性原理 非公平锁原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 static final class NonfairSync extends Sync { protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } } abstract static class Sync extends AbstractQueuedSynchronizer { final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
公平锁原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 # ReentrantLock.java static final class FairSync extends Sync { protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } } # aqs.java public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
条件变量原理 ReentrantLock 条件变量对应了一个 ConditionObject 对象,每个 ConditionObject 对象维护了一个 双向链表,保存着等待条件阻塞的线程。
await
当 thread-0 持有锁时,调用 condition.await 方法,将创建一个状态为2(Node.Condition表示线程在等待条件)关联线程的节点加入到 conditionObject 实例对象的双向链表的尾部中,并且尝试释放重入锁和清空重试次数,设置 exclusiveOwnerThread 为 null,并唤醒阻塞队列的后继节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable { public class ConditionObject implements Condition , java.io.Serializable { private transient Node firstWaiter; private transient Node lastWaiter; public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node (Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } } final int fullyRelease (Node node) { boolean failed = true ; try { int savedState = getState(); if (release(savedState)) { failed = false ; return savedState; } else { throw new IllegalMonitorStateException (); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } }
signal 调用 thread-1 调用 signal 方法时,将 conditionObject 实例中的双向链表中的第一个节点移出来,改变节点的状态从 Node.CONDITION-2 转换为 0 并将该线程节点转移到等待锁的等待队列中,当 thread-1线程执行完毕后释放锁,会唤醒等待队列中的下一个节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable { public class ConditionObject implements Condition , java.io.Serializable { private transient Node firstWaiter; private transient Node lastWaiter; public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); } private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal( first) && (first = firstWaiter) != null ); } final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; } }
读写锁 ReentrantReadWriteLock
ReentrantReadWriteLock 提供了一对关联的锁,用于读和写操作,它允许多个线程同时读取资源,同时确保只有一个线程能够进行写操作即实现 “读-读并发 读-写互斥 写写互斥” 的效果。
读锁:多个线程可以同时持有读锁,只要当前没有线程持有写锁。
写锁:同一时间只有一个线程能够持有写锁。如果有任何线程持有写锁,则其他线程无法获取读锁或者写锁。
适用于 读多写少的场景
注意事项:
读锁不支持条件变量(condition),因为读锁是非独占的,可以有多个线程持有读锁,但是写锁是独占的,支持条件变量。
重入性 - 持有写锁的线程可以获取读锁,但是持有读锁的线程不可以获取写锁 (支持锁重入的升级 不支持锁重入的降级)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class ReentrantReadWriteLockTest { public static void main (String[] args) { DataContainer dataContainer = new DataContainer (); new Thread (() -> dataContainer.read()).start(); new Thread (() -> dataContainer.write()).start(); } } class DataContainer { private static final Logger logger = LoggerFactory.getLogger(DataContainer.class); private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock (); private ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock(); private ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock(); public void read () { logger.info("获取读锁" ); readLock.lock(); try { logger.info("获取对象" ); TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { logger.info("释放读锁" ); readLock.unlock(); } } public void write () { logger.info("获取写锁" ); writeLock.lock(); try { logger.info("设置对象" ); } finally { logger.info("释放写锁" ); writeLock.unlock(); } } } # 输出 21 :29 :21 [Thread-0 ] com.example.demo.aqs.DataContainer - 获取读锁21 :29 :21 [Thread-1 ] com.example.demo.aqs.DataContainer - 获取写锁21 :29 :21 [Thread-0 ] com.example.demo.aqs.DataContainer - 获取对象21 :29 :22 [Thread-0 ] com.example.demo.aqs.DataContainer - 释放读锁21 :29 :22 [Thread-1 ] com.example.demo.aqs.DataContainer - 设置对象21 :29 :22 [Thread-1 ] com.example.demo.aqs.DataContainer - 释放写锁
StampedLock
在读写锁的使用过程中,底层利用AQS并使用CAS修改其状态,为了进一步优化读的性能,引入的StampedLock,它的特点是 使用它的读写锁时都需要配合一个戳使用。
1 2 3 StampedLock stampedLock = new StampedLock ();long stamp = stampedLock.readLock();stampedLock.unlockRead(stamp);
1 2 3 StampedLock stampedLock = new StampedLock ();long stamp = stampedLock.writeLock();stampedLock.unlockWrite(stamp);
乐观读:StampedLock 支持 乐观读,核心方法就是 tryOptimistic 方法,该方法不会加锁,而时获取乐观素的的一个戳,在执行读操作之前,进行一次戳校验,如果戳校验失败,则说明在这期间,没有写线程获取写锁执行写操作,数据可以安全的读取,否则,需要锁升级并重新获取 读锁,保证数据安全。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 StampedLock stampedLock = new StampedLock (); long stamp = stampedLock.tryOptimisticRead(); if (stampedLock.validate(stamp)) { return obj; } try { stampedLock.readLcok(); } finally { stampedLock.unLockRead(); }
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 class DataContainer { private static final Logger logger = LoggerFactory.getLogger(DataContainer.class); private StampedLock stampedLock = new StampedLock (); public void read () { logger.info("获取乐观读锁" ); long stamp = stampedLock.tryOptimisticRead(); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } if (stampedLock.validate(stamp)) { logger.info("验戳成功,开始读取" ); return ; } try { stamp = stampedLock.readLock(); logger.info("验戳失败,获取读锁,执行读操作" ); return ; } finally { stampedLock.unlockRead(stamp); } } public void write () { logger.info("获取写锁" ); long stamp = stampedLock.writeLock(); try { logger.info("执行写操作" ); } finally { logger.info("释放写锁" ); stampedLock.unlockWrite(stamp); } } }
注意
stampedLock不支持条件变量 condition
stampedlock不支持可重入
Semaphore
Semaphore 信号量 用于限制同时访问共享资源的线程上限。
用法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Semaphore semaphore = new Semaphore (3 ); try { semaphore.acquire(); } catch (InterruptedException e) { throw new RuntimeException (e); } try { } finally { semaphore.release(); }
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Semaphore semaphore = new Semaphore (3 );for (int i = 0 ; i < 10 ; i++) { new Thread (() -> { try { semaphore.acquire(); logger.info("获取信号量" ); } catch (InterruptedException e) { throw new RuntimeException (e); } try { logger.info("running..." ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } } finally { semaphore.release(); logger.info("释放信号量" ); } }).start();
应用
semaphore原理
semaphore 底层实现是基于 AQS 实现,将 AQS 的 state 设置为 信号量的许可数。
acquire
每次acquire操作将尝试获取一个许可,而release操作将释放一个许可,当 state为正数时,表示有可用的许可,当 state 为零时,表示没有可用的许可,其他线程调用 acquire 方法会被阻塞并且进入 AQS的阻塞队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class Semaphore implements java .io.Serializable { static final class NonfairSync extends Sync { NonfairSync(int permits) { super (permits); } protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable { public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } }
release
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class Semaphore implements java .io.Serializable { static final class NonfairSync extends Sync { protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error ("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } } public void release () { sync.releaseShared(1 ); } } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable { public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } }
CountdownLatch
CountdownLatch 为 倒计时门栓,用于控制多个线程之间的同步,允许一个线程或者多个线程等待一组操作完成。
CountdownLatch 的核心概念为 计数器,在构造 CountdownLatch 需要指定一个初始化计数器,线程通过调用 await 方法阻塞等待计数器达到0,其他线程通过 调用 countdown 方法递减计数器数值,直到0后,唤醒 调用 await的线程。
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class CountDownLatchDemo { private static final Logger logger = LoggerFactory.getLogger(CountDownLatchDemo.class); public static void main (String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(4 ); CountDownLatch countDownLatch = new CountDownLatch (3 ); for (int i = 0 ; i < 3 ; i++) { executorService.submit(() -> { try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { throw new RuntimeException (e); } countDownLatch.countDown(); logger.info("countdown..." ); }); } executorService.submit(() -> { try { logger.info("await..." ); countDownLatch.await(); logger.info("signal" ); } catch (InterruptedException e) { throw new RuntimeException (e); } }); executorService.shutdown(); } } # 输出 await... countdown... countdown... countdown... signal
应用 一个游戏等待10个人加载完毕,则开始游戏
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class CountDownLatchApp { private static final Logger logger = LoggerFactory.getLogger(CountDownLatchDemo.class); public static void main (String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10 ); String[] schedules = new String [10 ]; CountDownLatch countDownLatch = new CountDownLatch (10 ); Random random = new Random (); for (int i = 0 ; i < 10 ; i++) { int a = i; executorService.submit(() -> { for (int j = 0 ; j <= 100 ; j++) { try { TimeUnit.MILLISECONDS.sleep(random.nextInt(100 )); } catch (InterruptedException e) { throw new RuntimeException (e); } schedules[a] = j + "%" ; logger.info("\r" + Arrays.toString(schedules)); } countDownLatch.countDown(); }); } countDownLatch.await(); logger.info("游戏加载完毕" ); executorService.shutdown(); } } # 输出: [100 %, 100 %, 100 %, 100 %, 100 %, 100 %, 100 %, 100 %, 100 %, 100 %] 游戏加载完毕
CountdownLatch 常用于 多线程执行多个任务,等待多个任务都执行完毕,在执行下面的场景。
CyclicBarrier
CyclicBarrier 循环屏障,用于多线程环境下协调多个线程执行任务即线程协作。
允许一组线程到达某个共同点之前相互等待,并在所有线程都到达这个点时同时继续执行。
CyclicBarrier 实现原理
CyclicBarrier 内部维护了一个计数器,当每个线程调用 await 方法时,当前线程被阻塞并且 CyclicBarrier 内部的计数器 count 就会减1,当计数器为0时,当所有线程都到达屏障点,屏障就会打开,所有被阻塞的线程被释放。
CyclicBarrier 循环使用
CyclicBarrier 可以被重用,在屏障打开时会自动重置计数器,等待一轮同步。
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class CyclicBarrierTest { private static final Logger logger = LoggerFactory.getLogger(CyclicBarrierTest.class); public static void main (String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2 ); CyclicBarrier cyclicBarrier = new CyclicBarrier (2 , new Runnable () { @Override public void run () { logger.info("cyclicBarrier over..." ); } }); for (int i = 0 ; i < 3 ; i++) { executorService.submit(() -> { logger.info("task1 begin..." ); try { TimeUnit.SECONDS.sleep(1 ); cyclicBarrier.await(); logger.info("task1 end..." ); } catch (InterruptedException e) { throw new RuntimeException (e); } catch (BrokenBarrierException e) { throw new RuntimeException (e); } }); executorService.submit(()->{ logger.info("task2 begin..." ); try { TimeUnit.SECONDS.sleep(3 ); cyclicBarrier.await(); logger.info("task2 end..." ); } catch (InterruptedException e) { throw new RuntimeException (e); } catch (BrokenBarrierException e) { throw new RuntimeException (e); } }); } executorService.shutdown(); } } # 输出三次: task1 begin... task2 begin... cyclicBarrier over... task2 end... task1 end...
并发集合类 线程安全集合类概述
遗留的安全集合: Vector HashTable
所有方法使用 synchronized 修饰 并发性能较低
修饰的安全集合:Collections.singletonList(new ArrayList<>())
装饰者模式 将线程不安全的集合作为内部的成员变量,操作时依旧使用不安全集合的方法,只是加了 synchronized 锁保证线程安全,并发性能同样较差
JUC的安全集合 :包含三大类:Blocking 类,CopyOnWrite 类,Concurrent 类
Blocking 类:阻塞式的并发集合,大多实现基于锁并提供阻塞方法,线程不满足条件时将会被阻塞。
CopyOnWrite 类:修改时采用拷贝的方式来避免多线程访问时读写的并发安全,只适用于读多写少的场景。
Concurrent 类:高效的并发集合,内部使用cas优化并提供多把锁提高并发度和吞吐量,适用于高并发的读写操作场景,
第三方工具 disruptor 高性能无锁队列
guava-RateLimiter 基于信号量的高性能限流器