Java(Android)线程池详解

介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用。本文是基础篇,后面会分享下线程池一些高级功能。

1、new Thread的弊端
执行一个异步任务你还只是如下new Thread吗?

new Thread(new Runnable() {
 
	@Override
	public void run() {
		// TODO Auto-generated method stub
	}
}).start();

那你就out太多了,new Thread的弊端如下:

a. 每次new Thread新建对象性能差。
b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
c. 缺乏更多功能,如定时执行、定期执行、线程中断。
相比new Thread,Java提供的四种线程池的好处在于:
a. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
b. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
c. 提供定时执行、定期执行、单线程、并发数控制等功能。

2、Java 线程池
Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

(1). newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
	final int index = i;
	try {
		Thread.sleep(index * 1000);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
 
	cachedThreadPool.execute(new Runnable() {
		@Override
		public void run() {
			System.out.println(index);
		}
	});
}

线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

 

(2). newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
	final int index = i;
	fixedThreadPool.execute(new Runnable() {
 
		@Override
		public void run() {
			try {
				System.out.println(index);
				Thread.sleep(2000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	});
}

因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。

定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()。可参考PreloadDataCache

 

(3) newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
 
	@Override
	public void run() {
		System.out.println("delay 3 seconds");
	}
}, 3, TimeUnit.SECONDS);

表示延迟3秒执行。

 

定期执行示例代码如下:

scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
 
	@Override
	public void run() {
		System.out.println("delay 1 seconds, and excute every 3 seconds");
	}
}, 1, 3, TimeUnit.SECONDS);

表示延迟1秒后每3秒执行一次。

ScheduledExecutorService比Timer更安全,功能更强大,后面会有一篇单独进行对比。

 

(4)、newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
	final int index = i;
	singleThreadExecutor.execute(new Runnable() {
 
		@Override
		public void run() {
			try {
				System.out.println(index);
				Thread.sleep(2000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	});
}

结果依次输出,相当于顺序执行各个任务。

现行大多数GUI程序都是单线程的。Android中单线程可用于数据库操作,文件操作,应用批量安装,应用批量删除等不适合并发但可能IO阻塞性及影响UI线程响应的操作。

Java中的ThreadPoolExecutor类

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。

在ThreadPoolExecutor类中提供了四个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

下面解释下一下构造器中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
    ArrayBlockingQueue;
    LinkedBlockingQueue;
    SynchronousQueue;

    ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

  • threadFactory:线程工厂,主要用来创建线程;
  • handler:表示当拒绝处理任务时的策略,有以下四种取值:
    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

在ThreadPoolExecutor类中有几个非常重要的方法:

execute()
submit()
shutdown()
shutdownNow()

execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

shutdown()和shutdownNow()是用来关闭线程池的。

还有很多其他的方法:

比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。

排队有三种通用策略:

直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

有界队列。当使用有限的 maximumPoolSizes时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

线程池关闭

调用线程池的shutdown()或shutdownNow()方法来关闭线程池

  • shutdown原理:将线程池状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
  • shutdownNow原理:将线程池的状态设置成STOP状态,然后中断所有任务(包括正在执行的)的线程,并返回等待执行任务的列表。

中断采用interrupt方法,所以无法响应中断的任务可能永远无法终止。但调用上述的两个关闭之一,isShutdown()方法返回值为true,当所有任务都已关闭,表示线程池关闭完成,则isTerminated()方法返回值为true。当需要立刻中断所有的线程,不一定需要执行完任务,可直接调用shutdownNow()方法。

线程池流程

Java(Android)线程池详解
Java(Android)线程池详解
  1. 判断核心线程池是否已满,即已创建线程数是否小于corePoolSize?没满则创建一个新的工作线程来执行任务。已满则进入下个流程。
  2. 判断工作队列是否已满?没满则将新提交的任务添加在工作队列,等待执行。已满则进入下个流程。
  3. 判断整个线程池是否已满,即已创建线程数是否小于maximumPoolSize?没满则创建一个新的工作线程来执行任务,已满则交给饱和策略来处理这个任务。

优化

1 合理地配置线程池

需要针对具体情况而具体处理,不同的任务类别应采用不同规模的线程池,任务类别可划分为CPU密集型任务、IO密集型任务和混合型任务。

  • 对于CPU密集型任务:线程池中线程个数应尽量少,不应大于CPU核心数;
  • 对于IO密集型任务:由于IO操作速度远低于CPU速度,那么在运行这类任务时,CPU绝大多数时间处于空闲状态,那么线程池可以配置尽量多些的线程,以提高CPU利用率;
  • 对于混合型任务:可以拆分为CPU密集型任务和IO密集型任务,当这两类任务执行时间相差无几时,通过拆分再执行的吞吐率高于串行执行的吞吐率,但若这两类任务执行时间有数据级的差距,那么没有拆分的意义。

2 线程池监控

利用线程池提供的参数进行监控,参数如下:

  • taskCount:线程池需要执行的任务数量。
  • completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
  • largestPoolSize:线程池曾经创建过的最大线程数量,通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
  • getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。
  • getActiveCount:获取活动的线程数。

通过扩展线程池进行监控:继承线程池并重写线程池的beforeExecute(),afterExecute()和terminated()方法,可以在任务执行前、后和线程池关闭前自定义行为。如监控任务的平均执行时间,最大执行时间和最小执行时间等。

使用示例

前面我们讨论了关于线程池的实现原理,这一节我们来看一下它的具体使用:

public class Test {
     public static void main(String[] args) {   
         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                 new ArrayBlockingQueue<Runnable>(5));
          
         for(int i=0;i<15;i++){
             MyTask myTask = new MyTask(i);
             executor.execute(myTask);
             System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
             executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
         }
         executor.shutdown();
     }
}
 
 
class MyTask implements Runnable {
    private int taskNum;
     
    public MyTask(int num) {
        this.taskNum = num;
    }
     
    @Override
    public void run() {
        System.out.println("正在执行task "+taskNum);
        try {
            Thread.currentThread().sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task "+taskNum+"执行完毕");
    }
}

执行结果:

正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 10
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 11
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 12
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 13
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 14
task 3执行完毕
task 0执行完毕
task 2执行完毕
task 1执行完毕
正在执行task 8
正在执行task 7
正在执行task 6
正在执行task 5
task 4执行完毕
task 10执行完毕
task 11执行完毕
task 13执行完毕
task 12执行完毕
正在执行task 9
task 14执行完毕
task 8执行完毕
task 5执行完毕
task 7执行完毕
task 6执行完毕
task 9执行完毕

从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。

不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor();   //创建容量为1的缓冲池
Executors.newFixedThreadPool(int);    //创建固定容量大小的缓冲池

下面是这三个静态方法的具体实现;

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。

关于高级线程池的探讨

简单线程池存在一些问题,比如如果有大量的客户要求服务器为其服务,但由于线程池的工作线程是有限的,服务器只能为部分客户服务,其它客户提交的任务,只能在任务队列中等待处理。一些系统设计人员可能会不满这种状况,因为他们对服务器程序的响应时间要求比较严格,所以在系统设计时可能会怀疑线程池技术的可行性,但是线程池有相应的解决方案。调整优化线程池尺寸是高级线程池要解决的一个问题。主要有下列解决方案:

方案一:动态增加工作线程

在一些高级线程池中一般提供一个可以动态改变的工作线程数目的功能,以适应突发性的请求。一旦请求变少了将逐步减少线程池中工作线程的数目。当然线程增加可以采用一种超前方式,即批量增加一批工作线程,而不是来一个请求才建立创建一个线程。批量创建是更加有效的方式。该方案还有应该限制线程池中工作线程数目的上限和下限。否则这种灵活的方式也就变成一种错误的方式或者灾难,因为频繁的创建线程或者短时间内产生大量的线程将会背离使用线程池原始初衷–减少创建线程的次数。

举例:Jini中的TaskManager,就是一个精巧线程池管理器,它是动态增加工作线程的。SQL Server采用单进程(Single Process)多线程(Multi-Thread)的系统结构,1024个数量的线程池,动态线程分配,理论上限32767。

方案二:优化工作线程数目

如果不想在线程池应用复杂的策略来保证工作线程数满足应用的要求,你就要根据统计学的原理来统计客户的请求数目,比如高峰时段平均一秒钟内有多少任务要求处理,并根据系统的承受能力及客户的忍受能力来平衡估计一个合理的线程池尺寸。线程池的尺寸确实很难确定,所以有时干脆用经验值。

举例:在MTS中线程池的尺寸固定为100。

方案三:一个服务器提供多个线程池

在一些复杂的系统结构会采用这个方案。这样可以根据不同任务或者任务优先级来采用不同线程池处理。

举例:COM+用到了多个线程池。

这三种方案各有优缺点。在不同应用中可能采用不同的方案或者干脆组合这三种方案来解决实际问题。

 

线程池技术适用范围及应注意的问题

下面是我总结的一些线程池应用范围,可能是不全面的。

线程池的应用范围:

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  2. 对性能要求苛刻的应用,比如要求服务器迅速相应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,并出现”OutOfMemory”的错误。

Java线程池异常处理最佳实践

问题描述:
假设我们有一个线程池,由于程序需要,我们向该线程池中提交了好多任务,但是这些任务都没有对异常进行try catch处理,并且运行的时候都抛出了异常。这会对线程池的运行带来什么影响?

正确答案是:没有影响。
想一下,如果是你开发了一个线程池供开发者使用,你会不会对这种情况做处理?想想也是肯定的,不然你提供给别人使用的东西就是有问题的,欠考虑的。而且java线程池的主要开发人员是大名鼎鼎的Doug Lea,你觉得他开发的代码怎么会允许出现这种问题?

问题分析

接下来我们来看一下java中的线程池是如何运行我们提交的任务的,详细流程比较复杂,这里我们不关注,我们只关注任务执行的部分。java中的线程池用的是ThreadPoolExecutor,真正执行代码的部分是runWorker方法:final void runWorker(Worker w)

//省略无关部分
try {
    beforeExecute(wt, task);  
    Throwable thrown = null;
    try {
        task.run();  //执行程序逻辑
    } catch (RuntimeException x) { //捕获RuntimeException
        thrown = x; throw x;
    } catch (Error x) { //捕获Error
        thrown = x; throw x;
    } catch (Throwable x) {   //捕获Throwable
        thrown = x; throw new Error(x);
    } finally {
        afterExecute(task, thrown);  //运行完成,进行后续处理
    }
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}
//省略无关部分

可以看到,程序会捕获包括Error在内的所有异常,并且在程序最后,将出现过的异常和当前任务传递给afterExecute方法。

而ThreadPoolExecutor中的afterExecute方法是没有任何实现的:

protected void afterExecute(Runnable r, Throwable t) { }

也就是说,默认情况下,线程池会捕获任务抛出的所有异常,但是不做任何处理。

存在问题

想象下ThreadPoolExecutor这种处理方式会有什么问题?
这样做能够保证我们提交的任务抛出了异常不会影响其他任务的执行,同时也不会对用来执行该任务的线程产生任何影响。
问题就在afterExecute方法上,这个方法没有做任何处理,所以如果我们的任务抛出了异常,我们也无法立刻感知到。即使感知到了,也无法查看异常信息。

所以,作为一名好的开发者,是不应该允许这种情况出现的。

如何避免这种问题

思路很简单。
1. 在提交的任务中将异常捕获并处理,不抛给线程池。
2. 异常抛给线程池,但是我们要及时处理抛出的异常。

第一种思路很简单,就是我们提交任务的时候,将所有可能的异常都Catch住,并且自己处理,任务的大致代码如下:

 @Override
    public void run() {
        try {
            //处理所有的业务逻辑
        } catch (Throwable e) {
            //打印日志等
        } finally {
            //其他处理
        }
    }

说白了就是把业务逻辑都trycatch起来。
但是这种思路的缺点就是:1)所有的不同任务类型都要trycatch,增加了代码量。2)不存在checkedexception的地方也需要都trycatch起来,代码丑陋。

第二种思路就可以避免上面的两个问题。
第二种思路又有以下几种实现方式:
1. 自定义线程池,继承ThreadPoolExecutor并复写其afterExecute(Runnable r, Throwable t)方法。
2. 实现Thread.UncaughtExceptionHandler接口,实现void uncaughtException(Thread t, Throwable e);方法,并将该handler传递给线程池的ThreadFactory
3. 采用Future模式,将返回结果以及异常放到Future中,在Future中处理
4. 继承ThreadGroup,覆盖其uncaughtException方法。(与第二种方式类似,因为ThreadGroup类本身就实现了Thread.UncaughtExceptionHandler接口)

下面是以上几种方式的代码

方式1

自定义线程池:

final class PoolService {
  // The values have been hard-coded for brevity
  ExecutorService pool = new CustomThreadPoolExecutor(
      10, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
  // ...
}
class CustomThreadPoolExecutor extends ThreadPoolExecutor {
  // ... Constructor ...
  public CustomThreadPoolExecutor(
      int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
  }
  @Override
  public void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    if (t != null) {
      // Exception occurred, forward to handler
    }
    // ... Perform task-specific cleanup actions
  }
  @Override
  public void terminated() {
    super.terminated();
    // ... Perform final clean-up actions
  }
}

方式2

实现Thread.UncaughtExceptionHandler接口,实现void uncaughtException(Thread t, Throwable e);方法,并将该handler传递给线程池的ThreadFactory

final class PoolService {
  private static final ThreadFactory factory =
      new ExceptionThreadFactory(new MyExceptionHandler());
  private static final ExecutorService pool =
      Executors.newFixedThreadPool(10, factory);
  public void doSomething() {
    pool.execute(new Task()); // Task is a runnable class
  }
  public static class ExceptionThreadFactory implements ThreadFactory  {
    private static final ThreadFactory defaultFactory =
        Executors.defaultThreadFactory();
    private final Thread.UncaughtExceptionHandler handler;
    public ExceptionThreadFactory(
        Thread.UncaughtExceptionHandler handler)
    {
      this.handler = handler;
    }
    @Override public Thread newThread(Runnable run) {
      Thread thread = defaultFactory.newThread(run);
      thread.setUncaughtExceptionHandler(handler);
      return thread;
    }
  }
  public static class MyExceptionHandler extends ExceptionReporter
      implements Thread.UncaughtExceptionHandler {
    // ...
    @Override public void uncaughtException(Thread thread, Throwable t) {
      // Recovery or logging code
    }
  }
}

方式3

继承ThreadGroup,覆盖其uncaughtException方法

public class ThreadGroupExample {
    public static class MyThreadGroup extends ThreadGroup {
        public MyThreadGroup(String s) {
            super(s);
        }
        public void uncaughtException(Thread thread, Throwable throwable) {
            System.out.println("Thread " + thread.getName()
              + " died, exception was: ");
            throwable.printStackTrace();
        }
    }
    public static ThreadGroup workerThreads =
      new MyThreadGroup("Worker Threads");
    public static class WorkerThread extends Thread {
        public WorkerThread(String s) {
            super(workerThreads, s);
        }
        public void run() {
            throw new RuntimeException();
        }
    }
    public static void main(String[] args) {
        Thread t = new WorkerThread("Worker Thread");
        t.start();
    }
}

确实这种方式与上面通过ThreadFactory来指定UncaughtExceptionHandler是一样的,只是代码逻辑不同,但原理上都是一样的,即给线程池中的每个线程都指定一个UncaughtExceptionHandler。

** 注意:上面三种方式针对的都是通过execute(xx)的方式提交任务,如果你提交任务用的是submit()方法,那么上面的三种方式都将不起作用,而应该使用下面的方式 **

方式4

如果提交任务的时候使用的方法是submit,那么该方法将返回一个Future对象,所有的异常以及处理结果都可以通过future对象获取。
采用Future模式,将返回结果以及异常放到Future中,在Future中处理

final class PoolService {
  private final ExecutorService pool = Executors.newFixedThreadPool(10);
  public void doSomething() {
    Future<?> future = pool.submit(new Task());
    // ...
    try {
      future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // Reset interrupted status
    } catch (ExecutionException e) {
      Throwable exception = e.getCause();
      // Forward to exception reporter
    }
  }
}

总结

  1. java线程池会捕获任务抛出的异常和错误,但不做任何处理
  2. 好的程序设计应该考虑到对于类异常的处理
  3. 处理线程池中的异常有两种思路:
    1)提交到线程池中的任务自己捕获异常并处理,不抛给线程池
    2)由线程池统一处理
  4. 对于execute方法提交的线程,有两种处理方式
    1)自定义线程池并实现afterExecute方法
    2)给线程池中的每个线程指定一个UncaughtExceptionHandler,由handler来统一处理异常。
  5. 对于submit方法提交的任务,异常处理是通过返回的Future对象进行的。

 

本文:Java(Android)线程池详解

2 Comments

发表评论