JUC 并发编程

多线程的引入

进程和线程

定义

进程(process)是计算机中运行的程序一个实例。进程之间有相互独立的内存空间,每个进程有自己的地址空间、指令、数据、系统资源等。

线程(thread)是进程中的一个执行单元,一个进程中包括了多个线程,它们共享了相同的内存空间和系统资源。

现代操作系统将进程作为资源分配和保护的基本单位,将线程作为调度和执行基本单位。

内存和资源

各个进程之间拥有独立的内存空间,同一进程内的线程共享相同的内存空间和资源。

通信

进程之间的通信需要使用特定的机制,用一台计算机的进程通信称为IPC(Inter-process communication),不同计算机的之间通信需要通过网络并遵守相应的协议。

线程之间通信可以通过共享内存进行直接通信,也可以通过java提供的线程间通信机制(wait、notify等)实现通信

上下文切换

进程的上下文切换的开销要比线程的上下文切换的大很多,因此线程更加轻量。

开销

进程的创建和销毁开销相对较大,因为每个进程都要分配自己独立的内存空间和资源;

线程的创建和销毁开销相对较小,因为线程之间共享相同的内存空间和系统资源。

并行和并发

并发

早期单核CPU的运行环境下,线程感观是”同时执行”,但本质上是串行执行的,这是由于操作系统有一个组件叫任务调度器,任务调度器会轮流给每个任务分配执行时间,将cpu的时间片分给不同的线程使用使之交替执行,这些时间片的时间非常短,感观是”同时执行”,本质上是分时执行,简而言之就是:微观串行,宏观并行。

并发 - concurrency:同一时间间隔内交替执行多个任务,这些任务并不是同时进行的,而是在同一个处理多元单元通过快速的切换来实现。

image-20240110140123570

并行

并行 - parallelism:在同一时刻发生多个任务或操作,在多个处理单元上同时执行。

image-20240110140156661

引用rob pike的一段描述:

  • 并发是同一时间应对多件事情,而并行同一时间做多件事情

异步和同步

同步 - asychronous:任务按照顺序依次执行,一个任务的执行需要等待上一个任务的完成

异步 - synchronous:任务不按照顺序依次执行,任务的完成并不影响接下来的任务立即执行

为什么引入多线程

在多核处理器流行的当下,多线程可以有效的提升程序的运行效率,多核cpu并行执行多个线程,充分地利用硬件资源。

注意在单核cpu的机器上,多线程不能实际提高程序的运行效率,反而可能会影响执行效率,因为线程的上下文切换会带来一些性能的损耗。但是单核cpu的多线程,让单核机器有了宏观上应对多个任务的能力。

线程基础

线程的创建

创建方式

继承Thread类

创建Thread对象,重写 run() 方法,然后通过Thread调用 start() 方法来启动线程

1
2
3
4
5
6
7
8
9
// 创建线程对象 - 匿名内部类 实质上的创建的是线程的子类
Thread myThread = new Thread("myThread") {
@Override
public void run() {
// 覆盖 run 方法,方法里是执行的任务
}
};
// 启动线程 - 线程放入就绪队列中等待任务调度器分配时间片执行,调度时会执行线程中的run方法
myThread.start();

另一种非匿名内部类写法

1
2
3
4
5
6
7
8
9
10
11
12
public class MyThread extends Thread {
@Override
public void run() {
// ...
}
}
public class Test {
public static void main(String[] args) {
MyThread myThread = new MyThread();
myThread.start();
}
}
实现Runnable接口

实现 runnable 接口需要实现其中的 run() 方法,然后通过Thread对象调用 start() 方法来启动线程

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable() {
@Override
public void run() {
// ...
}
};
Thread myThread = new Thread(myRunnable, "myThread");
myThread.start();
}

另一种非匿名内部类写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MyRunnable implements Runnable {
@Override
public void run() {
// ...
}
}

public class Test {
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
Thread myThread = new Thread(myRunnable, "myThread");
myThread.start();
}
}
实现Callable接口

与 Runnable 相比,Callable 可以有返回值,返回值使用 futureTask 进行封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> stringCallable = new Callable<String>() {
@Override
public String call() throws Exception {
return "Hello world!";
}
};
// 构造 FutureTask 对象,传入 Callable 对象参数
FutureTask<String> futureTask = new FutureTask<>(stringCallable);
Thread myThread = new Thread(futureTask, "myThread");
myThread.start();
// 阻塞等待返回值
System.out.println(futureTask.get());
}

Runnable和Thread源码分析

Thread类实现了 Runnable 接口, 重写了 run() 方法,

实现Runnable接口方法:当调用构造函数创建 Thread 对象,传入参数 Runnable 对象,该 Runnable 对象会赋值到 Thread 对象 的 Runnable target 的属性中,当调用 Thread 对象 的 start() 方法时,会执行 target.run()

继承Thread类方法:当调用构造函数创建 Thread 对象,未传入参数 Runnable 对象,创建的 Thread 对象重写 run() 方法,当调用 Thread 对象 的 start() 方法时,会调用 Thread 对象重写的 run() 方法

可以简单把 Thread 理解为 线程对象,Runnable 为 任务对象

1
2
3
4
5
6
# Runnable
@FunctionalInterface
public interface Runnable {

public abstract void run();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Thread
public class Thread implements Runnable {

private Runnable target;

public Thread(Runnable target) {
// ...
this.target = target;
}

@Override
public void run() {
if (target != null) {
target.run();
}
}
}

建议使用实现Runnable接口的方式创建线程:

  • 将线程对象和任务对象分开,使得任务对象可以更好的组合以及与一些线程池等高级API配合
  • 任务对象可执行即可,而线程对象整个类的创建的开销比较大
  • java不支持多继承,但是支持多接口实现,因此使用实现Runnable接口的方式,可以更好的扩展

jvm 线程原理

jvm 运行时数据区由 堆、方法区、虚拟机栈、本地方法栈和程序计数器组成,栈内存其实就是对应线程所持有的,每个线程启动后,都会为其分配一个对应的栈内存,栈内存是线程私有的,相互独立,互不干扰。每个栈由多个栈帧组成,每个栈帧对应了一次方法调用,每个线程同一时刻只有一个活动栈帧,对应着正在执行的方法。

image-20240110165743707

线程上下文切换

线程的上下文切换是指CPU从一个线程切换到另一个线程执行的过程,具体而言就是指多线程环境中,操作系统将当前运行的线程上下文保存起来,然后切换到另一个线程的上下文并开始执行。在 jvm 中由程序计数器来记录当前线程下一条字节码指令的地址。

  • 该线程的cpu时间片用完
  • 线程优先级调度:线程优先级的提高可能导致线程上下文切换。
  • 等待阻塞:当一个线程因为等待某个事件(如I/O操作、锁、信号量等)而被阻塞

注意:频繁的线程上下文切换会带来性能上的开销

线程常见的方法

start和run方法

  • start()

start 方法用于启动一个新的线程,并在新的线程中调用 run 方法。

start 方法会创建一个新的线程并使之进入就绪状态,等待操作系统分配CPU时间片这个线程执行 run 方法。

注意:如果对同一个线程对象多次调用 start 方法,会导致 IllegalThreadStateException 异常。

  • run()

run 方法是 Thread 类中定义的一个普通方法,它包含了线程执行的代码。

当直接调用 run 方法时,它会在当前线程中执行,不会创建新的线程,即 run 方法被视为普通方法被调用,不会启动新的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  public static void main(String[] args) {
Thread t1 = new Thread("t1") {
@Override
public void run() {
System.out.println("current thread 1:" + currentThread());
}
};
t1.start();
Thread t2 = new Thread("t2") {
@Override
public void run() {
System.out.println("current thread 2:" + currentThread());
}
};
t2.run();
}
执行结果:
current thread 1:Thread[t1,5,main]
current thread 2:Thread[main,5,main]

sleep和yield方法

  • sleep

sleep方法是Thread类的静态方法,调用Thread类的sleep方法会让当前线程休眠一段时间,此时线程的状态从 运行状态 切换到 TIMEWAITING-限期等待,线程释放占用的CPU资源并在该时间内不再竞争CPU执行权,让其他线程有机会执行。当休眠时间结束后,线程会重新进入就绪状态,等待分配CPU时间片即不会立即执行。

1
2
3
4
5
6
7
try {
// 参数为long millis, 表示休眠时间的毫秒数
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

注意:当一个线程处于睡眠状态(即在 sleep 方法中)时,如果另一个线程调用了被睡眠线程的 interrupt 方法,被睡眠线程会被唤醒,并且会抛出 InterruptedException,该线程会提前结束并进入抛出异常的状态。

建议:用 TimeUnit 的 sleep 代替 Thread 的 sleep 来获得更好的可读性

1
2
3
4
5
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

应用:使用while(true)方法连续执行任务时,引入适当的休眠或等待机制,可以防止cpu占用过度。因为该循环会一直执行,不会主动放弃时间片,这种情况被称为”忙等”(Busy Waiting)。注意:具体的休眠时间需要根据应用程序的需求来调整。太短的休眠时间可能导致频繁切换,而太长的休眠时间可能导致响应时间延迟。

1
2
3
4
5
6
7
8
9
10
while (true) {
// 执行一些任务

try {
// 等待一段时间,避免过度占用CPU
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
  • yield

yield 方法是 Thread 类的实例方法,用于提示调度器当前线程愿意放弃当前对CPU的使用,让其他线程有机会执行。它会让线程从 运行状态 进入 RUNNABLE 就绪状态(不是进入阻塞状态),等待重新分配CPU时间片。

注意:yield 方法并不保证当前线程会让步,只是向调度器发出一个提示,,其他线程是否能获得执行机会取决于底层操作系统的调度策略。

yield 通常用于在多个线程执行相同优先级任务,协调线程执行顺序,提高程序的执行效率。

join方法

join 方法时是 Thread 类提供的一个方法,用于当前线程等待调用该方法的线程执行完毕。

调用目标线程的join()方法,会让当前线程阻塞(放弃占用cpu时间片),直到目标线程结束。

它是一种线程同步机制,用于协调多个线程的执行顺序。

当调用 join(long millis) 方法表示等待目标线程执行结束,但最多等待millis毫秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread("t1") {
@Override
public void run() {
System.out.println("t1 start");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("t1 finish");
}
};

t1.start();
// 当前线程main阻塞等待t1执行结束
t1.join();
System.out.println("main stop");
}
# 输出结果:
t1 start
t1 finish
main stop

interrupt方法

thread.interrupt()方法用于中断一个线程的执行。调用目标线程的 interrupt方法 不会立即中断线程,而是设置线程的中断状态,线程自己检查中断标志并采取相应的行为。

使用 Thread.interrupted() 静态方法来检查当前线程的中断状态并清理中断状态。这个方法会返回当前中断状态,并在调用后将中断状态重新设置为 false。thread.isInterrupted 也可以检查当前线程的中断状态但是不会对中断状态进行清除。

  • 打断RUNNABLE的线程

    调用线程的 interrupt() 方法时,将线程的中断状态设置为 true

    应用:实现通知线程停止执行,线程可以在适当检查中断状态,如果发现被中断,可以选择优雅的退出

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(() -> {
    while (true) {
    System.out.println("t1 is running");
    // 目标线程自己检查中断标记,自己优雅的退出
    if (Thread.currentThread().isInterrupted()) {
    // 优雅的退出:执行退出前的工作
    System.out.println("t1 is break");
    break;
    }
    }
    },"t1");

    t1.start();
    // 运行一秒 打断标记
    TimeUnit.SECONDS.sleep(1);
    t1.interrupt();
    }
  • 打断休眠或者阻塞的线程

    如果线程被某些阻塞操作(如 Object 类的 wait()Thread.sleep()join() 方法等)所阻塞,调用 interrupt() 方法可以使线程抛出 InterruptedException,从而提前结束阻塞状态。

    注意这些情况的 isInterrupted() 会返回 false

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
      public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(()->{
    try {
    Thread.sleep(100);
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    }, "t1");
    t1.start();
    Thread.sleep(1);
    t1.interrupt();
    System.out.println("打断标记:" + t1.isInterrupted());
    }
    # 输出
    Exception in thread "t1" java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted
    at com.example.demo.concurrency.BlockingInterruptDemo.lambda$main$0(BlockingInterruptDemo.java:16)
    at java.lang.Thread.run(Thread.java:750)
    Caused by: java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at com.example.demo.concurrency.BlockingInterruptDemo.lambda$main$0(BlockingInterruptDemo.java:14)
    ... 1 more
    打断标记:false
  • 打断LockSupport.park的线程

    LockSupport.park 是 阻塞线程的一种方式,用于实现线程之间的协作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
       public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(() -> {
    System.out.println("park...");
    LockSupport.park();
    System.out.println("unpark..thread isInterrupted - " + Thread.currentThread().isInterrupted());
    }, "t1");

    t1.start();
    TimeUnit.MILLISECONDS.sleep(100);
    t1.interrupt();
    }
    输出结果:
    park...
    unpark..thread isInterrupted - true

    在上述的例子中,子线程启动执行 LockSupport.park(), 主线程等待一段时间后调用 thread.interrupt() 打断子线程,此时线程的停止中断,此时的线程打断标记为 true

    注意:如果打断标记为 true, 此时线程如果在执行 LockSupport.park() 是无法阻塞的

线程优先级

线程的优先级是一个用于指定线程调度顺序的概念。java 中线程优先级是通过整数表示的,范围从Thread.MIN_PRIORITY(1)到Thread.MAX_PRIORITY(10),其中Thread.NORM_PRIORITY(5)是默认的优先级。

线程的优先级越高,任务调度器在选择下一个要执行的线程时,更有可能选择优先级较高的线程。注意:线程优先级并不是硬性规定,而是一个调度提示,在某些情况下影响线程的执行顺序,但不应该过度依赖它来实现程序的正确性。

主线程和守护线程

主线程(main thread)和 守护线程(daemon thread) 是 java 中线程的两种不同的类型。

默认情况下,整个 java 进程需要等待所有的线程运行结束才会结束。

而守护线程是一种在程序运行时在后台提供服务的线程,他的不影响程序的执行,也不阻止程序的终止。当所有的非守护线程结束时,进程会结束,守护线程也会被强制终止,即使它们还在执行任务。守护线程通常用于提供一些后台服务或周期性任务,如垃圾回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
while (true) {
System.out.println("开始运行...");
try {
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "t1");
// 将 t1 线程设置为 守护线程
t1.setDaemon(true);
t1.start();

TimeUnit.MILLISECONDS.sleep(100);
System.out.println("主线程执行结束!");
}
# 输出结构
开始运行...
主线程执行结束!

线程的状态

操作系统层面

image-20240112162416523
  • 初始 状态:仅仅在语言层面上创建了对象

  • 可运行 状态:线程已经创建完毕,等待cpu调度执行

  • 运行 状态:线程占用cpu资源,正在执行其指令

    可运行状态和运行状态可以相互转换,这是由由操作系统的任务调度器(Scheduler)决定,当任务调度器给线程获取cpu时间片,线程切换为运行状态执行并消耗cpu时间片的时间,当分配的cpu时间片用完了,则线程切换为可运行状态

  • 阻塞 状态:线程进入阻塞状态表示该线程暂时无法执行。这可能是因为线程在等待某个资源(如锁)的释放,或者在等待某个条件的满足。在阻塞状态下,线程不会占用 CPU 时间片,任务调度器不会考虑调度该线程。当阻塞状态结束后,线程会进入可运行状态

  • 终止状态:线程的指令执行完毕,线程的整个生命周期结束

JDK 层面

image-20240112162226284

java 将线程的状态划分为 6 种

1
2
3
4
5
6
7
8
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
image
  • 新建 - NEW:线程创建后尚未启动 即 线程创建,但是还没有调用 start 方法

  • 可运行状态 - RUNNABLE:可能是操作系统层面的运行状态,也可能是可运行状态,也可能是IO阻塞等导致的阻塞状态

  • 阻塞状态 - BLOCKING:线程等待获取一个排它锁,如果其线程释放了锁就会结束此状态

  • 无限期等待 - WAITING:等待其它线程显式地唤醒,否则不会被分配 CPU 时间片

    无限期等待进入方法 退出方法
    没有设置时间的 Object.wait() Object.notify() / Object.notifyAll()
    没有设置 Timeout 参数的 Thread.join() 目标线程执行完毕
    LockSupport.park() 其他线程调用 LockSupport.unpark(目标thread) 或者 调用 目标thread.interrupt()
  • 限期等待 - TIME WAITING:无需等待其它线程显式地唤醒,在一定时间之后会被自动唤醒

    限期等待进入方法 退出方法
    Thread.sleep() 时间结束
    设置时间的 Object.wait() 时间结束 / Object.notify() / Object.notifyAll()
    设置超时时间 Timeout 参数的 Thread.join() 时间结束 / 被调用的线程执行完毕
    LockSupport.parkNanos(long nanos) / LockSupport.parkUntil(long deadline) 其他线程调用 LockSupport.unpark(目标thread) 或者 调用目标thread.interrupt()
  • 死亡 - TERMINATED:线程结束任务之后自己结束,或者产生异常而结束

查看进程和线程的方式

查看进程

ps -ef # 查看所有进程

ps -ef | grep java 或者 jps -l # 查看java进程

top # 显示进程动态列表

kill # 查看进程

查看线程

ps -fT -p # 查看某个进程的所有线程

top -H -p # 进程中的动态线程列表

jstack # 进程中的线程快照详细信息

java 图形化线程监控工具 - jconsole

在控制台输入 jconsole 命令

image-20240110161406048

在jconsole进行远程监控时,需要jar包启动时,添加部分配置

1
2
3
4
5
6
java -Djava.rmi.server.hostname={host_ip} \
-Dcom.sun.management.jmxremote \ # 启用JMX远程访问
-Dcom.sun.management.jmxremote.port={port_number} \ # 指定了JMX远程连接的端口(与接口访问接口不同)
-Dcom.sun.management.jmxremote.ssl=false \ # 禁用SSL和身份验证
-Dcom.sun.management.jmxremote.authenticate=false \
-jar xxx.jar

线程应用

  • 异步调用
  • 并行计算
  • 同步等待
  • 协同规划

并发基础

共享模型

共享问题

简单示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class SimpleDemo {

public static int cnt = 0;

static void increment() {
// 临界区
cnt++;
}

static void decrement() {
// 临界区
cnt--;
}

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 5000; i++) {
increment();
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 5000; i++) {
decrement();
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(cnt);
}
}
# 输出结果
1616

上述代码中,对于代码中,使用了多线程对于一个静态变量(共享变量)分别执行了5000次自增和5000次自减操作,但是最后的结果并不是0,这是由多线程的共享问题导致的线程不安全问题。

从字节码的角度分析 静态变量的 cnt++ 操作

1
2
3
4
5
0 getstatic # 获取静态变量cnt的值
3 iconst_1 # 将整数常量值 1 推送到堆栈上
4 iadd # 将堆栈上的两个值相加
5 putstatic # 将相加的结果存回静态变量 cnt
8 return

由于 java 的 cpu 的分时复用技术,上述指令出现了 交错执行 的情况

image-20240113123606339

程序使用多线程执行本身没有问题,但是问题出现在对于共享资源的多线程的读写操作上,这块代码被称做 临界区

当多个线程试图同时访问和修改共享数据,最终结果依赖于线程执行的具体顺序,这种情况被称之为 竞态条件

解决方案

为了避免在临界区发生竞态条件,java 提供了很多方案

  • 阻塞式:synchronized lock
  • 非阻塞式:atomic原子类

变量线程安全分析

成员变量和静态变量如果没有共享,则线程安全;如果处于共享状态(不同的线程都可以访问和操作),但只有读取操作,线程也是安全的,当有读写操作时,则需要考线程安全问题。

synchronized

synchronzed (对象锁)是 java 中用于实现同步的关键字。它提供了一种机制,确保在多个线程访问共享资源时,只有一个线程能够进入关键代码段(临界区),从而避免竞态条件和数据不一致性的问题。

它采用 互斥 的方式让同一时刻只有一个线程可以持有对象锁,其他线程会被阻塞,因此可以保证获取对象锁的线程安全的执行临界区的代码,在此期间不会发生线程的上下文切换。

当锁被释放后,等待该锁的线程会被唤醒,被唤醒的线程会从阻塞状态进入就绪状态,等待操作系统的调度器重新分配 CPU 时间片,从而执行。

流程
  • 线程A获得对象锁并进入临界区执行
  • 线程B尝试访问相同的对象锁,线程B被阻塞,从可运行状态进入阻塞状态
  • 线程A释放锁时,线程B会被唤醒,并进入就绪状态
  • 任务调度器将CPU时间分配给线程B,线程B开始执行临界区
语法
1
2
3
4
5
6
7
8
9
10
11
12
// 使用在 代码块 上
synchronized (对象) { // 该对象可以是任何对象(可以是 类的 Class 对象),用于作为锁
临界区
}
// 使用在 方法 上 相当于 synchronized(this) {}
public synchronized void method() { // 整个方法体都会受到同步保护,对于非静态方法,锁是 当前对象实例

}
// 使用在 静态方法 上相当于 synchronized(xx.Class) {}
public static synchronized void staticMethod() { // 对于静态方法,锁是当前类的 Class 对象。

}
分析

synchronized 实际上是使用 对象锁 保证了临界区代码的原子性。因此,需要保护共享资源,必须对于同一个对象加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class SimpleDemo {

public static void main(String[] args) throws InterruptedException {

Counter counter = new Counter();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
counter.increment();
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
counter.decrement();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(counter.getCnt());
}
}

class Counter {
private static int cnt = 0;

public void increment() {
synchronized (this) {
cnt++;
}
}

public void decrement() {
synchronized (this) {
cnt--;
}
}

public int getCnt() {
return cnt;
}
}

关于 两个线程 执行是并行还是互斥,关键是看 synchronized 锁的是不是同一个对象。

synchronized 关键字的互斥性质是由同步锁的对象来决定的。不同的锁对象可能导致并行执行,相同的锁对象则会引起互斥执行。

原理

因为 synchronized被称为 对象锁,所以需要理解 synchronized 的底层,首先要了解 java 对象

java 对象在内存中的结构由三个部分组成:对象头(object header)、实例数据(instance data)和填充(Padding)。

  • 对象头:包含一些用于管理对象的元信息,如标记字、类型指针等
  • 实例数据:数据的实际数据,包括字段和方法等
  • 填充:为满足特定的内存对齐要求,可能在实例数据之后添加一些填充字节
对象头 - object header

对象头位于 每个对象的开头部分,它包含了用于管理对象的元信息。它包括了标记字、类型指针以及数组长度(如果对象是数组类型则有)。

  • 标记字:一般占用32个位即4字节
  • 类型指针:指向对象所属的元数据,用于确定对象的类型
image-20240113214427176
Monitor

其中,每个对象都有一个与之关联的监视器 - Monitor,它是多线程实现同步的机制的关键,负责管理对象的同步操作,监视器和对象是一对一的关系。当有对象被synchronized锁住,该对象头的 Mark Word 中就被设置为指向 Monitor 对象的指针。

monitor的结构:

  • 锁持有者 - owner:持有锁的线程

  • 等待锁 - waitset:调用对象 wait 方法进入的对象等待集

  • 通知队列 - entrylist:存放因为等待锁释放而被阻塞的线程的队列

    image-20240114130236719

monitor底层原理

当线程1成功获取到锁,此时锁住的对象的对象头的mark word前30位用于存储monitor的指针 - ptr_to_heavyweight_monitor,monitor 的 owner 指向线程1,线程1成为锁的拥有者(owner),并且对象头的加锁状态从01变为10(即正常状态到加锁状态),;在线程1执行临界区指令的过程中,此时线程2尝试获取锁,由于对象头中的状态为加锁状态并且关联的monitor的owner指向线程1,此时 线程2加入到 entrylist 等待队列 中,并且状态2的状态变为 阻塞状态 - BLOCKED;当线程1执行完临界区的指令后,此时 owner 置为空,并且由线程1调用 Object.notifyAll() 唤醒等待队列中的线程,这些线程在竞争(非公平的)尝试获取锁,执行临界区的指令

image-20240114140247979

分析字节码

1
2
3
4
5
6
7
8
9
10
11
12
public class SimpleDemo {

static final Object obj = new Object();

static int cnt = 0;

public static void main(String[] args) {
synchronized (obj) {
cnt++;
}
}
}

字节码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 0 getstatic #2 # 从索引为2的静态字段获取值并推送到操作数栈中 (这里索引为2的静态变量的值即是obj锁对象的引用)
3 dup # 复制栈顶的obj锁对象的引用并放入到栈顶
4 astore_1 # 将栈顶的obj锁对象的引用弹出存储到局部变量表位置1 用于后续的解锁和确保异常处理可以正确的释放锁
5 monitorenter # 进入对象的监视器,即获取锁。获取锁成功则 对象头的mark word存储指向monito的指针来与一个monitor对象关联
6 getstatic #3 # 从索引为3的静态字段获取值 cnt 并推送到
9 iconst_1 # 将整数1推送到栈顶
10 iadd # 执行整数相加操作
11 putstatic #3 # 将自增的结构存入到索引为3的静态字段cnt中
14 aload_1 # 加载局部变量表位置1的即之前复制的object对象
15 monitorexit # 退出对象的监视器,即释放锁
16 goto 24 (+8) # 正常执行完毕 跳转到字节码指令索引为24的位置
19 astore_2 # 将栈顶的引用类型值存储到局部变量表位置2,此时栈顶的引用为异常的引用
20 aload_1 # 将局部变量表中索引为1的obj锁对象引用加载到栈顶
21 monitorexit # 退出对象的监视器,即释放锁
22 aload_2 # 将在局部变量表中的位置2的信息即异常信息引用
23 athrow # 抛出异常
24 return # 从当前方法返回

从上述代码,可以看出,即使执行同步代码块时,发生了异常,锁也在抛出异常前,进行释放。

复制锁对象的引用存放到 局部变量表的位置1 的目的是 确保锁对象的引用在同步块内的后续操作中仍然位于固定位置,以便于后续的存储和使用

优化

jvm 底层对于 synchronized 进行了优化,在不同竞争程度下,采用了不同的锁的类型来优化性能开销,其中包括了 偏向锁、轻量级锁、重量级锁。这样做的目的在于提高获取锁和释放锁的效率以提高多线程并发操作的性能。

这四种状态是随着竞争情况而升级(即锁的膨胀)的,锁可以升级但是不可以降级(即锁的膨胀是不可逆的),锁的膨胀方向:偏向锁 -> 轻量级锁 -> 重量级锁。

这种优化策略依赖于具体应用场景和线程行为,jvm在运行时会根据运行情况动态的自动选择合适的锁类型,因此无需开发者显式低关心锁的细节,jvm 会在背后自动的进行优化,使用最适合的锁策略。

轻量级锁
  • 适用场景:多线程交替访问同步代码块(这里的交替指的是”访问的时间是错开的”,没有产生竞争)

  • 流程:

    • 当线程尝试获取锁时,首先会在线程的栈帧中,创建一个锁记录的对象(锁记录在JVM层面),锁记录对象包含两部分,一部分用来存储 将要锁住对象的对象指针的空间,另一部分存储 锁记录地址以及轻量级锁标识00

      image-20240116143157172

      image-20240116095137152

    • 当线程尝试获取锁时,会尝试通过CAS操作将对象的对象头的Mark Word和锁记录中的地址和轻量级锁标识进行互换

      • 如果CAS操作成功,则线程栈帧中的锁记录中的对象指针指向锁对象,标识线程获取锁成功

        image-20240116090743477

      • 如果CAS操作失败,会有两种情况

        • 如果锁对象的对象头中Mark Word中的锁标识为00,但锁记录指针指向当前线程的其他锁记录,则发生了锁重入的情况,则只在线程的栈帧加上一个锁记录用于锁重入的计数,锁记录取值为 Null

          image-20240116093852460

        • 如果锁对象的对象头中Mark Word中的锁标识为01,并且锁记录指针指向其他线程的锁记录,代表其他线程持有了这把对象锁,此时发生同一时刻有多个线程竞争同一个对象锁的情况发生了锁竞争,轻量级锁会升级成重量级锁,进入重量级的流程

    • 执行完同步代码块的内容后,开始释放锁时,

      • 当栈帧弹出锁记录为Null时,则代表这是重入锁的锁记录,则直接弹出,此时重入次数也会减一;

      • 当锁记录不为Null时,则通过CAS操作将对象的对象头的Mark Word和锁记录中存储之前的无锁状态的对象头信息(hashcode gc age 偏向锁标识 锁状态)进行互换即将原先对象头Mark Word进行恢复。

        • CAS恢复成功,则解锁成功
        • CAS恢复失败,则说明轻量级锁已经发生锁膨胀升级为了重量级锁,此时将进入重量级锁解锁的流程
锁膨胀

锁膨胀:在上述情况下,在线程尝试获取轻量级锁的过程中,CAS替换对象锁时,因为当前对象状态为01并且锁记录指针指向其他线程的锁记录时,此时表示 轻量级锁发生了竞争,这时候就需要锁膨胀,将轻量级锁升级为重量级锁。

image-20240116100637988

当有线程持有了轻量级锁,其他线程获取锁失败,进入锁膨胀流程:

  • 为锁对象申请Monitor监视器,让对象的对象头的Mark Word的锁状态变为10,指针指向Monitor监视器,Monitor监视器的owner指向当前持有轻量级锁的线程,其他线程进入Monitor的阻塞队列中,并且变为阻塞状态

  • 当之前持有轻量级锁的线程执行完同步代码块后尝试释放锁,通过CAS将之前存储在线程栈帧锁记录中的对象的Mark Word交换恢复给对象,此时由于对象对象头中的Mark Word存储的是Monitor地址和10重量级锁标识,因此进入重量级锁的锁释放流程,根据对象头的Monitor的指针找到Monitor监视器,将Owner置为Null,并且使用之前的持有锁线程唤醒entrylist的阻塞线程

    image-20240116103407802

自旋优化

自旋优化 - spin lock

在线程尝试获取锁,如果锁已经被其他线程占用,该线程不会立即阻塞,而是会循环自旋一段时间,防止线程阻塞而减少上下文切换的开销,这针对于短暂的锁占用和低竞争情况的一种优化策略。

注意:自旋会占用一定的CPU资源,节省了上下文切换的时间,这也是为什么它用于短暂的锁占用和低竞争情况的原因。此外,它会进行一定时间或次数的自旋(JDK1.6引入了自适应自旋锁Adaptive Spinning - 自旋的时间由上一次在同一个锁上的自旋时间及锁的拥有者的状态来决定),如果自旋超过了限定的时间或次数,则会进入阻塞状态,等待被唤醒。

偏向锁
  • 使用场景:同一个代码块由同一个线程多次执行的场景;执行的同步代码块大多数场景下是单线程访问同步块的场景,使用偏向锁可以减少CAS操作和monitor对象的开销

  • 机制:当线程第一次进入获取对象锁时,线程获取锁成功,将该线程的ID直接存储在对象头中并将标记对象为启用偏向锁状态(即表示该对象归属于该线程),之后该线程重复获取锁,无需竞争,直接获取锁,后续锁释放,依旧会在对象头中保持这个线程ID

    image-20240116142559410

    对象默认开启了偏向锁即创建的对象最后3位为101(添加参数 -XX:-UseBiasedLocking 禁用偏向锁),但是偏向锁是默认延迟的,对象创建后不是立即处于启用偏向状态,而是有一个默认的延迟时间,延迟时间后启用偏向状态即最后3位为101,这个延迟时间的目的是为了让应用程序在启动时有足够的时间执行一些初始化操作,避免在初始化阶段执行一些初始化操作。(可以添加 VM 参数:-XX:BiasedLockingStartupDelay=0来禁用延迟)

  • 偏向锁的撤销:撤销对于对象的偏向 将101状态修改为001

    • 调用对象的 hashcode 方法(对象头的hashcode占用对象头mark word的一部分空间)
    • 当出现其他线程尝试获取偏向锁对象,会将偏向锁升级为轻量级锁
    • 调用对象的 wait/notify 方法
  • 批量重偏向:当对象的偏向锁的撤销但并多个线程并没有没有出现竞争时,而其中一个线程获取锁的次数达到了阈值(默认20),虚拟机可能会重新偏向其中的一个线程。但是如果该类的实例对象偏向锁撤销的次数越来越多,达到阈值(默认40),会将整个类标记为不可偏向,新建的该类的实例对象也无法偏向。

锁消除

锁消除机制是jvm即时编译器的优化手段,用于消除代码中不必要的同步操作,从而减少加锁和释放锁带来的性能开销。编译器通过静态分析代码,判断对象锁在特定情况下不会发生竞争。

锁消除的常见情况

  • 逃逸分析:分析对象的引用是否会逃逸出当前方法的作用域,如果没有,则说明对象的生命周期仅限于当前方法的执行过程,不会被其他方法引用。即时编译器判断在方法内部,对象锁不会发生竞争,此时可以消除锁操作
  • 线程局部存储:对象的作用域为所属线程,此时也不会发生线程锁的竞争,因此可以安全的消除锁操作

jvm 默认开启锁消除机制

wait和notify方法

wait 是 Object 类的实例方法用于 线程之间的协作:线程为了等待某个条件满足,阻塞等待,让出锁其他的线程获取 锁执行

该方法只能在同步代码块或同步方法中调用,否则会运行时抛出 IllegalMonitorStateException。

调用 wait() 方法使得线程挂起,等待某个条件满足,此时线程会被挂起,这个期间会释放对象锁,当其他线程的运行使得这个条件满足时,其他线程会调用 notify() 或者 notifyAll() 来唤醒挂起的线程

image-20240116215737327

当线程获取锁后进入同步代码块中后,调用wait方法,释放对象的锁,线程从RUNNABLE状态变为WAITING状态(不会占用CPU资源),并加入到monitor对象的WaitSet等待集合中,其他线程调用 notify() 或者 notifyAll() 来唤醒WaitSet中的线程,被唤醒的线程重新尝试获取锁。

相关API方法
  • obj.wait():让当前持有对象锁的线程到monitor监视器的WaitSet中等待,无限期等待直到其他线程获取该对象锁并调用notify()或者notifyAll()唤醒它
  • obj.wait(long timeout):同样是线程进入到monitor的WaitSet中等待,只不过是限期等待,等待超过时间会自动唤醒
  • obj.notify():在同步代码中唤醒对象monitor监视器的WaitSet中的一个线程
  • obj.notifyAll():在同步代码中唤醒对象monitor监视器的WaitSet中的全部线程

notify和notifyAll方法必须在同步代码块中执行,是为了保证线程安全性、避免数据竞争和死锁,并确保线程间正确的通信

这些方法主要用于线程之间的协作,都是所有Object对象方法,所有方法的调用只能在获取锁的前提下即在同步代码块或者同步方法中执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class SimpleDemo {

// 设置final修饰变量 保证对象引用不可变
private final static Object obj = new Object();

public static void main(String[] args) throws InterruptedException {

new Thread(() -> {
synchronized (obj) {
System.out.println("t1 start");
try {
obj.wait(); // t1 进入 WaitSet
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("t1 end");
}
}, "t1").start();

new Thread(() -> {
synchronized (obj) {
System.out.println("t2 start");
try {
obj.wait(); // t2 进入 WaitSet
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("t2 end");
}
}, "t2").start();

TimeUnit.MILLISECONDS.sleep(2000);

synchronized (obj) {
// notify 唤醒WaitSet其中一个线程 notifyAll 唤醒WaitSet所有线程
obj.notify();
}
}
}
# 输出
t1 start
t2 start
t1 end

wait和sleep的区别
  • sleep 是 Thread 类的一个静态方法,wait 是 Object 类的方法
  • wait 必须在同步代码块中调用即调用的该方法的线程必须要持有这把对象锁,而 sleep 则不需要
  • sleep 休眠时无论是否持有锁都不会释放持有的锁,而 wait 会在等待时会释放对象锁
  • wait 没有设置休眠时间时,会进入无限期等待,直到其他线程获取锁后,调用notify()或者notifyAll()方法唤醒锁
  • sleep 方法用于 定时任务或者简单的时间控制,而 wait 方法用于线程之间的协作
正确实例

在线程需要等待某个条件的场景下,可以使用wait方法来让线程阻塞,释放锁对象,让其他线程先执行,等待条件满足后,再由其他线程获取锁对象调用notifyAll 唤醒线程。

  • 这种方式,相比于 sleep 方法而言,效率更好,因为 sleep 在执行同步代码块时,不会释放锁对象。
  • 使用 notifyAll 的原因是 notify方法只会从对象的monitor监视器的 WaitSet 中挑一个执行,当有个多个线程执行wait方法时即 WaitSet 中有多个对象时,notify 不一定会唤醒该线程, 而notifyAll 会唤醒 WaitSet中的所有集合。
  • 线程被notifyAll唤醒后,此时的线程所需要等待的条件可能没有成立,此时,线程直接往下执行,不会执行某个条件成立的代码。因此需要使用 while 让没有成立条件的方法继续进入等待状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final Object lock = new Object(); // 使用 final 修饰的变量 不可变(引用关系)

new Thread(() -> {
synchronized(lock) {
while(条件不成立) {
lock.wait();
}
}
}).start();

new Thread(() -> {
synchronized(lock) {
lock.notifyAll();
}
}).start();
保护性暂停

保护性暂停 - Guarded Suspension,用于 一个线程等待另一个线程的执行结果。

其核心是一个受保护的方法,该方法在执行其所需要真正执行的操作时需要满足特定的条件,当条件状态不满足时,执行受保护方法的线程挂起并进入等待状态,直到条件状态满足该线程才能继续执行。

应用场景:JDK中,join底层实现,Future类的底层实现。

image-20240118140420491
简单代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class GuardObject {

private Object result;

private final Object lock = new Object();

public Object get() {
synchronized (lock) {
while (result == null) {
try {
// 条件不满足 阻塞 释放锁
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return result;
}
}

// 设置等待的超时时间
public Object get(long timeout) {
synchronized (lock) {
long startTime = System.currentTimeMillis();
while (result == null) {
long passTime = System.currentTimeMillis() - startTime;
System.out.println(passTime);
if (passTime > timeout) {
break;
}
try {
// 防止结果依旧为 null 但是被其他线程唤醒
lock.wait(timeout - passTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return result;
}
}

public synchronized void complete(Object response) {
synchronized (lock) {
// 其他线程获取条件
this.result = response;
// 唤醒
lock.notifyAll();
}
}
}

public class TestGuard {

public static void main(String[] args) {

GuardObject guardObject = new GuardObject();

new Thread(() -> {
// 设置超时时间
Object o = guardObject.get(1000);
System.out.println("result:" + o);
}).start();


new Thread(() -> {
System.out.println("result complete...");
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
guardObject.complete(Integer.MAX_VALUE);
}).start();
}
}
join底层分析

线程A调用线程B实例的join方法,线程A将等待线程B执行结束后,才会执行线程A下面的代码。

join的底层实现就用到了 保护性暂停 Guarded Suspension 模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
    public final void join() throws InterruptedException {
join(0);
}

// millis 超时时间 0表示不会超时
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

// 参数有效性检查
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
// 不会超时的等待
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
// 超时释放
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}

/**
* Tests if this thread is alive. A thread is alive if it has
* been started and has not yet died.
*
* 代码执行完毕后,线程被销毁 返回 false
*/
public final native boolean isAlive();
多任务代码实现
image-20240118150227848
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# GuardObj 
public class GuardObj {

private Long id;

private Object result;

public GuardObj(Long id) {
this.id = id;
}

public synchronized Object get(long timeout) {
long startTime = System.currentTimeMillis();

while (result == null) {
long passTime = System.currentTimeMillis() - startTime;
if (passTime > timeout) {
break;
}
try {
this.wait(timeout);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

return result;
}

public synchronized void complete(Object result) {
this.result = result;
this.notify();
}
}

# Tasks 统一管理多个任务
public class Tasks {

private static final ConcurrentMap<Long, GuardObj> tasks = new ConcurrentHashMap<>();

public static GuardObj genGuardObj(Long id) {
GuardObj guardObj = new GuardObj(id);
tasks.put(id, guardObj);
return guardObj;
}

public static GuardObj removeGuardObj(Long id) {
return tasks.remove(id);
}
}

public class TestGuardObjs {

public static void main(String[] args) throws InterruptedException {

for (Long i = 0l; i < 3; i++) {
Long finalI = i;
new Thread(() -> {
GuardObj guardObj = Tasks.genGuardObj(finalI);
System.out.println("任务" + finalI + "开始接受任务");
guardObj.get(5000);
}).start();
}

TimeUnit.SECONDS.sleep(1);

for (Long i = 0l; i < 3; i++) {
Long finalI = i;
new Thread(() -> {
GuardObj guardObj = Tasks.removeGuardObj(finalI);
System.out.println("向任务" + finalI + "发送任务");
guardObj.complete(finalI);
}).start();
}
}
}
# 输出
任务0开始接受任务
任务1开始接受任务
任务2开始接受任务
向任务0发送任务
向任务1发送任务
向任务2发送任务
生产者和消费者

上述的模式,一个线程等待另一个线程的条件满足,才被唤醒执行,等待线程和唤醒线程是一对一的关系,在多个任务时,需要多对线程。而在生产者和消费者模型中,等待线程可以多次等待,唤醒线程可以多次唤醒,从而减少了线程资源的开销。生产者只负责生产数据,消费者只负责接收数据,数据以消费队列的方式存储,JDK的很多阻塞队列采用了这种模式。

image-20240118190002480
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Message {

private Integer id;

private String content;

public Message(Integer id, String content) {
this.id = id;
this.content = content;
}

public Integer getId() {
return id;
}

public String getContent() {
return content;
}

@Override
public String toString() {
return "Message{" +
"id=" + id +
", content='" + content + '\'' +
'}';
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class MessageQueue {

private LinkedList<Message> list = new LinkedList<>();

private final Integer capacity;

public MessageQueue(Integer capacity) {
this.capacity = capacity;
}

public Message take() {
synchronized (list) {
// 获取消息的等待条件为 队列不能空
while (list.isEmpty()) {
try {
System.out.println("消息队列已空,等待生产者生产消息");
list.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

Message message = list.removeFirst();
System.out.println("comsume:" + message.toString());
list.notifyAll();
return message;
}
}

public void put(Message message) {
synchronized (list) {
// 等待存入消息的条件为 队列没有满
while (list.size() == capacity) {
try {
System.out.println("消息队列满足,等待消费者消费消息");
list.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

list.addLast(message);
System.out.println("produce:" + message.toString());
list.notifyAll();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TestMq {

public static void main(String[] args) {

MessageQueue messageQueue = new MessageQueue(2);

for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
messageQueue.put(new Message(id, "content" + id));
}, "produce" + i).start();
}

new Thread(() -> {
while (true) {
messageQueue.take();
}
}).start();
}
}

Park和unPark方法

Park和unPark方法是LockSupport类中的方法,它们以线程为单位阻塞和唤醒线程,

1
2
LockSupport.park(); // 用于暂停当前线程
LockSupport.unpark(thread对象); // 恢复某个线程的运行
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws InterruptedException {

Thread t1 = new Thread(() -> {
System.out.println("start...");
System.out.println("park...");
LockSupport.park();
System.out.println("unpark...");
}, "t1");
t1.start();

TimeUnit.SECONDS.sleep(1);
LockSupport.unpark(t1);

}
# 输出:
start...
park...
unpark...

注意:unpark 方法可以在 park 之前调用,也可以在 park 之后调用,在之前调用也可以恢复线程。

wait和park方法
  • wait、notify和 notifyAll 底层通过 对象的 monitor 监视器实现,而 unpark 实现更加的轻量级,unpark 让线程阻塞,但是不会让持有锁的线程释放锁
  • unpark 是 唤醒一个指定的线程,而 notify/notifyAll 则是唤醒 对象的 monitor 监视器中 waitSet 等待线程的随机一个
  • park 之前可以先执行 unpark
原理

LockSupport中的 park 和 unpark 方法,底层原理依赖于操作系统的线程调度和管理机制。当一个线程调用 park 时,它会检查线程jvm底层的parker对象 是否有一个“许可”(permit)可用,如果有许可,park 立即返回,并消耗这个许可(即许可数变为0);如果没有许可,线程将被阻塞,直到接收到一个许可或者线程被中断。当线程一个线程调用 unpark 唤醒它时,它会如果目标线程此时被 park 阻塞,它将被唤醒。如果目标线程此时没有被阻塞,它会持有这个许可,直到下次调用 park 时立即返回而不会阻塞。

活跃性

死锁

在很多场景下,为了提高系统的并发度,会讲锁的粒度尽可能的细化,这样有可能造成死锁。死锁就是 两个线程或者多个线程互等待彼此持有的锁。

定位死锁
  • jstack
1
2
3
jps 
jstack <PID> > thread.dump
搜索关键字 deadlock
  • 使用 visualvm 或者 jconsole 工具

    image-20240118223540220
解决方案

可以按照相同的顺序进行加锁,但是这种情况容易发生饥饿现象即有部分线程很难获取到锁,导致一直没有办法执行

活锁

活锁是指两个或者多个线程相互改变彼此线程的结束条件,导致的状态变化彼此抵消,使得两个线程一直执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class SimpleDemo {

static volatile int cnt = 10;

public static void main(String[] args) {

new Thread(() -> {
while (cnt > 0) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
cnt--;

System.out.println(cnt);
}
}).start();
new Thread(() -> {
while (cnt < 20) {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
cnt++;
System.out.println(cnt);
}
}).start();
}
}

上述代码,会一直执行下去,解决办法:可以使得两个线程执行时间交错,可以两个线程的休眠时间设置成不同的

饥饿

饥饿是指一个线程由于优先级太低,始终得不到CPU调度而始终无法结束的情况或者很难获取到锁对象,导致一直没有办法执行同步代码块

ReentrantLock

ReentrantLock 翻译是 可重入锁,相比于 synchronized 有如下的特点

  • 可以被中断
  • 可以设置获取锁的超时时间
  • 可以设置公平锁 - 先到先得,有效防止部分线程饥饿的情况
  • 支持多个条件变量进行

与 synchronized 一样,ReentrantLock 也是可重入的。

基本语法
1
2
3
4
5
6
7
8
9
10
    // 创建锁对象
ReentrantLock lock = new ReentrantLock();
// 获取锁
lock.lock();
try {
// 临界区
} finally {
// 释放锁
lock.unlock();
}
可重入

可重入指的是同一个线程可以多次获取同一把锁

可打断

Synchronized 和 ReentrantLock的 Lock 都是不可打断的,即有线程持有了锁,其他线程将一直阻塞等待下去。

可打断是指 锁已经被持有的情况下,线程等待锁的释放,等待锁的释放的过程可以被打断,防止线程一直等待获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
 public static void main(String[] args) { 
ReentrantLock lock = new ReentrantLock();

Thread t1 = new Thread(() -> {
try {
// 如果锁没有被其他线程占用,竞争获取对象的锁
// 如果锁被其他线程占用,则进入阻塞队列等待锁释放
System.out.println("t1 try lock");
lock.lockInterruptibly();
} catch (InterruptedException e) {
System.out.println("interrupted");
e.printStackTrace();
return;
}
try {
System.out.println("t1 get lock");
} finally {
System.out.println("t1 unlock");
lock.unlock();
}
}, "t1");

lock.lock();
System.out.println("main lock");
t1.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("t1 interrupt");
t1.interrupt();
}
# 输出
main lock
t1 try lock
t1 interrupt
interrupted
java.lang.InterruptedException
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:944)
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1263)
at java.base/java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:317)
at com.example.demo.concurrency.ReentrantLockDemo.lambda$method2$0(ReentrantLockDemo.java:38)
at java.base/java.lang.Thread.run(Thread.java:834)
锁超时

锁超时机制是指尝试获取锁的线程不会一直阻塞等待,而是在指定的时间内等待锁的释放。如果在指定的时间内未能获取到锁,则停止等待,返回 false。这种机制有效的防止了线程无限制的等待下去,防止了死锁。

方法
1
2
3
4
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) {         
ReentrantLock lock = new ReentrantLock();

Thread t1 = new Thread(() -> {

System.out.println("t1 try lock");
boolean tryLock;
try {
tryLock = lock.tryLock(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (!tryLock) {
System.out.println("获取锁超时");
return;
}
try {
System.out.println("t1 get lock");
} finally {
lock.unlock();
}
});

lock.lock();
t1.start();
}
# 输出:
t1 try lock
获取锁超时

注意当直接调用 lock.trylock() 方法时即没有设置超时时间的情况下,线程直接尝试获取锁,不会等待,获取不到则直接返回 false ,获取锁失败。

公平锁

公平锁的是指 线程会按照它们请求锁的顺序获得锁即先发出请求的线程会先获得锁,而后发出请求的线程按顺序排队等待锁。

Synchronized 和 ReentrantLock的无参数构造都是 非公平锁。 Synchronized 的 对象对应的 monitor 监视器的 EntryList(本质队列)维护了 等待获取锁的线程,当占有锁的线程释放锁时, 会调用 notifyAll(不是notify方法,防止死锁和饥饿) 方法唤醒所有阻塞队列的线程,让这些线程重新竞争获取锁。ReentrantLock 的无参数构造默认是 非公平锁。

ReentrantLock 的有参数构造设置 fair 参数为 true,此时是 公平锁 的实现。公平锁可以避免饥饿问题,防止线程无限制的等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* ReentrantLock 实例对象的无参构造方法,默认创建非公平锁
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* ReentrantLock 实例对象的有参构造方法,
* Creates an instance of {@code ReentrantLock} with the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

但是没有必要使用 ReentrantLock 实现避免饥饿问题(防止线程无限制的等待),因为 并发锁会降低并发度,造成性能降低,可以使用 lock.trylock 方法。

条件变量

条件变量是ReentrantLock提供的一种允许持有锁的线程在特定条件不满足时阻塞等待,在条件满足时被唤醒的特性。和Synchronized代码块中的wait/notify相比,ReentrantLock支持多个条件变量,提供了更细粒度的阻塞和唤醒机制。通过使用条件变量,你可以控制线程在特定条件下的等待和唤醒,从而有效地管理线程间的协作。

ReentrantLock的条件变量和Synchronized的wait/notify方法防止持有锁的线程在等待条件满足时占有锁,让其他线程无法执行,它们都有效的管理了线程之间的协作。

ReentrantLock的条件变量通过 ReentrantLock实例对象的 newCondition 方法创建,该提供了该条件变量的await方法使得当前持有锁的线程等待并且释放锁(当前线程需要持有锁),直到另一个线程调用同一条件变量的single或者sinleAll方法唤醒等待条件中的一个线程或者全部线程重新竞争lock锁。

注意:使用变量时,调用 await 方法,线程应持有相关的锁。当调用 await方法后,线程会释放锁,进入等待状态。

使用场景:条件变量通常用于表示“某个条件是否满足”,用于实现复杂的线程间协调和通信。例如,在生产者-消费者问题中,消费者线程可能在“缓冲区为空”的条件上等待,而生产者线程在添加了新项到缓冲区后会通知该条件。

实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

public static void main(String[] args) {

ReentrantLock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();

new Thread(() -> {
lock.lock();
try {
System.out.println("t1 wait");
condition1.await();
System.out.println("t1 doing");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}).start();

new Thread(() -> {
lock.lock();
try {
System.out.println("t2 wait");
condition1.await();
System.out.println("t2 doing");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}).start();

try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

lock.lock();
try {
condition1.signalAll();
condition2.signalAll();
} finally {
lock.unlock();
}

}