JUC并发工具

并发工具

线程池

线程是一种系统资源,每个线程都会占用一定的内存资源,过多的线程可能会导致内存溢出的情况。同时,过多的线程会导致频繁的上下文切换而降低性能。因此不能过多的创建线程,在此问题的基础上,出现了线程池。

线程池是用于管理和复用线程的机制,减少过多的内存占用和线程数量导致频繁的上下文切换,以提升性能。

自定义线程池

image-20240122111214901
  • ThreadPool - 线程池:用于管理可以被复用的线程
  • BlockingQueue - 阻塞队列:用于平衡生产者生产和消费者消费速度差异
    • 消费者:相当于线程池中的线程,获取阻塞队列中的任务并执行
    • 生产者:相当于请求或者其他线程产生任务
    • 当消费者线程消费速度高于生产者线程生产时,线程池中线程需要等待 阻塞队列
    • 当生产者线程生产速度高于消费者线程消费,阻塞队列用于存储来不及消费的任务

简单时间一个最大线程数和线程工厂的自定义线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
public class SimpleTest {

private static final Logger logger = LoggerFactory.getLogger(PoolTest.class);

public static void main(String[] args) {

MyThreadPool myThreadPool = new MyThreadPool(2, 5, TimeUnit.SECONDS, 5,
(task) -> ((MyThreadPool.Worker) task).getTarget().run());
List<Runnable> runnables = new ArrayList<>(100);
for (int i = 1; i <= 100; i++) {
final int a = i;
runnables.add(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("task" + a);
});
}
for (Runnable r : runnables) {
myThreadPool.exec(r);
}
}
}

/**
* 拒绝策略
* @param <T>
*/
@FunctionalInterface
interface RejectedHandler<T> {

void reject(T r);
}

class MyThreadPool {

private static final Logger logger = LoggerFactory.getLogger(MyThreadPool.class);
// 任务队列
private MyBlockingQueue<Worker> myBlockingQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 线程等待任务超时时间
private long timeout;

private ReentrantLock workerLock = new ReentrantLock();

private TimeUnit timeUnit;

// 策略模式 抽象成接口的抽象方法 具体实现交给调用者实现
private RejectedHandler rejectedHandler;

public MyThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectedHandler rejectedHandler) {
myBlockingQueue = new MyBlockingQueue<>(queueCapacity);
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.rejectedHandler = rejectedHandler;
}

public void exec(Runnable runnable) {
workerLock.lock();
try {
// 当前线程的线程数小于核心线程数 - 创建线程执行任务
if (workers.size() < coreSize) {
Worker worker = new Worker(runnable);
worker.start();
logger.info("add worker:{} pre task:{}", worker, runnable);
workers.add(worker);
} else {
// 当前线程的线程数等于核心线程数 - 任务进入阻塞队列
logger.info("task put queue:{} current queue size:{}", runnable, myBlockingQueue.size());
myBlockingQueue.offer(new Worker(runnable), rejectedHandler);
}
} finally {
workerLock.unlock();
}
}

public class Worker extends Thread {

private Runnable target;

public Worker(Runnable target) {
this.target = target;
}

public Runnable getTarget() {
return target;
}

@Override
public void run() {
while (target != null) {
// 当前线程任务不为空时
logger.info("exec task:{}", target);
target.run();
// 继续从阻塞队列中获取任务并执行
target = myBlockingQueue.poll(timeout, timeUnit);
}
workerLock.lock();
try {
logger.info("remove worker:{}", this);
workers.remove(this);
} finally {
workerLock.unlock();
}
}

}
}

class MyBlockingQueue<T> {

// 阻塞队列
private Deque<T> deque = new ArrayDeque<T>();

private volatile int capacity = 10;

// 加锁保证阻塞队列共享变量的线程安全
private ReentrantLock lock = new ReentrantLock();

// 消费者条件变量,当消费者在阻塞队列中没有变量时,消费者需要阻塞等待
private Condition consumer_condition = lock.newCondition();


public MyBlockingQueue(int capacity) {
this.capacity = capacity;
}

public int size() {
return deque.size();
}

/**
* 消费者拉取消息
*
* @param timeout 超时时间
* @param unit
* @return
*/
public T poll(final long timeout, final TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (deque.isEmpty()) {
try {
//设置等待条件超时时间
if (nanos <= 0) {
return null;
}
// 重置等待实际为剩余时间 防止虚假唤醒重置等待时间
nanos = consumer_condition.awaitNanos(nanos);

} catch (InterruptedException e) {
e.printStackTrace();
}
}

T pop = deque.pop();
return pop;
} finally {
lock.unlock();
}
}

/**
* 生产者投递消息
*
* @param t
* @param rejectedHandler
* @return
*/
public void offer(T t, RejectedHandler rejectedHandler) {
lock.lock();
try {
if (deque.size() >= capacity) {
// 阻塞队列满了执行拒绝策略
rejectedHandler.reject(t);
} else {
// 阻塞队列添加任务
deque.offer(t);
consumer_condition.signalAll();
}
} finally {
lock.unlock();
}
}
}

ThreadPoolExecutor

ThreadPoolExecutor是jdk提供的线程池的实现

image-20240122154814432
  • Executor:是一个执行器接口,提供了提交的 Runnable 任务的对象
  • ExecutorService:是执行器服务接口,是线程池最基础的接口,扩展了 Executor 并用于管理任务和执行器本身的生命周期的附加方法。
    • 引入了 submit(Callable task) 方法,返回挂起结果 Future
  • AbstractExecutorService:是抽象执行器服务,实现了Executor的基本方法
  • ThreadPoolExecutor:ThreadPoolExecutor 是 ExecutorService 接口的具体实现,提供了一个灵活可配置的线程池实现,开发者可以使用 ThreadPoolExecutor 实例来管理和执行任务。
  • ScheduledExecutorService:ScheduledExecutorService在ExecutorService线程池基础接口的基础上提供了定时或延迟执行的抽象方法
  • ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor 是 ScheduledExecutorService 的具体实现。
线程池状态

ThreadPoolExecutor使用原子整数 ctl - control state 存储线程池的状态,ctl 包含了两个部分:高3位表示 线程池的运行状态,低29位表示 工作线程的数量,这样设计的好处是,保证了操作状态和工作线程数量两个值通过一次CAS操作同时进行赋值,实现了高效的状态管理。

1
2
3
4
5
6
7
8
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int ctlOf(int rs, int wc) { return rs | wc; }
状态名 高3位 说明 是否接受任务 是否处理阻塞队列任务
RUNNING 111 线程池处于运行状态(初始状态) Y Y
SHUTDOWN 000 线程池处于关闭状态,不再接受新的任务,但会继续执行已经提交的任务即阻塞队列的任务。 N Y
STOP 001 线程池处于停止状态,不再接受新的任务,也不会执行阻塞队列中的任务,并中断正在执行的任务。 N N
TIDYING 010 所有的任务都已经终止,工作线程数量为0,线程池即将转换到 TERMINATED 状态。
TERMINATED 011 线程池终止,不再处于任何活动状态。
构造方法
1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler rejectExecutionHandler);
  • corePoolSize:核心线程数 - 线程池的基本大小,即在没有任务需要执行时,线程池的大小始终保持在这个数量(注意刚开始是0,用到才会创建)。即使线程是空闲的,它们也不会被销毁,除非设置了 allowCoreThreadTimeOut
  • maximumPoolSize:最大线程数 - 线程池允许创建的最大线程数。
    • 如果在阻塞队列中有任务等待执行,且已有的线程数小于 corePoolSize,则会创建新的线程作为 核心线程。
    • 如果已有的线程数等于 corePoolSize,新的任务会加入阻塞队列中等待执行。
    • 当阻塞队列已满,有新的任务的来时,创建新的线程作为救急线程直到线程数达到maximumPoolSize,这些救急线程会在不需要时被销毁。
    • 当阻塞队列已满,线程数已经达到 maximumPoolSize,新的任务无法执行,会触发拒绝策略。
  • keepAliveTime:线程空闲时间 - 当线程池中的线程数超过 corePoolSize 时,多余的空闲救急线程在被终止之前等待新任务的最长时间即从线程空闲开始算起的最大空闲时间。
  • unit:时间单位 - 用于指定 keepAliveTime 参数的时间单位。
  • workQueue:阻塞队列 - 用于保存等待执行的任务的阻塞队列。可以使用不同的阻塞队列实现。
  • threadFactory:线程工厂 - 用于创建新线程的工厂。可以通过自定义线程工厂来指定新线程的属性,如线程名称、优先级等。
  • rejectExecutionHandler:拒绝策略 - 当任务添加到线程池中被拒绝时的处理策略。

注意:只有当阻塞队列为有容量限制的队列时,才会在超过其队列容量大小时,创建最多 maxnumPoolSize - corePoolSize 的数据线程来救急。

拒绝策略
image-20240122210258386

自定义线程池的拒绝策略是指当线程池的任务队列已满并且最大线程数被耗尽的情况下,线程池应对任务处理的一种策略。

  • AbortPolicy(默认策略):会直接抛出RejectedExecutionException异常,阻止新任务的提交。
  • CallerRunsPolicy:当线程池无法接受新任务时,会将任务交给调用者线程来执行。
    • 保证了任务一定会被执行,但是可能会导致调用者线程的负担增加
  • AbortPolicy:丢弃无法处理的任务,不提供任何反馈。
  • DiscardPolicy:丢弃阻塞队列等待时间最长的任务,然后尝试将新任务加入队列。

其他框架对于拒绝策略有很多不同的扩展:

dubbo 框架有拒绝策略实现 抛出异常并记录日志,并且dump线程栈帧的信息,方便问题的定位

netty 框架有拒绝策略实现 创建一个新的线程来执行任务

activemq 框架有拒绝策略实现 设置60秒超时等待时间 期间不断尝试将任务放入到队列中

常用方法
提交任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 执行任务
void execute(Runnable command);

// 提交任务task 使用Future获取返回结果
// 利用synchronized和wait/notifyAll保护性暂停模式 调用future get方法的线程阻塞等待,直到执行callable的线程执行完毕后唤醒它
<T> Future<T> submit(Callable<T> task);

// 执行给定的任务集合,并返回代表这些任务的Future对象的列表
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 在上述方法的基础上,设置超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

// 用于执行给定的任务集合,并返回最先执行结束任务的执行结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

// 用于执行给定的任务集合,并返回最先执行结束任务的执行结果,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
关闭线程池
  • shutdown

    调用 shutdown 方法将线程池的状态更改为 SHUTDOWN,不会接受新的任务,但会将已提交的任务和在阻塞队列中的任务执行完毕。但是shutdown 方法不会阻塞调用者线程的执行,调用者线程会继续执行,而不会等待线程池中的任务执行完毕。

    shutdown 方法是一个优雅关闭线程池的方式,让线程池在执行完已提交的任务后安全地终止。

    1
    void shutdown();
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public void shutdown() {
    ReentrantLock mainLock = this.mainLock;
    mainLock.lock();

    try {
    // 检查是否有权限关闭线程池
    this.checkShutdownAccess();
    // 更新线程池的状态
    this.advanceRunState(SHUTDOWN);
    // 打断空闲线程
    this.interruptIdleWorkers();
    // 在线程池关闭时执行一些自定义的关闭操作,这是一个可扩展的钩子方法
    this.onShutdown();
    } finally {
    mainLock.unlock();
    }

    // 尝试终结 - 没有运行的线程则直接终结,如果还有线程则让他们自己自己结束,也不会阻塞等待
    this.tryTerminate();
    }
  • shutdownNow

    调用 shutdownNow 方法将线程池的状态更改为 STOP,不会接受新的任务的同时,也会将尝试中断所有工作线程,包括正在执行任务的线程,正在执行任务的线程使用 interrupt 打断,当前阻塞队列中的任务会作为方法的返回结果。

    1
    List<Runnable> shutdownNow();
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public List<Runnable> shutdownNow() {
    ReentrantLock mainLock = this.mainLock;
    mainLock.lock();

    List tasks;
    try {
    this.checkShutdownAccess();
    this.advanceRunState(STOP);
    // 中断所有工作线程
    this.interruptWorkers();
    // 获取阻塞队列中的任务进行返回
    tasks = this.drainQueue();
    } finally {
    mainLock.unlock();
    }

    // 尝试终结 - 由于之前的工作线程全部被打断,所以这里所有的线程都会被终结
    this.tryTerminate();
    return tasks;
    }

Executors

Executors 是 基于ThreadPoolExecutor构造 的一个工厂类,提供了很多方法来创建不同用途的线程池。

配置项相对于ThreadPoolExecutor较少,但是某些场景的简化,可能无法满足特定的需求。

  • newFixedThreadPool

    newFixedThreadPool 创建一个固定大小的线程池,当有新的任务提交时,如果线程池中的线程数小于nThreads,则创建新线程来处理任务,否则将任务加入队列等待执行,最多为 nThreads 个核心线程数,没有救急线程。

    注意:该构造方法的阻塞队列采用了 LinkedBlockingQueue ,属于无界队列。

    使用场景:适用于需要控制并发线程数量或者执行固定数量的任务的场景

    1
    2
    3
    4
    5
    6
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory);
    }
  • newCachedThreadPool

    newCachedThreadPool可缓存的线程池,该线程池没有核心线程,最大线程数为 Integer.MAX_VALUE,线程池中的线程数量根据任务的数量动态调整,这些线程都是救急线程,超过60秒会被回收。当有新任务提交时,如果有空闲线程,则重用现有线程,否则创建新线程。

    该构造方法的阻塞队列采用了 SynchronousQueue 特殊类型的队列,内部并不存储元素,而是将任务直接交给等待的线程。它的容量为0,意味着每个插入操作必须等待另一个线程的对应移除操作,它这种特性是专门应用于 newCachedThreadPool 的。

    使用场景: 适用于任务数量不确定但每个任务执行时间较短、可能有爆发性增长的场景,自动调整线程数量。

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }
  • newSingleThreadExecutor

    newSingleThreadExecutor 创建一个只有单个线程的线程池。所有提交的任务将按顺序执行,并且保证不会有并发执行的情况。

    该构造方法的阻塞队列采用 LinkedBlockingQueue 链式无解队列。

    这种只有一个线程的线程池和一个线程串行执行的区别在于:线程池有拒绝策略进行补偿,同时线程池会维护一个线程并保证其的政策工作(即使有线程发生意外终止了,线程池会创建一个新的线程来顶替,保证线程池中始终有且只有一个可用的线程)。

    使用场景: 顺序执行的任务

    注意:newSingleThreadExecutor方法底层通过FinalizableDelegatedExecutorService装饰,只暴露了 ExecutorService 接口,不能调用 ThreadPoolExecutor 特有的方法,例如 setCorePoolSize 等方法。

    1
    2
    3
    4
    5
    6
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }

异步模式之工作线程

线程池让有限的工作线程(worker thread)来轮流地异步处理无限多的任务,采用了工作线程的模式,也有分工模式和享元模式的思想在里面。

在实际的应用场景中,由于不同类型的任务可能有不同的执行特性和优先级,因此不同的任务类型应该使用不同的线程池,这样可以避免饥饿现象的同时,也可以提升运行效率。

饥饿现象

线程池的饥饿现象是指由于线程不足,导致很多任务无法继续执行的现象。在这种情况下,这些任务可能会一直等待,无法得到执行的机会,因为线程池中的线程资源已经被占满,无法满足所有任务的并发执行需求。

线程池推荐配置

过小的线程数不能充分利用多核CPU的系统资源,而且会导致饥饿问题的产生;过多的线程数会导致频繁的线程上下文的切换,占用更多的内存。

CPU密集型任务

CPU密集型任务指的是 任务执行过程中需要大量的CPU计算资源,这种情况下,可以使用 固定大小的线程池,这样可以确保CPU的资源高效的利用,不会出现频繁的上下文切换而带来的性能上的开销。固定大小的线程池一般采用 cpu核数 + 1 (+1 目的是 保证当有线程由于页缺失故障或其他原因导致暂停,额外的线程可以顶上,保证CPU时钟周期不被浪费)。

IO密集型任务

IO密集型任务指的是 任务执行过程中会涉及到大量的I0操作(如读写文件、网络通信等),IO操作会让线程等待,不会占用CPU资源,此时可以提高线程数来提升CPU的利用率。

1
线程数 = CPU核心数 * 期望CPU利用率 * 总时间(CPU计算时间 + IO等待时间)/ CPU计算时间

任务调度线程池

在任务调度线程池加入之前,jdk提供了 java.util.timer来实现定时功能,但timer的缺点在于所有的任务都是由同一个线程来调度执行,因此所有的任务都是串行执行,在单线程串行处理的情况下,同一时间只有一个任务可以被执行,无法并行,前一个任务的延迟会导致后一个任务的执行时间延迟,前一个任务的抛出的异常会导致线程消失,而是先回收异常线程,再创建一个新的线程。

引入 任务调度线程池 ScheduledThreadPoolExecutor ,它可以延迟执行任务,也可以定时执行任务,通过设置核心线程数可以防止任务延迟,同时及时执行异常时,线程也不会消失,而是先回收异常线程,再创建一个新的线程。

1
2
3
4
5
6
7
8
// 使用  Executors 快速创建 ScheduledExecutorService
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
// 延迟执行
scheduledExecutorService.schedule(Runnable command, long delay, TimeUnit unit);
// 固定时间间隔执行[如果任务的执行时间超过了设定的延迟时间,下一个任务会在上一个任务完成后立即启动]
scheduledExecutorService.scheduleWithFixedDelay(Runnable command, long initialDelay,long delay, TimeUnit unit);
// 固定频率执行[如果任务执行时间超过了设定的频率,下一个任务可能会立即启动,不等待上一个任务的完成]
scheduledExecutorService.scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
任务调度应用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 每周五 18点定时执行任务
*/
public static void startTaskEveryFriday() {

LocalDateTime now = LocalDateTime.now();
LocalDateTime time = now.with(DayOfWeek.FRIDAY).withHour(18).withMinute(0).withSecond(0).withNano(0);
if (time.isBefore(now)) {
// 获取下一个周五
time = time.plusWeeks(1);
}

// 计算时间差值
long initialDelay = Duration.between(now, time).toMillis();
logger.info("time:{}, initialDelay:{}", time, initialDelay);

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
scheduledExecutorService.scheduleAtFixedRate(() -> {
System.out.println("hello world!");
}, initialDelay, 1000 * 60 * 60 * 24 * 7, TimeUnit.MILLISECONDS);
}

线程池的异常处理

线程池的异常不会导致线程池中的线程数量减少,线程池会回收掉异常线程,并创建新的线程补充,因此其他的线程不会收到影响。

当调用 execute(Runnable r) 方法时,异常会信息会打印在控制台,当调用 submit(Callable c) 方法时,异常信息不会被打印在控制台,而是封装到返回结果 Future 中,使用 future.get 方法可以获取异常并捕获。

Tomcat线程池

tomcat 核心组成部分主要包括 connector(连接器)和 container(容器)。connector 负责和客户端连接通信;container 负责管理和执行 Servlet。其中 connector 连接器部分使用了 线程池。connector 的架构如下图:

Tomcat架构
image-20240123161842550
  • LimitLatch:基于semaphore实现的限流,可以控制最大的连接个数
  • Acceptor:不断执行负责接收新的socket连接
  • Poller:负责监听 socket channel 是否有 可读的I/O事件,一旦有可读的事件,会封装备为一个任务对象提交给 Executor 线程池执行
  • Executor 线程池负责将 处理请求
Tomcat Executor线程池流程

如果任务数量超过核心线程数,但没有超过最大线程数,线程池会创建新的线程来处理额外的任务(不会超过核心线程数,这些线程在任务完成后可能会被保留一段时间以备重用,或者在空闲一定时间后被销毁),如果任务数量超过核心线程数和最大线程数的总和,多余的任务才会被放入任务队列中。

Tomcat Executor线程池扩展

Tomcat线程池对于jdk提供的ThreadPoolExecutor线程池进行了扩展:

当总任务到达 maximumPoolSize 并且阻塞队列也满了的时候,它不会抛出 RejectedExecutionException 异常,而是再次尝试将任务放入队列中,如果再不成功则抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# org.apache.catalina.core.StandardThreadExecutor    
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
// 调用 super 方法执行任务
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
// 再次尝试将任务放入队列中,如果再不成功则抛出异常
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
Thread.interrupted();
throw new RejectedExecutionException(x);
}} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
Tomcat Executor配置
配置项 默认值 说明
threadPriority 5 线程优先级
daemon true 是否为守护线程
minSpareThreads 25 核心线程数 相当于 corePoolSize
maxThreads 200 最大线程数 相当于 maximumPoolSize
maxIdleTime 60000 线程生存时间,单位是毫秒,默认值即 1 分钟
maxQueueSize Integer.MAX_VALUE 队列长度
prestartminSpareThreads false 核心线程的创建方式:false - 懒加载 true - 预加载

Fork/Join线程池

Fork/Join 线程池是 一种并行处理的线程池,使用了分治的思想,加入多线程,将每个任务进行拆分交给不同的线程来处理,进一步提升运行效率。

分而治之的思想是指:将问题分解成更小子问题并并行处理的思想。

适用于可以进行任务拆分的cpu密集型运算。

Fork/Join 线程池默认创建与cpu核心数大小相同的线程,这是由于需要拆分的任务大多是cpu密集型任务,线程数和cpu核心数相同即可。

使用
创建任务对象

任务类继承抽象类 RecursiveTask - 有返回结果 或者 RecursiveAction - 无返回结果,重写抽象方法 compute

1
2
3
4
5
6
7
8
9
10
class MyTask extends RecursiveTask<Integer> {
@Override
protected Integer compute() {
...
}
}

class MyTask extends RecursiveAction {
...
}
创建线程池
1
2
3
4
// 创建线程池 默认线程数 = cpu核心数 Runtime.getRuntime().availableProcessors()
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 执行任务
forkJoinPool.invoke(ForkJoinTask<T> task);
实例

模拟 n + … + 5 + 4 + 3 + 2 + 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class ForkJoinDemo {

public static void main(String[] args) {
// 创建线程池 默认线程数 = cpu核心数 Runtime.getRuntime().availableProcessors()
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 执行任务
Integer invoke = forkJoinPool.invoke(new MyTask(5));
System.out.println(invoke);
}
}

/**
* n + ... + 5 + 4 + 3 + 2 + 1
* MyTask(5) = MyTask(4) + 5
* MyTask(4) = MyTask(3) + 4
* MyTask(3) = MyTask(2) + 3
* ....
*/
class MyTask extends RecursiveTask<Integer> {

private static final Logger logger = LoggerFactory.getLogger(MyTask.class);

int n;

public MyTask(int n) {
this.n = n;
}

@Override
protected Integer compute() {
// 终止条件
if (n == 1) {
return 1;
}
// MyTask(n) = n + MyTask(n-1)
MyTask myTask = new MyTask(n - 1);
// 让一个线程执行此任务 MyTask(n-1)
myTask.fork();
// 获取任务结果 MyTask(n-1)
Integer t = myTask.join();
int result = n + t;
logger.info("join {} + {} = {}", t, n, result);

return result;
}
}
# 日志输出:
21:02:26 [ForkJoinPool-1-worker-1] com.example.demo.threadpool.MyTask - join 1 + 2 = 3
21:02:26 [ForkJoinPool-1-worker-2] com.example.demo.threadpool.MyTask - join 3 + 3 = 6
21:02:26 [ForkJoinPool-1-worker-1] com.example.demo.threadpool.MyTask - join 6 + 4 = 10
21:02:26 [ForkJoinPool-1-worker-1] com.example.demo.threadpool.MyTask - join 10 + 5 = 15
15
优化

上面的方法 发现任务之间都是相互依赖 MyTask(5) 依赖 MyTask(4) 的结果,这并不能发挥分代并行的优势,基于此改进:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* n + (n-1) + .. + 2 + 1
* => 1 ~ mid mid + 1 ~ n
*/
class MyProTask extends RecursiveTask<Integer> {

private static final Logger logger = LoggerFactory.getLogger(MyProTask.class);

private int begin;

private int end;

public MyProTask(int begin, int end) {
this.begin = begin;
this.end = end;
}

@Override
protected Integer compute() {

if (begin == end) {
return begin;
}
if (end - begin == 1) {
return end + begin;
}

int mid = (end + begin) / 2;
MyProTask t1 = new MyProTask(begin, mid);
MyProTask t2 = new MyProTask(mid + 1, end);
t1.fork();
t2.fork();


Integer join1 = t1.join();
Integer join2 = t2.join();
int result = join1 + join2;
logger.info("t1 {} t2 {} result {}", join1, join2, result);
return result;
}
}

总结

Fork/Join 线程池 的重点在于 任务的拆分,尽量保证发挥分代并行的优势

JUC

AQS

JUC包下的很多工具都是依赖于AQS

AQS - AbstractQueuedSynchronized 翻译则是 抽象的基于队列的同步器,是阻塞式锁和相关同步器工具的基类。

AQS 的核心思想是使用一个状态变量来表示共享资源的状态,并通过原子性的状态修改和线程阻塞/唤醒等操作来实现对共享资源的安全访问。

特点

AQS 的主要特点如下:

  • state - 状态:表示 共享资源的状态。它是AQS维护的一个原子性的状态变量,状态可以被多个线程共享,通过 CAS(Compare and Swap)等原子操作进行修改。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
     // aqs 内存整数值
    private volatile int state;

    // stateOffset 用于存储 state 字段在内存中偏移量 用于 UNSAFE 类的修改和读取操作
    private static final long stateOffset;

    static {
    try {
    stateOffset = unsafe.objectFieldOffset
    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
    } catch (Exception ex) { throw new Error(ex); }
    }

    /**
    * 使用 乐观锁的 CAS 原子性的将 state 值从 expect 更新到 update
    */
    protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    state 支持两种模式:共享模式 - shared mode 和 独占模式 - execlusive mode

    • 共享模式:在独占模式下,同一时刻只有一个线程能够获得锁,其他线程必须等待。
      • state 值为 0 时表示没有线程持有锁,大于0表示有线程持有锁
      • 常见应用:ReentrantLock
    • 独占模式:在共享模式下,允许多个线程访问共享资源
      • state的值为计数器,表示被共享资源的线程数量
      • 常见应用:Semaphore

    子类需要定义和维护这个状态,实现不同模式下的应用

  • queue - 等待队列:AQS 使用一个先进先出(FIFO)的等待队【通过节点(Node)来实现的一个双向链表(Double-Linked Queue)】来管理阻塞的线程。当线程请求访问资源但无法获得时,会被加入到等待队列中等待。

  • condition - 条件变量: AQS 支持条件变量,允许线程按照特定的条件等待或唤醒,实现了更加灵活的线程通信。

  • exclusiveOwnerThread - 独占锁的线程引用,表示当前锁持有独占锁的线程。

使用

开发者可以创建自定义的同步器继承 AQS 并实现其抽象方法,从而构建更复杂的同步工具。

继承AQS需要实现下面的模版方法:

  • tryAcquire:用于尝试获取独占锁
  • tryRelease:用于尝试释放独占锁
  • tryAcquireShard:用于尝试释放独占锁
  • tryReleaseShard:用于尝试释放共享锁
  • isHeldExclusively:用于判断当前线程是否持有独占锁

简单实现一个不可重入的独占锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
public class AqsTest {

private static final Logger logger = LoggerFactory.getLogger(AqsTest.class);

public static void main(String[] args) {
MyLock myLock = new MyLock();
new Thread(() -> {
myLock.lock();
logger.info("t1 lock...");
try {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

} finally {
logger.info("t1 unlock...");
myLock.unlock();
}
}).start();

new Thread(() -> {
myLock.lock();
logger.info("t2 lock...");
try {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

} finally {
logger.info("t2 unlock...");
myLock.unlock();
}
}).start();

}
}

/**
* 实现不可重入的独占锁
*/
class MyLock implements Lock {

private MySnyc snyc = new MySnyc();

/**
* 同步器类
*/
class MySnyc extends AbstractQueuedSynchronizer {

@Override
protected boolean tryAcquire(int arg) {
// cas 尝试获取独占锁 state 0 无锁 1 已经线程占有锁
if (compareAndSetState(0, 1)) {
// 加锁成功 设置当前持有锁的线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
// 进入阻塞队列等待
return false;
}

@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0); // volatile 写屏障
return true;
}

@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}

private Condition newCondition() {
return new ConditionObject();
}
}

/**
* 加锁 获取失败则线程阻塞进入等待队列
*/
@Override
public void lock() {
// 内部调用了 tryAcquire 方法
snyc.acquire(1);
}

/**
* 加锁 可被打断
*
* @throws InterruptedException
*/
@Override
public void lockInterruptibly() throws InterruptedException {
snyc.acquireInterruptibly(1);
}

/**
* 尝试加锁 尝试加锁一次 失败不会阻塞线程 跳出获取锁
*
* @return
*/
@Override
public boolean tryLock() {
return snyc.tryAcquire(1);
}

/**
* 尝试加锁一次 并设置超时时间
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return snyc.tryAcquireNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
snyc.release(1);
}

@Override
public Condition newCondition() {
return snyc.newCondition();
}
}

Lock

ReentrantLock实现原理
image-20240124152215431

ReentrantLock内部维护Sync同步器,同步器继承自 aqs,针对Sync抽象同步器,内部提供了两种不同实现:NonfairSync 和 FairSync,分别对应了非公平锁和公平锁。

非公平锁实现

从无参构造器来看,默认使用的非公平锁的同步器实现

1
2
3
4
 		# reentrantlock.java	
public ReentrantLock() {
sync = new NonfairSync();
}
加锁成功
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  # aqs.java
public void lock() {
sync.lock();
}

# reentrantlock.java
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

final void lock() {
// cas尝试更改 aqs state 判断当前线程获取锁是否成功
if (compareAndSetState(0, 1))
// 如果获取锁成功,则修改同步器的独占线程为 当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 获取锁失败
acquire(1);
}

// 尝试获取锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
image-20240124154216822
加锁失败

上面加锁失败后,执行了同步器默认实现的 acquire 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
  public final void acquire(int arg) {
// 再次尝试获取锁 失败则执行 逻辑与 后面的
if (!tryAcquire(arg) &&
// 创建一个等待对象并将其加入到等待队列中
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 线程等待
selfInterrupt();
}

// 在将当前线程加入到等待队列后,使线程进入自旋状态等待获取锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋尝试获取锁
for (;;) {
final Node p = node.predecessor();
// 当 当前节点的前驱节点为 head (避免无效竞争 - 只有头部线程有资格去尝试获取锁)并且 尝试获取锁成功时,移除队列
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果无法成功获取锁,检查是否需要将当前线程阻塞(park)
// shouldParkAfterFailedAcquire 会将 前驱节点 waitStatus 改为 -1 (-1表示节点处于等待唤醒状态,它有责任唤醒它的下一个节点)
if (shouldParkAfterFailedAcquire(p, node) &&
// 调用 LockSupport.park(this) 阻塞当前线程
parkAndCheckInterrupt())
// 修改中断标识
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}


image-20240124154930816

再次尝试获取独占锁失败后,会执行逻辑与之后的逻辑,将阻塞线程添加到队列里

  • addWriter方法

AbstractQueuedSynchronizer 的队列通过节点(Node)实现的一个双向链表(Double-Linked Queue),它是由一个虚拟的头结点(head)和一个虚拟的尾结点(tail)组成的。

尝试添加一个节点,首次添加节点时,AQS会创建一个虚拟头结点(head),并将这个头结点作为队列的头部。然后再创建一个表示当前等待线程的真正节点,该节点是独占模式的等待节点,将它添加到队列尾部中,并设置它为尾结点(tail)。虚拟头节点的作用在于高效地进行队列的入队和出队操作。

image-20240124160058839
  • acquireQueued 方法

在将当前线程加入到等待队列后,使线程进入自旋状态等待获取锁,自旋获取锁失败则被 park

image-20240124172339160
释放锁
image-20240125085757914
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
    # ReentrantLock.java
public void unlock() {
sync.release(1);
}

# AbstractQueuedSynchonizer.java
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 当前队列不为空,并且 waitStatus 为-1 时 进入unparkSuccessor流程
if (h != null && h.waitStatus != 0)
// unpark 唤醒后继节点 让后继节点的所属线程从被park的地方(即自旋的地方)继续执行
unparkSuccessor(h);
return true;
}
return false;
}

# ReentrantLock#Sync.java
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

ReentranctLock 执行 unlock 方法时,会调用 同步器的tryRelease 的方法

方法内部调用 tryRelease 方法,设置 当前同步器的状态为 0 并且设置 exclusiveOwnerThread 为 null 表示当前锁没有被占有

当前队列不为空,并且 waitStatus 为-1 时 进入unparkSuccessor流程

在 unpark 唤醒头节点的后继节点,让后继节点的所属线程从被park的地方(即自旋的地方)继续执行

  • 当tryAcquire(arg)返回 true 获取锁成功时,同步器的exclusiveOwnerThread为 被唤醒的线程,并设置 state 为1

    • 阻塞队列的变化:会将当前节点设置为新的头部节点,断开当前头部节点与下一个节点的连接以便垃圾回收,并标记竞争失败的标志为false

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      final Node p = node.predecessor();
      // 当 当前节点的前驱节点为 head (避免无效竞争 - 只有头部线程有资格去尝试获取锁)并且 尝试获取锁成功时,移除队列
      if (p == head && tryAcquire(arg)) {
      // 将被唤醒的节点设置为新的头部节点
      setHead(node);
      // 之前的头节点p的 后继节点设置为null 有助于将之前的头部节点p被垃圾回收点
      p.next = null; // help GC
      failed = false;
      return interrupted;
      }

      private void setHead(Node node) {
      // 将被唤醒的节点设置为新的头部节点
      head = node;
      // 设置当前节点的线程引用为空 前继节点指向为 null
      node.thread = null;
      node.prev = null;
      }
    image-20240125090804035
  • 由于为非公平锁的实现,因此当此时有其他线程在锁没有占有的情况下,也尝试获取了锁时,它也可能获取到锁(exclusiveOnwerThread 设置为它,并且 state = 1 ),当被唤醒的线程自旋过程竞争锁失败时,再次被park。

    image-20240125090401261
可重入锁原理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 尝试获取锁
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

abstract static class Sync extends AbstractQueuedSynchronizer {

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 首次尝试获取锁 将 state 从 0 修改为 1
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当 exclusiveOwner 为当前线程时, state++
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
/
setState(nextc);
return true;
}
return false;
}

// 重入锁释放 state--
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 只是可重入次数减1 而不是释放锁
boolean free = false;
// 只有state 为0时,才释放锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
可打断原理

reentrantlock 分为两种模式:可打断模式 和 不可打断模式

1
2
3
4
5
6
7
8
9
  // 不可打断模式
public void lock() {
sync.lock();
}

// 可打断模式
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
不可打断模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋尝试获取锁
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// park 线程
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private final boolean parkAndCheckInterrupt() {
// locksupport.park 可以被 thread.interrupt() 中断继续执行
LockSupport.park(this);
// 返回中断标识并清除中断标识,以便再次被 park 住
return Thread.interrupted();
}

public static boolean interrupted() {
// 返回当前线程的中断状态,并清除中断标志
return currentThread().isInterrupted(true);
}

loksupport.park 可以被 thread.interrupt() 打断继续执行,但是继续执行后,它还是在自旋获取锁的代码中,重新被park掉(park被调用interrupt线程无法再次被park掉,需要调用Thread.interrupted()返回返回中断标识并会清除中断标识,以便再次被 park 住),因此,即使有线程调用打断被阻塞的线程,线程被唤醒还是在自旋获取锁的代码中执行,需要获取锁以后,才跳出循环,打断后正常继续执行。

1
2
3
4
5
6
7
8
9
10
11
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 当获取锁成功时 在这里进行一次打断
selfInterrupt();
}

// 调用 thread.interrupt() 打断
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
可打断模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 没有获取到锁
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// park 线程
parkAndCheckInterrupt())
// 被interrupt打断后抛出异常,跳出for(;;)循环,放弃等待获取锁
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
公平性原理
非公平锁原理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static final class NonfairSync extends Sync {

protected final boolean tryAcquire(int acquires) {
// 调用父类 Sync 中的 nonfairTryAcquire 方法
return nonfairTryAcquire(acquires);
}
}

abstract static class Sync extends AbstractQueuedSynchronizer {
// 非公平尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 当前锁的占用标识为 0 时即没有线程持有线程时
if (c == 0) {
// 直接cas尝试获取获取锁,不会进入阻塞队列等待
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平锁原理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# ReentrantLock.java
static final class FairSync extends Sync {

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 当前锁的占用标识为 0 时即没有线程持有线程时
if (c == 0) {
// 调用 hasQueuedPredecessors 查看队列是否有优先级更高的线程等待执行
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

# aqs.java
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t 头节点和尾节点不相等 说明队列中还有节点
return h != t &&
// 队列中的头节点的下一个节点不为空 并且 该节点的线程不是当前线程
((s = h.next) == null || s.thread != Thread.currentThread());
}
条件变量原理

ReentrantLock 条件变量对应了一个 ConditionObject 对象,每个 ConditionObject 对象维护了一个 双向链表,保存着等待条件阻塞的线程。

await
image-20240125104133386

当 thread-0 持有锁时,调用 condition.await 方法,将创建一个状态为2(Node.Condition表示线程在等待条件)关联线程的节点加入到 conditionObject 实例对象的双向链表的尾部中,并且尝试释放重入锁和清空重试次数,设置 exclusiveOwnerThread 为 null,并唤醒阻塞队列的后继节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

// ConditionObject 是内部类 实现了 Condition 接口
public class ConditionObject implements Condition, java.io.Serializable {

private transient Node firstWaiter;

private transient Node lastWaiter;

public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将线程加入到 ConditionObject 实例的双向链表中
Node node = addConditionWaiter();
// 完成释放重入锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// park 线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个状态为2(Node.Condition表示线程在等待条件)关联线程的节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
}


final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取当前线程持有锁的重入次数
int savedState = getState();
// 尝试释放重入锁和清空重试次数
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

public final boolean release(int arg) {
// 尝试释放重入锁和清空重试次数
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒阻塞队列的后继节点
unparkSuccessor(h);
return true;
}
return false;
}
}
signal

调用 thread-1 调用 signal 方法时,将 conditionObject 实例中的双向链表中的第一个节点移出来,改变节点的状态从 Node.CONDITION-2 转换为 0 并将该线程节点转移到等待锁的等待队列中,当 thread-1线程执行完毕后释放锁,会唤醒等待队列中的下一个节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

// ConditionObject 是内部类 实现了 Condition 接口
public class ConditionObject implements Condition, java.io.Serializable {

private transient Node firstWaiter;

private transient Node lastWaiter;

public final void signal() {
// 当前线程是否当前锁的持有者 只有持有锁的线程才能唤醒其他线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取队列的第一个元素
Node first = firstWaiter;
// 当队列的第一个元素不为空时,
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
// firstWaiter = first.nextWaiter 将头节点指向 first 节点的下一个节点,将 first 节点从队列中移除
// 当第一个节点的后继节点为空时,则说明当前队列内部没有节点 则设置为节点为空
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// transferForSignal 将 该线程节点 转移到等待锁的等待队列中
} while (!transferForSignal( first) &&
(first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
// 改变节点的状态 从Node.CONDITION-2转换为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将 唤醒的线程节点 加入到等待队列的尾部,加入成功则返回它在队列中的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
// 将 迁前驱的节点的状态,修改为 Node.SIGNAL - -1 它有责任唤醒它的下一个节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
}

读写锁

ReentrantReadWriteLock

ReentrantReadWriteLock 提供了一对关联的锁,用于读和写操作,它允许多个线程同时读取资源,同时确保只有一个线程能够进行写操作即实现 “读-读并发 读-写互斥 写写互斥” 的效果。

  • 读锁:多个线程可以同时持有读锁,只要当前没有线程持有写锁。
  • 写锁:同一时间只有一个线程能够持有写锁。如果有任何线程持有写锁,则其他线程无法获取读锁或者写锁。

适用于 读多写少的场景

注意事项:

读锁不支持条件变量(condition),因为读锁是非独占的,可以有多个线程持有读锁,但是写锁是独占的,支持条件变量。

重入性 - 持有写锁的线程可以获取读锁,但是持有读锁的线程不可以获取写锁 (支持锁重入的升级 不支持锁重入的降级)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class ReentrantReadWriteLockTest {
public static void main(String[] args) {
DataContainer dataContainer = new DataContainer();
new Thread(() -> dataContainer.read()).start();
new Thread(() -> dataContainer.write()).start();
}
}

class DataContainer {

private static final Logger logger = LoggerFactory.getLogger(DataContainer.class);

private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();

private ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

private ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();


public void read() {
logger.info("获取读锁");
readLock.lock();
try {
logger.info("获取对象");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
logger.info("释放读锁");
readLock.unlock();
}
}

public void write() {
logger.info("获取写锁");
writeLock.lock();
try {
logger.info("设置对象");
} finally {
logger.info("释放写锁");
writeLock.unlock();
}
}
}
# 输出
21:29:21 [Thread-0] com.example.demo.aqs.DataContainer - 获取读锁
21:29:21 [Thread-1] com.example.demo.aqs.DataContainer - 获取写锁
21:29:21 [Thread-0] com.example.demo.aqs.DataContainer - 获取对象
21:29:22 [Thread-0] com.example.demo.aqs.DataContainer - 释放读锁
21:29:22 [Thread-1] com.example.demo.aqs.DataContainer - 设置对象
21:29:22 [Thread-1] com.example.demo.aqs.DataContainer - 释放写锁
StampedLock

在读写锁的使用过程中,底层利用AQS并使用CAS修改其状态,为了进一步优化读的性能,引入的StampedLock,它的特点是 使用它的读写锁时都需要配合一个戳使用。

  • 读锁
1
2
3
StampedLock stampedLock = new StampedLock();
long stamp = stampedLock.readLock();
stampedLock.unlockRead(stamp);
  • 写锁
1
2
3
StampedLock stampedLock = new StampedLock();
long stamp = stampedLock.writeLock();
stampedLock.unlockWrite(stamp);
  • 乐观读:StampedLock 支持 乐观读,核心方法就是 tryOptimistic 方法,该方法不会加锁,而时获取乐观素的的一个戳,在执行读操作之前,进行一次戳校验,如果戳校验失败,则说明在这期间,没有写线程获取写锁执行写操作,数据可以安全的读取,否则,需要锁升级并重新获取 读锁,保证数据安全。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
    StampedLock stampedLock = new StampedLock();
// tryOptimisticRead 方法没有添加任何的锁 只是返回了一个戳
long stamp = stampedLock.tryOptimisticRead();
// 验戳 防止在获取乐观读锁和执行读操作之间,有其他线程获取写锁执行写操作
if (stampedLock.validate(stamp)) {
// 验戳一致,直接读取返回
return obj;
}
try {
// 验戳不一致即说明有写线程获取写锁执行写操作 锁升级
stampedLock.readLcok();
} finally {
stampedLock.unLockRead();
}
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class DataContainer {

private static final Logger logger = LoggerFactory.getLogger(DataContainer.class);

private StampedLock stampedLock = new StampedLock();

public void read() {
logger.info("获取乐观读锁");
long stamp = stampedLock.tryOptimisticRead();

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

if (stampedLock.validate(stamp)) {
logger.info("验戳成功,开始读取");
return;
}
try {
stamp = stampedLock.readLock();
logger.info("验戳失败,获取读锁,执行读操作");
return;
} finally {
stampedLock.unlockRead(stamp);
}

}

public void write() {
logger.info("获取写锁");
long stamp = stampedLock.writeLock();
try {
logger.info("执行写操作");
} finally {
logger.info("释放写锁");
stampedLock.unlockWrite(stamp);
}
}
}
注意
  • stampedLock不支持条件变量 condition
  • stampedlock不支持可重入

Semaphore

Semaphore 信号量 用于限制同时访问共享资源的线程上限。

用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    // 参数 permits 共享资源同时访问的线程上限
Semaphore semaphore = new Semaphore(3);
try {
// 尝试信号量
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
//...
} finally {
// 释放信号量
semaphore.release();
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 参数 permits 共享资源同时访问的线程上限
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
logger.info("获取信号量");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
logger.info("running...");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} finally {
semaphore.release();
logger.info("释放信号量");
}
}).start();
应用
  • 限流:限制访问的线程数量,让没有获取到信号量的线程阻塞,等待其他线程释放许可后,重新获取信号量并运行。这只适合限制单机线程数量。

  • 资源池管理:在数据库连接池,线程池和对象池等场景下,计数信号量可以控制对资源的并发访问,防止资源被过度占用。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    class SemaphorePool {

    private static final Logger logger = LoggerFactory.getLogger(SemaphoreTest.class);

    // 连接池数量
    private final int poolSize;

    // 连接资源集合
    private final Object[] connects;

    // 连接资源占用标识数组
    private final AtomicIntegerArray statesArray;

    private final Semaphore semaphore;

    SemaphorePool(int poolSize) {
    this.poolSize = poolSize;
    this.connects = new Object[poolSize];
    this.statesArray = new AtomicIntegerArray(new int[poolSize]);
    // 信号量个数和连接池数量一致
    this.semaphore = new Semaphore(poolSize);
    }

    public Object connect() {
    try {
    // 获取许可
    semaphore.acquire();
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    try {
    for (int i = 0; i < poolSize; i++) {
    if (statesArray.get(i) == 0) {
    if (statesArray.compareAndSet(i, 0, 1)) {
    logger.info("connect:{}", connects[i]);
    return connects[i];
    }
    }
    }
    return null;
    } finally {
    semaphore.release();
    }
    }

    public void disConnect(Object conn) {
    for (int i = 0; i < poolSize; i++) {
    if (connects[i] == conn) {
    statesArray.set(i, 0);
    logger.info("disconnect:{}", connects[i]);
    semaphore.release();
    break;
    }
    }
    }

    }
semaphore原理

semaphore 底层实现是基于 AQS 实现,将 AQS 的 state 设置为 信号量的许可数。

acquire

每次acquire操作将尝试获取一个许可,而release操作将释放一个许可,当 state为正数时,表示有可用的许可,当 state 为零时,表示没有可用的许可,其他线程调用 acquire 方法会被阻塞并且进入 AQS的阻塞队列中。

image-20240127100622925 image-20240127100641239
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class Semaphore implements java.io.Serializable {  
static final class NonfairSync extends Sync {
// 1. 默认构造为 非公平的
NonfairSync(int permits) {
super(permits);
}

// 5. 默认调用非公平锁实现
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 6. 获取当前 state 即 许可个数
int available = getState();
// 7. 减去获取剩余的个数
int remaining = available - acquires;
if (remaining < 0 ||
// 8. 许可个数足够则 cas 操作修改 state 至剩余值
compareAndSetState(available, remaining))
return remaining;
}
}
}
// 2. 尝试获取信号量
public void acquire() throws InterruptedException {
// 3. 调用 sync的父类aqs 的 acquireSharedInterruptibly 方法
sync.acquireSharedInterruptibly(1);
}
}

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 4. 调用 子类重写实现的 tryAcquireShared 方法即 semaphore的 sync中的tryAcquireShared
if (tryAcquireShared(arg) < 0)
// 9. 当 剩余许可不足时,将线程park并进入阻塞队列中
doAcquireSharedInterruptibly(arg);
}
}
release

image-20240127104327947

image-20240127104150801

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Semaphore implements java.io.Serializable {  

static final class NonfairSync extends Sync {
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
// 4. state = state + release 释放许可
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 5. 使用 cas 修改
if (compareAndSetState(current, next))
return true;
}
}
}

// 1. 释放信号量
public void release() {
// 2. 调用 sync的父类aqs 的releaseShared方法
sync.releaseShared(1);
}
}

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final boolean releaseShared(int arg) {
// 3. 调用 semaphore 的 sync 子类重写的 tryReleaseShared 方法
if (tryReleaseShared(arg)) {
// 6. 唤醒 阻塞队列中的节点
doReleaseShared();
return true;
}
return false;
}
}

CountdownLatch

CountdownLatch 为 倒计时门栓,用于控制多个线程之间的同步,允许一个线程或者多个线程等待一组操作完成。

CountdownLatch 的核心概念为 计数器,在构造 CountdownLatch 需要指定一个初始化计数器,线程通过调用 await 方法阻塞等待计数器达到0,其他线程通过 调用 countdown 方法递减计数器数值,直到0后,唤醒 调用 await的线程。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class CountDownLatchDemo {

private static final Logger logger = LoggerFactory.getLogger(CountDownLatchDemo.class);

public static void main(String[] args) {

ExecutorService executorService = Executors.newFixedThreadPool(4);
CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
executorService.submit(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
countDownLatch.countDown();
logger.info("countdown...");
});
}
executorService.submit(() -> {
try {
logger.info("await...");
countDownLatch.await();
logger.info("signal");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
executorService.shutdown();
}
}
# 输出
await...
countdown...
countdown...
countdown...
signal
应用

一个游戏等待10个人加载完毕,则开始游戏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class CountDownLatchApp {
private static final Logger logger = LoggerFactory.getLogger(CountDownLatchDemo.class);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
String[] schedules = new String[10];
CountDownLatch countDownLatch = new CountDownLatch(10);
Random random = new Random();
for (int i = 0; i < 10; i++) {
int a = i;
executorService.submit(() -> {
for (int j = 0; j <= 100; j++) {
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(100));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
schedules[a] = j + "%";
logger.info("\r" + Arrays.toString(schedules));
}
countDownLatch.countDown();
});
}
countDownLatch.await();
logger.info("游戏加载完毕");
executorService.shutdown();
}
}
# 输出:
[100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%]
游戏加载完毕

CountdownLatch 常用于 多线程执行多个任务,等待多个任务都执行完毕,在执行下面的场景。

CyclicBarrier

CyclicBarrier 循环屏障,用于多线程环境下协调多个线程执行任务即线程协作。

​ 允许一组线程到达某个共同点之前相互等待,并在所有线程都到达这个点时同时继续执行。

CyclicBarrier 实现原理

CyclicBarrier 内部维护了一个计数器,当每个线程调用 await 方法时,当前线程被阻塞并且 CyclicBarrier 内部的计数器 count 就会减1,当计数器为0时,当所有线程都到达屏障点,屏障就会打开,所有被阻塞的线程被释放。

CyclicBarrier 循环使用

CyclicBarrier 可以被重用,在屏障打开时会自动重置计数器,等待一轮同步。

示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class CyclicBarrierTest {

private static final Logger logger = LoggerFactory.getLogger(CyclicBarrierTest.class);

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
logger.info("cyclicBarrier over...");
}
});
// 重用
for (int i = 0; i < 3; i++) {
executorService.submit(() -> {
logger.info("task1 begin...");
try {
TimeUnit.SECONDS.sleep(1);
cyclicBarrier.await(); // count--
logger.info("task1 end...");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
});
executorService.submit(()->{
logger.info("task2 begin...");
try {
TimeUnit.SECONDS.sleep(3);
cyclicBarrier.await(); // count--
logger.info("task2 end...");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
});
}

executorService.shutdown();
}
}
# 输出三次:
task1 begin...
task2 begin...
cyclicBarrier over...
task2 end...
task1 end...

并发集合类

线程安全集合类概述
  • 遗留的安全集合: Vector HashTable

    • 所有方法使用 synchronized 修饰 并发性能较低
  • 修饰的安全集合:Collections.singletonList(new ArrayList<>())

    • 装饰者模式 将线程不安全的集合作为内部的成员变量,操作时依旧使用不安全集合的方法,只是加了 synchronized 锁保证线程安全,并发性能同样较差
  • JUC的安全集合:包含三大类:Blocking 类,CopyOnWrite 类,Concurrent 类

    • Blocking 类:阻塞式的并发集合,大多实现基于锁并提供阻塞方法,线程不满足条件时将会被阻塞。

    • CopyOnWrite 类:修改时采用拷贝的方式来避免多线程访问时读写的并发安全,只适用于读多写少的场景。

    • Concurrent 类:高效的并发集合,内部使用cas优化并提供多把锁提高并发度和吞吐量,适用于高并发的读写操作场景,

      • 弱一致性 :遍历时的弱一致性,读取弱的一致性等

        (注意:迭代器Iterator机制 对于 线程安全的集合遍历时进行修改,使用fail-safe机制,不会抛出异常;而对于 线程不安全的集合遍历时进行修改,使用fail-fast机制,会抛出ConcurrentModificationException异常 )

第三方工具

disruptor

高性能无锁队列

guava-RateLimiter

基于信号量的高性能限流器