多线程与高并发四之Java中断机制 前言 我们首先介绍中断的三个 APPI 及其底层代码,在对方法的实现有了清晰的认知后,再结合场景谈谈什么是中断,以及中断该如何正确使用?
一、中断方法 1. isInterrupted 1 2 3 4 public boolean isInterrupted() { // 调用isInterrupted 方法,中断标记设置为 true return isInterrupted(false); }
这个方法很简单,就是返回当前线程的中断标记值,这个方法是由 native 方法实现。
1 2 3 4 5 6 /** * Tests if some Thread has been interrupted. The interrupted state * is reset or not based on the value of ClearInterrupted that is * passed. */ private native boolean isInterrupted(boolean ClearInterrupted);
根据 Thread 类找到对应路径下 Thread.c,可以看到该方法的底层实现方法JVM_IsInterrupted: 在 hotspot 源码 jvm.c 文件中,可以看到 JVM_IsInterrupted 的底层实现依赖于操作系统的 is_interrupted 方法,我们就看 os_linux 的实现: 该方法中的逻辑也很简单,就是返回了操作系统的线程中断状态,如果清除标记为 true,那么就重置中断标记
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 bool os::is_interrupted(Thread* thread, bool clear_interrupted) { assert(Thread::current() == thread || Threads_lock->owned_by_self(), "possibility of dangling Thread pointer"); // 拿到对应的操作系统线程 OSThread* osthread = thread->osthread(); // 获取该线程的中断状态 bool interrupted = osthread->interrupted(); // 如果中断状态为true 并且需要清除中断标记,那么将中断标记重置为 false if (interrupted && clear_interrupted) { osthread->set_interrupted(false); // consider thread->_SleepEvent->reset() ... optional optimization } // 返回中断标记 return interrupted; }
2. interrupted() 1 2 3 public static boolean interrupted() { return currentThread().isInterrupted(true); }
看到这个方法可以发现,它和 isinterrupted() 的实现都是调用 isInterrupted 方法,只是参数不一样,interrupted 的参数为 true,这个参数的含义就是是否要清除中断标记。因此该方法的作用是返回当前线程的中断标记并且重置中断标记。
3. interrupt() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void interrupt() { // 检查调用方线程是否有对被调用线程的修改权 if (this != Thread.currentThread()) checkAccess(); // 中断网络 IO synchronized (blockerLock) { Interruptible b = blocker; if (b != null) { interrupt0(); // Just to set the interrupt flag b.interrupt(this); return; } } // 调用 native 方法 interrupt0(); }
先忽略中断网络 IO 这块的代码,下面会讲,可以看到这个方法最终依赖 native 方法 interrupt0(),关注其底层实现,可以发现该中断方法主要做了两件事情:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 void os::interrupt(Thread* thread) { // 确保执行该方法的线程是当前线程 assert(Thread::current() == thread || Threads_lock->owned_by_self(), "possibility of dangling Thread pointer"); OSThread* osthread = thread->osthread(); if (!osthread->interrupted()) { // 设置中断标记 osthread->set_interrupted(true); // More than one thread can get here with the same value of osthread, // resulting in multiple notifications. We do, however, want the store // to interrupted() to be visible to other threads before we execute unpark(). // 屏障指令,保证在执行 unpark() 之前对其他线程可见 OrderAccess::fence(); // 唤醒 sleep() 阻塞的线程 ParkEvent * const slp = thread->_SleepEvent ; if (slp != NULL) slp->unpark() ; } // 唤醒 park() 阻塞的线程 // For JSR166. Unpark even if interrupt status already was set 只有中断状态被设置了之后才能执行 unpark if (thread->is_Java_thread()) ((JavaThread*)thread)->parker()->unpark(); // 线程被wait()方法阻塞的线程 ParkEvent * ev = thread->_ParkEvent ; if (ev != NULL) ev->unpark() ; }
根据上述源码可以发现 Java 中的中断并没有中断任务的功能,所提供的仅仅是设置标记以及唤醒线程,对应的也就是下述两种应用场景,如要中断任务需要开发者根据打断标记自行编码打断任务,如要阻止线程继续阻塞,则唤醒线程。
注意该方法上的注释(中文翻译),在执行 interrupt() 方法时,其产生的结果因不同场景而不同,具体如下:
线程因调用 object.wait、Thread.join、Thread.sleep 方法阻塞,将会抛出InterruptedException,同时清除线程的中断状态;
线程阻塞在 java.nio.channels.InterruptibleChannel 的 IO 上,Channel 将会被关闭,线程被置为中断状态,并抛出 java.nio.channels.ClosedByInterruptException;
如果线程堵塞在 java.nio.channels.Selector 上,线程被置为中断状态,select方法会马上返回,类似调用wakeup的效果;
如果线程处于 not alive(线程刚被 new 出来没有运行,或者已经死亡) 的状态则毫无影响
二、什么是中断? 中断,顾名思义是对线程的当前状态的改变,使其恢复到原有状态,具体如何改变状态依据不同的情景有着不同的结果。在对 Java 的中断能不能真正的中断线程这个问题上,不少博客各执一词,最后我发现大家是在鸡同鸭讲。能否中断线程因线程状态而异,具体如下:
中断任务的执行 在某些时候,如生产者/消费者模式下,线程循环执行任务,如何让线程停下来? 在 Java 中没有提供停止线程的操作(stop()方法会导致错误,已被官方标记过时),Java 提供 interrupt() 方法设置中断标记,但是如何停止线程却是开发者自己决定的事情。 上面源码讲述已经提到 interrupt() 方法并没有提供打断线程的机制(如下图,线程不会停止运行),要实现运行中的线程中断,需要调用 interrupt() 修改线程的中断标记,然后需要在被调用线程中自己实现逻辑,通常的做法是在合适的位置(例如在while 循环处)不断检查此线程的中断标记是否被设置,在检测到中断标记为 true 的地方再停止线程(使用方式可搜索两阶段终止提交,此文不涉及应用)。
中断线程的阻塞状态 如下图是中断线程的阻塞状态,在线程检查到中断标记为true 的时候会在这些阻塞方法调用处抛出 InterruptedException , 并且清除中断标记。 并不是所有阻塞态的线程都能被中断,Synchronized 不支持锁中断,见下文。
三、其他 关于 Synchronized 不可中断 先看以下场景,T2 和 T1 竞争锁,在 T1 因获取锁阻塞的时候去打断,看结果如何。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 /** * description * * @author greysonchance * @since 2022/8/1 */ @Slf4j public class TestSynchronized { public static void main(String[] args) { Object lock = new Object(); Thread t1 = new Thread(() -> { synchronized (lock) { log.info("线程 T1 拿到锁, 开始执行"); while (!Thread.currentThread().isInterrupted()) { } } log.info("线程 T1 退出"); }, "T1"); Thread t2 = new Thread(() -> { synchronized (lock) { log.info("线程 T2 拿到锁, 开始执行"); t1.start(); try { // 保证 T1 线程已经开始获取锁进入阻塞态 Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } log.info("T2 输出线程 T1 的状态是:{}", t1.getState()); // 死循环不释放锁,让 T1 一直处于 Blocked 态 while (true) { } } }, "T2"); t2.start(); log.info("开始中断线程 T1"); t1.interrupt(); try { // 等待一会看状态是否被修改 Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } log.info("Main 输出线程 T1 的状态是:{}", t1.getState()); } }
如图,线程 T2 启动并且先获取到锁, 然后 t1.start() 启动 T1 线程,但是 T1 因获取不到锁阻塞,此时执行 interrupt 方法并不能中断 T1,线程仍然处于 Blocked 态。
这是因为线程 T1 因获取不到锁,阻塞在 Monitor 的队列中,interrupt() 方法并不能将该线程从队列中移出。
再看下面场景,线程获取锁之后调用了 wait() 方法,然后再调用 interrupt() 方法,线程被中断成功
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 /** * description * * @author greysonchance * @since 2022/8/1 */ @Slf4j public class TestSynchronized2 { public static void main(String[] args) throws InterruptedException { Object lock = new Object(); Thread t1 = new Thread(() -> { synchronized (lock) { log.info("线程 T1 拿到锁, 开始执行"); for (int i = 0; i < 3; i++) { System.out.println("执行任务"); } try { lock.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } log.info("线程 T1 退出"); }, "T1"); t1.start(); Thread.sleep(1000); t1.interrupt(); Thread.sleep(1000); log.info("线程 T1 状态:{}", t1.getState()); } }
综上,Synchronized 的不可中断指的是当线程因获取锁失败阻塞在队列中时(状态为 BLOCKED)不可被打断,如果已经获取了锁调用 sleep()\wait() 等方法而阻塞(状态为 WAITED或TIME_WAITING)是可以被打断的。
interrupt() 与 park() 对 unpark() 的影响 interrupt() 在唤醒被 unpark() 阻塞的线程时会修改中断标记为true,而 park() 唤醒不会。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Slf4j public class TestUnpark { public static void main(String[] args) { Thread t0 = new Thread(new Runnable() { @Override public void run() { Thread current = Thread.currentThread(); log.info("current Thread name:{}", current.getName()); log.info("准备park当前线程:{}", current.getName()); log.info("当前线程的中断标记:{}",current.isInterrupted()); LockSupport.park(); log.info("线程{}阻断后又运行了", current.getName()); log.info("当前线程被唤醒后的中断标记:{}",current.isInterrupted()); } }, "t0"); t0.start(); try { log.info("休眠…………"); Thread.sleep(2000); // log.info("调用LockSupport.unpark方法,唤醒线程{}", t0.getName()); // LockSupport.unpark(t0); log.info("调用interrupt方法,唤醒线程{}", t0.getName()); t0.interrupt(); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
interrupt() 的实验结果:
unpark() 的实验结果: