Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable来表示任务,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。
Executor框架简介
在Java5之后,并发编程引入了一堆新的启动、调度和管理线程的API。Executor框架便是Java5中引入的,其内部使用了线程池机制,它在java.util.concurrent包里。通过该框架来控制线程启动、执行和关闭,可以简化并发编程的操作。因此,在Java5之后,通过Executor来启动线程比使用Thread的start的方法更好,除了更易管理,效率更好外,还有关键的一点:有助于避免this逃逸问题(如果我们在构造器中启动了一个线程,因为另外一个可能会在构造器结束之前执行,此时可能会访问到初始化了一半的对象的用Executor在构造器中)。
Executor框架包括:线程池,Executor,ExecutorService,CompletionService,Future,Callable等。
Executor接口中定义了一个方法execute,该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runable接口的类。ExecutorService接口继承自Executor接口,它提供了了更丰富的实现多线程的方法,比如,ExecutorService提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成的Future的方法。可以调用ExecutorService的shutdown方法来平滑的关闭ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完毕(已提交的任务分为两类:一类是已经在执行的,一类是还没有开始执行的)。当所有的已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。
ExecutorService的生命周期包括三种状态:运行、关闭和终止。创建后变进入运行状态,当调用了shundown()方法,便进入了关闭状态,此时意味着ExecutorService不在接受新的任务,但它还在执行已经提交的任务,当已经提交的任务执行完成后,便达到终止的状态。如果不调用shutdown()方法,ExecutorService会一直在运行状态,不断接受新的任务,执行新的任务,服务器端一般不需要去关闭它,保持一直运行即可。
使用Executor创建线程池
Executors提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。
newCachedThreadPool()
- 缓存性池子,先查看池中有没有以前建立的线程,如果有,就reuse。如果没有,就建立一个新的线程加入池中。
- 缓存性池子通常用于执行一些生存期很短的异步型任务,因此在一些面向连接的daemon型Service中用的不多。但对于生存期短的异步任务,它是Executor的首选。
- 能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDEL时长,线程实例将被终止并移除池。
- 放入CachedThreadPool的线程不必担心其结束,超过TImeout不活动,就会被自动终止。
- chche池线程数数支持0-Interger.MAX(显然没考虑主机的资源承受能力)。
newFixedThreadPool(int)
- newFixedThreadPool与CacheThreadPool差不多,也是能reuse就用,但不能随时创建新的线程。
- 其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,知道当前的线程中某个线程终止被移除池子。
- fixed池线程数固定,并且是0秒IDEL(无IDLE),多用于服务器。
newScheduleThreadPool(int)
- 调度型线程池
- 这个池子里的线程可以按schedule依次delay执行,或周期执行。
newSingThreadExecutor()
- 单例线程,任意时间池中只能有一个线程。
- 用的是和cache池和fixed池相同的底层池,但线程数目是1,无IDEL
Executor执行Runable任务
通过Executors的静态工厂方法获得Executor实例,然后调用该实例的execute方法即可。一旦Runable传递到execute()方法,该方法便会自动在一个线程上执行。下面是Executor执行Runable任务的示例代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCachedThreadPool{
public static void main(String[] args){
ExecutorService executorService = Executors.newCachedThreadPool();
// ExecutorService executorService = Executors.newFixedThreadPool(5);
// ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++){
executorService.execute(new TestRunnable());
System.out.println("************* a" + i + " *************");
}
executorService.shutdown();
}
}
class TestRunnable implements Runnable{
public void run(){
System.out.println(Thread.currentThread().getName() + "线程被调用了。");
}
}
Executor执行Callable任务
在Java5之后,任务分为两类:一类是实现了Runable接口的类,一类是实现了Callable接口的类。两者都可以被ExecutorService执行,但是Runable任务没有返回值,而Callable任务有返回值。并且Callable的call()方法只能通过ExecutorService的submit(Callable task)方法来执行,并且返回一个Future,表示任务等待完成的Future。
Callable接口类似于Runable,两者都是为那些实例可能被另一个线程执行的类设计的。但是Runable不会返回结果,并且无法抛出经过检查的异常。而Callable能够返回结果,而且当获取返回结果的时候可能抛出异常。Callable中的call()方法看类似于Runable中的run()方法,区别是前者有返回值,后者没有返回值。
当将一个Callable的对象传递给ExecutorService的submit()方法,则该call()方法自动在一个线程上执行,并且会返回执行结果Future对象。同样,将Runable的对象传递给ExecutorService的submit()方法,则该run()方法自动在一个线程上执行,并且会返回Future对象,但是在该对象上调用get()方法,将返回null。
一个ExecutorService执行Callable任务的示例代码:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class CallableDemo{
public static void main(String[] args){
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<String>> resultList = new ArrayList<Future<String>>();
//创建10个任务并执行
for (int i = 0; i < 10; i++){
//使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
Future<String> future = executorService.submit(new TaskWithResult(i));
//将任务执行结果存储到List中
resultList.add(future);
}
//遍历任务的结果
for (Future<String> fs : resultList){
try{
while(!fs.isDone);//Future返回如果没有完成,则一直循环等待,直到Future返回完成
System.out.println(fs.get()); //打印各个线程(任务)执行的结果
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}finally{
//启动一次顺序关闭,执行以前提交的任务,但不接受新任务
executorService.shutdown();
}
}
}
}
class TaskWithResult implements Callable<String>{
private int id;
public TaskWithResult(int id){
this.id = id;
}
/**
* 任务的具体过程,一旦任务传给ExecutorService的submit方法,
* 则该方法自动在一个线程上执行
*/
public String call() throws Exception {
System.out.println("call()方法被自动调用!!! " + Thread.currentThread().getName());
//该返回结果将被Future的get方法得到
return "call()方法被自动调用,任务返回的结果是:" + id + " " + Thread.currentThread().getName();
}
}
自定义线程池
自定义线程池,可以用ThreadPoolExecutor类创建,它有多个构造方法可以创建创建线程池,用该类很容易实现自定义的线程池,下面是一个示例代码:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest{
public static void main(String[] args){
//创建等待队列
BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);
//创建线程池,池中保存的线程数为3,允许的最大线程数为5
ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);
//创建七个任务
Runnable t1 = new MyThread();
Runnable t2 = new MyThread();
Runnable t3 = new MyThread();
Runnable t4 = new MyThread();
Runnable t5 = new MyThread();
Runnable t6 = new MyThread();
Runnable t7 = new MyThread();
//每个任务会在一个线程上执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
pool.execute(t6);
pool.execute(t7);
//关闭线程池
pool.shutdown();
}
}
class MyThread implements Runnable{
@Override
public void run(){
System.out.println(Thread.currentThread().getName() + "正在执行。。。");
try{
Thread.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
从执行结果中可以看出,七个任务是在三个线程实例上执行的。这里简要说明用到的ThreadPoolExecutor类的构造器的参数的含义:
- corePoolSize: 线程池中所保存的核心的线程数,包括空闲线程。
- maximumPoolSize: 线程池中允许存在的最大线程数。
- keepAliveTime: 线程池中的空闲线程所能持续的最长时间。
- unit:持续时间的单位。
- workQueue:任务执行前保存任务的队列,仅保存由execute()方法提交的Runable任务。
根据ThreadPoolExecutor源码前面的注释,我们可以看出,当时图通过execute()方法将一个Runable任务添加到线程池中时,按照如下顺序来处理:
- 如果线程池中的数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务。
- 如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则一次等待执行(线程池中又空闲线程后依次取出缓冲队列中的任务交给空闲线程来执行);
- 如果线程池中的数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;
- 如果线程池中的线程数量大于了maximumPoolSize,会产生线程溢出,该构造器的第5个参数(RejectedExecutionHandler)可以处理这种情况。