您的足迹:首页 > Java技术篇 >深度解析Java线程池的异常处理机制

深度解析Java线程池的异常处理机制


前言

今天小伙伴遇到个小问题,线程池提交的任务如果没有catch异常,那么会抛到哪里去,之前倒是没研究过,本着实事求是的原则,看了一下代码。

正文

小问题

考虑下面这段代码,有什么区别呢?你可以猜猜会不会有异常打出呢?如果打出来的话是在哪里?:

        ExecutorService threadPool = Executors.newFixedThreadPool(1);
        threadPool.submit(() -> { Object obj = null; System.out.println(obj.toString());
        });
        threadPool.execute(() -> { Object obj = null; System.out.println(obj.toString());
        });

源码解析

我们下面就来看下代码, 其实就是将我们提交过去的Runnable包装成一个Future

	public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask); return ftask;
    } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value);
    } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // volatile修饰,保证多线程下的可见性,可以看看Java内存模型 } public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result);
    } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result;
        } public T call() {
            task.run(); return result;
        }
    }

接下来就会实际提交到队列中交给线程池调度处理:

	/**  * 代码还是很清爽的,一个很典型的生产者/消费者模型,  * 这里暂不纠结这些细节,那么如果提交到workQueue成功的话,消费者是谁呢?  * 明显在这个newWorker里搞的鬼,同样细节有兴趣可以自己再去研究,这里我们会发现  * 核心就是Worker这个内部类  */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return;
            c = ctl.get();
        } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))
                reject(command); else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        } else if (!addWorker(command, false))
            reject(command);
    }

那么接下来看看线程池核心的流程:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ /** Delegates main run loop to outer runWorker  */ public void run() {
            runWorker(this);
        }
} final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //getTask()方法会尝试从队列中抓取数据 while (task != null || (task = getTask()) != null) {
                w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                    wt.interrupt(); try { //可覆写此方法打日志埋点之类的 beforeExecute(wt, task); Throwable thrown = null; try { //简单明了,直接调用run方法 task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

submit的方式

那么我们可以这里是直接调用的run方法,先看submit的方式,我们知道最终传递过去的是一个FutureTask,也就是说会调用这里的run方法,我们看看实现:

	public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false; //。。。 setException(ex);
                } if (ran)
                    set(result);
            }
        } finally { //省略 } protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t; //赋给了这个变量 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion();
        }
    }

可以看到其实类似于直接吞掉了,这样的话我们调用get()方法的时候会拿到, 比如我们可以重写afterExecute方法,从而可以得到实际的异常:

protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future<?>) { try { //get这里会首先检查任务的状态,然后将上面的异常包装成ExecutionException Object result = ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset }
          } if (t != null){ //异常处理 t.printStackTrace();
          }
        }

execute的方式

那么如果是直接exeture的方式有啥不同呢?这样的话传递过去的就直接是Runnable,因此就会直接抛出:

    try {
        task.run();
    } catch (RuntimeException x) {
        thrown = x; throw x;
    } catch ( x) {
        thrown = x; throw x;
    } catch (Throwable x) {
        thrown = x; throw new Error(x);
    } finally {
        afterExecute(task, thrown);
    }

那么这里的异常到底会抛出到哪里呢, 我们看看JVM具体是怎么处理的:

if (!destroy_vm || JDK_Version::is_jdk12x_version()) { // JSR-166: change call from from ThreadGroup.uncaughtException to // java.lang.Thread.dispatchUncaughtException if (uncaught_exception.not_null()) { //如果有未捕获的异常 Handle group(this, java_lang_Thread::threadGroup(threadObj()));
      {
        KlassHandle recvrKlass(THREAD, threadObj->klass());
        CallInfo callinfo;
        KlassHandle thread_klass(THREAD, SystemDictionary::Thread_klass()); /*   这里类似一个方法表,实际就会去调用Thread#dispatchUncaughtException方法  template(dispatchUncaughtException_name,            "dispatchUncaughtException")   */ LinkResolver::resolve_virtual_call(callinfo, threadObj, recvrKlass, thread_klass, vmSymbols::dispatchUncaughtException_name(), vmSymbols::throwable_void_signature(), KlassHandle(), false, false, THREAD);
        CLEAR_PENDING_EXCEPTION;
        methodHandle method = callinfo.selected_method(); if (method.not_null()) {
          JavaValue result(T_VOID); JavaCalls::call_virtual(&result,
                                  threadObj, thread_klass, vmSymbols::dispatchUncaughtException_name(), vmSymbols::throwable_void_signature(),
                                  uncaught_exception,
                                  THREAD);
        } else {
          KlassHandle thread_group(THREAD, SystemDictionary::ThreadGroup_klass());
          JavaValue result(T_VOID); JavaCalls::call_virtual(&result,
                                  group, thread_group, vmSymbols::uncaughtException_name(), vmSymbols::thread_throwable_void_signature(),
                                  threadObj, // Arg 1 uncaught_exception, // Arg 2 THREAD);
        } if (HAS_PENDING_EXCEPTION) {
          ResourceMark rm(this); jio_fprintf(defaultStream::error_stream(), "\nException: %s thrown from the UncaughtExceptionHandler" " in thread \"%s\"\n", pending_exception()->klass()->external_name(), get_thread_name());
          CLEAR_PENDING_EXCEPTION;
        }
      }
    }

可以看到这里最终会去调用Thread#dispatchUncaughtException方法:

    private void dispatchUncaughtException(Throwable e) { //默认会调用ThreadGroup的实现 getUncaughtExceptionHandler().uncaughtException(this, e);
    }


    public void uncaughtException(Thread t, Throwable e) { if (parent != null) {
            parent.uncaughtException(t, e);
        } else { Thread.UncaughtExceptionHandler ueh = Thread.getDefaultUncaughtExceptionHandler(); if (ueh != null) {
                ueh.uncaughtException(t, e);
            } else if (!(e instanceof ThreadDeath)) { //可以看到会打到System.err里面 System.err.print("Exception in thread \"" + t.getName() + "\" ");
                e.printStackTrace(System.err);
            }
        }
    }

这里如果环境是tomcat的话最终会打到catalina.out:

_6145c123-4ec7-4856-b106-6c61e6dca285

总结

对于线程池、包括线程的异常处理推荐一下方式:

1 直接try/catch,个人 基本都是用这种方式

2 线程直接重写整个方法:

       Thread t = new Thread();
       t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { LOGGER.error(t + " throws exception: " + e);
           }
        }); //如果是线程池的模式: ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> { Thread t = new Thread(r);
            t.setUncaughtExceptionHandler(
                (t1, e) -> LOGGER.error(t1 + " throws exception: " + e)); return t;
        });

3 也可以直接重写protected void afterExecute(Runnable r, Throwable t) { }方法

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 深度解析Java线程池的异常处理机制

本博客所有文章如无特别注明均为原创。作者:枫翼复制或转载请以超链接形式注明转自 BOY1024
原文地址《深度解析Java线程池的异常处理机制

相关推荐

发表评论

路人甲 表情
Ctrl+Enter快速提交

网友评论(0)