java多线程之线程池(4)_线程池 lyzxii-程序员宅基地

技术标签: 多线程  线程池  

1.Executor框架浅析 

首先我们得明白一个问题,为什么需要线程池?在java中,使用线程来执行异步任务时,线程的创建和销毁需要一定的开销,如果我们为每一个任务创建一个新的线程来执行的话,那么这些线程的创建与销毁将消耗大量的计算资源。同时为每一个任务创建一个新线程来执行,这样的方式可能会使处于高负荷状态的应用最终崩溃。所以线程池的出现为解决这个问题带来曙光。我们将在线程池中创建若干条线程,当有任务需要执行时就从该线程池中获取一条线程来执行任务,如果一时间任务过多,超出线程池的线程数量,那么后面的线程任务就进入一个等待队列进行等待,直到线程池有线程处于空闲时才从等待队列获取要执行的任务进行处理,以此循环.....这样就大大减少了线程创建和销毁的开销,也会缓解我们的应用处于超负荷时的情况。

1.1 Executor框架的两级调度模型

在java线程启动时会创建一个本地操作系统线程,当该java线程终止时,这个操作系统线程也会被回收。而每一个java线程都会被一对一映射为本地操作系统的线程,操作系统会调度所有的线程并将它们分别给可用的CPU。而所谓的映射方式是这样实现的,在上层,java多线程程序通过把应用分为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。这样种两级调度模型如下图所示:

从图中我们可以看出,应用程序通过Executor框架控制上层的调度,而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。

 

1.2 Executor框架的结构

Executor框架的结构主要包括3个部分

  • 任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口
  • 任务的执行:包括任务执行机制的核心接口Executor,以及继承自Executor的EexcutorService接口。Exrcutor有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
  • 异步计算的结果:包括接口Future和实现Future接口的FutureTask类

UML关系图:

下面我们通过一个UML图来认识一下这些类间的关系:

Extecutor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。

ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。

ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令,比Timer更灵活强大

 

Future接口和实现Future接口的FutureTask类,代表异步计算的结果

Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或者ScheduledThreadPoolExecutor执行。区别就是Runnable无法返回执行结果,而Callable可以返回执行结果。

 

1.3 Executor框架各个接口的解读

 

Runnable接口和Callable接口具体区别:

Runnable是java.lang包下,是一个接口,里面只声明了一个run()方法: 由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

Callable位于java.util.concurrent包下,也是一个接口,里面也只声明了一个方法叫做call(),可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

public interface Runnable{

    public abstract void run();

}

public interface Callable<V>{

    /**

     *Computes a result, or throws an exception if unable to do so.

     *

     *@return computed result

     *@throws Exception if unable to compute a result

     */

    Vcall() throws Exception;

}

我们通过一张图来理解它们间的执行关系

分析说明:

1.主线程首先创建实现Runnable或Callable接口的任务对象,在重写的方法里实现具体的任务细节

    此外我们可以利用工具类Executors把一个Runnable对象封装为一个Callable对象,具体如下面的两种方式:

Executors.callable(Runnabletask)或者Executors.callable(Runnabletask,Object result)

2.然后可以把Runnable对象或者Callbale对象直接提交给ExecutorService执行,在ExecutorService接口中有以下若干个方法。

  • submit()方法可以执行Runnable对象或者Callbale对象,
  • execute()方法则是执行Runnable对象的

这里需要注意的是如果执行submit()方法将返回一个实现Future接口的对象(其实就是FutureTask)。当然由于FutureTask实现了Runnable接口,我们也可以直接创建FutureTask,然后提交给ExecutorService执行。

<T>Future<T> submit(Callable<T> task);

<T>Future<T> submit(Runnable task, T result);(不常用)

Future<?>submit(Runnable task);

Void execute(Runnabletask)

Future接口:

Future就是对于具体的Runnable或者Callable任务的执行进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

Future类位于java.util.concurrent包下,它是一个接口:

       
public interface Future<V>{

    /**cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。*/

    boolean cancel(boolean mayInterruptIfRunning);


    //isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

    boolean isCancelled();

    
    //isDone方法表示任务是否已经完成,若任务完成,则返回true;
    boolean isDone();

    
    //get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
    Vget() throws InterruptedException, ExecutionException;

    
    //get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

    Vget(long timeout, TimeUnit unit)

        throws InterruptedException,ExecutionException, TimeoutException;

}

也就是说Future提供了三种功能:

  1)判断任务是否完成;

  2)能够中断任务;

  3)能够获取任务执行结果。

  因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

 

FutureTask类:

FutureTask类实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable接口和Future接口,因此FutureTask既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

public class FutureTask<V> implements RunnableFuture<V>

public interface RunnableFuture<V> extends Runnable,Future<V> {

    void run();

}

FutureTask是Future的唯一实现类,构造方法如下:

public FutureTask(Callable<V>callable) {

}

public FutureTask(Runnablerunnable, V result) {

}

Future和FutureTask的使用案列

1.使用Callable+Future获取执行结果

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}

2.使用Callable+FutureTask获取执行结果

public class Test {
    public static void main(String[] args) {
        //第一种方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();
         
        //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}

2.ThreadPoolExecutor浅析 

ThreadPoolExecutor是线程的真正实现,通常使用工厂类Executors来创建,但它的构造方法提供了一系列参数来配置线程池,下面我们就先介绍ThreadPoolExecutor的构造方法中各个参数的含义。

	public ThreadPoolExecutor(int corePoolSize,  
	                          int maximumPoolSize,  
	                          long keepAliveTime,  
	                          TimeUnit unit,  
	                          BlockingQueue<Runnable> workQueue,  
	                          ThreadFactory threadFactory) {  
	        					this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
	            					 threadFactory, defaultHandler);  
	    }  

corePoolSize:线程池的核心线程数,默认情况下,核心线程数会一直在线程池中存活,即使它们处理闲置状态。如果将ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,那么闲置的核心线程在等待新任务到来时会执行超时策略,这个时间间隔由keepAliveTime所指定,当等待时间超过keepAliveTime所指定的时长后,核心线程就会被终止。

maximumPoolSize:线程池所能容纳的最大线程数量,当活动线程数到达这个数值后,后续的新任务将会被阻塞。

keepAliveTime:非核心线程闲置时的超时时长,超过这个时长,非核心线程就会被回收。当ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true时,keepAliveTime同样会作用于核心线程。

unit:用于指定keepAliveTime参数的时间单位,这是一个枚举,常用的有TimeUnit.MILLISECONDS(毫秒),TimeUnit.SECONDS(秒)以及TimeUnit.MINUTES(分钟)等。

workQueue:线程池中的任务队列,通过线程池的execute方法提交Runnable对象会存储在这个队列中。

threadFactory:线程工厂,为线程池提供创建新线程的功能。ThreadFactory是一个接口,它只有一个方法:Thread newThread(Runnable r)。

除了上面的参数外还有个不常用的参数,RejectExecutionHandler,这个参数表示当ThreadPoolExecutor已经关闭或者ThreadPoolExecutor已经饱和时(达到了最大线程池大小而且工作队列已经满),execute方法将会调用Handler的rejectExecution方法来通知调用者,默认情况 下是抛出一个RejectExecutionException异常。

RejectExecutionException是一个接口,它有4个实现类,代表4种饱和策略,当有界队列被填满之后,饱和策略开始发挥作用,ThreadPoolExecutor的饱和策略可以通过setRejectedExecutionHandler来修改(如果某个任务被提交到一个已被关闭的Executor时,也会用到饱和策略),JDK提供几种不同的RejectedExecutionHandler实现,每种实现都包含不同的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

中止(Abort)策略是 默认的饱和策略:AbortPolicy,直接抛出未检查RejectedExecutionExecption,调用者可以捕获这个异常,根据需求处理代码

    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         * 始终抛出异常,交由调用者处理
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

抛弃(Discard)策略会悄悄抛弃该任务,方法体什么都不做

    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

抛弃最旧的(Discard-Oldest)策略会抛弃下一个将被执行的任务(任务队列的头部的任务,也就是s“最旧的”任务),然后尝试重新提交新的任务,(如果工作队列是一个优先队列,那么抛弃最旧的策略抛弃优先级最高的任务,因此最好不要将抛弃最旧的饱和策略和优先级队列放在一起使用)

   public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

调用者运行(Caller-Runs)策略实现了一种调节机制,该策略既不会抛弃任务也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量,我们看到这个策略显然不想放弃执行任务。那么就用当前的Executor进行执行。不过,这样也有弊端,那就是阻塞当前Executor线程,造成该线程池无法调度任务。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

 

了解完相关构造函数的参数,我们再来看看ThreadPoolExecutor执行任务时的大致规则:

(1)如果线程池的数量还未达到核心线程的数量,那么会直接启动一个核心线程来执行任务

(2)如果线程池中的线程数量已经达到或者超出核心线程的数量,那么任务会被插入到任务队列中排队等待执行。

(3)如果在步骤(2)中无法将任务插入到任务队列中,这往往是由于任务队列已满,这个时候如果线程数量未达到线程池规定的最大值,那么会立刻启动一个非核心线程来执行任务。

(4)如果在步骤(3)中线程数量已经达到线程池规定的最大值,那么就会拒绝执行此任务,ThreadPoolExecutor会调用RejectExecutionHandler的rejectExecution方法来通知调用者。

到此ThreadPoolExecutor的详细配置了解完了,ThreadPoolExecutor的执行规则也了解完了,那么接下来我们就来介绍3种常见的线程池,它们都直接或者间接地通过配置ThreadPoolExecutor来实现自己的功能特性,这个3种线程池分别是FixedThreadPool,CachedThreadPool,ScheduledThreadPool以及SingleThreadExecutor。

 

2.1 FixedThreadPool

 FixedThreadPool模式会使用一个优先固定数目的线程来处理若干数目的任务。规定数目的线程处理所有任务,一旦有线程处理完了任务就会被用来处理新的任务(如果有的话)。FixedThreadPool模式下最多的线程数目是一定的。创建FixedThreadPool对象代码如下:

ExecutorService fixedThreadPool=Executors.newFixedThreadPool(5);

我们来看看FixedThreadPool创建方法源码:

public static ExecutorService newFixedThreadPool(int nThreads) {  
        return new ThreadPoolExecutor(nThreads, nThreads,  
                                      0L, TimeUnit.MILLISECONDS,  
                                      new LinkedBlockingQueue<Runnable>());  
    }

FixedThreadPool的corePoolSize和maximumPoolSize参数都被设置为nThreads。当线程池中的线程数量大于corePoolSize时,keepAliveTime为非核心空闲线程等待新任务的最长时间,超过这个时间后非核心线程将被终止,这里keepAliveTime设置为0L,就说明非核心线程会立即被终止。事实上这里也没有非核心线程创建,因为核心线程数和最大线程数都一样的。下面我们来看看FixedThreadPool的execute()方法的运行流程

分析:

(1)如果当前运行线程数少corePoolSize,则创建一个新的线程来执行任务。

(2)如果当前线程池的运行线程数等于corePoolSize,那么后面提交的任务将加入LinkedBlockingQueue。

(3)线程在执行完图中的1后,会在循环中反复从LinkedBlockingQueue获取任务来执行。

这里还有点要说明的是FixedThreadPool使用的是无界队列LinkedBlockingQueue作为线程池的工作队列(队列容量为Integer.MAX_VALUE)。使用该队列作为工作队列会对线程池产生如下影响

(1)当前线程池中的线程数量达到corePoolSize后,新的任务将在无界队列中等待。

(2)由于我们使用的是无界队列,所以参数maximumPoolSize和keepAliveTime无效。

(3)由于使用无界队列,运行中的FixedThreadPool不会拒绝任务(当然此时是未执行shutdown和shutdownNow方法),所以不会去调用RejectExecutionHandler的rejectExecution方法抛出异常。

下面我们给出案例,该案例来自java编程思想一书:

	public class LiftOff implements Runnable{     
	    protected int countDown = 10; //Default     
	    private static int taskCount = 0;     
	    private final int id = taskCount++;      
	    public LiftOff() {}     
	    public LiftOff(int countDown) {     
	        this.countDown = countDown;     
	    }     
	    public String status() {     
        return "#" + id + "(" +     
	            (countDown > 0 ? countDown : "LiftOff!") + ") ";     
	    }     
	    @Override     
	    public void run() {     
        while(countDown-- > 0) {     
	            System.out.print(status());     
	            Thread.yield();     
	        }     
	             
    }        
} 

 

声明一个Runnable对象,使用FixedThreadPool执行任务如下:

	public class FixedThreadPool {     
	    public static void main(String[] args) {    
	        //三个线程来执行五个任务     
        ExecutorService exec = Executors.newFixedThreadPool(3);        
	        for(int i = 0; i < 5; i++) {     
	            exec.execute(new LiftOff());     
	        }    
	        exec.shutdown();     
	    }     
	} 

2.2 CachedThreadPool

CachedThreadPool首先会按照需要创建足够多的线程来执行任务(Task)。随着程序执行的过程,有的线程执行完了任务,可以被重新循环使用时,才不再创建新的线程来执行任务。创建方式:

ExecutorService cachedThreadPool=Executors.newCachedThreadPool();

我们来看看CachedThreadPool创建方法源码:

public static ExecutorService newCachedThreadPool() {  
	   return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
	                                 60L, TimeUnit.SECONDS,  
	                                 new SynchronousQueue<Runnable>());  
	}  

从该静态方法,我们可以看到CachedThreadPool的corePoolSize被设置为0,而maximumPoolSize被设置Integer.MAX_VALUE,即maximumPoolSize是无界的,而keepAliveTime被设置为60L,单位为妙。也就是空闲线程等待时间最长为60秒,超过该时间将会被终止。而且在这里CachedThreadPool使用的是没有容量的SynchronousQueue作为线程池的工作队列,但其maximumPoolSize是无界的,也就是意味着如果主线程提交任务的速度高于maximumPoolSize中线程处理任务的速度时CachedThreadPool将会不断的创建新的线程,在极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。CachedThreadPool的execute()方法的运行流程


 

 

分析:

(1)首先执行SynchronousQueue.offer(Runnable task),添加一个任务。如果当前CachedThreadPool中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),其中NANOSECONDS是毫微秒即十亿分之一秒(就是微秒/1000),那么主线程执行offer操作与空闲线程执行poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则进入第(2)步。

(2)当CachedThreadPool初始线程数为空时,或者当前没有空闲线程,将没有线程去执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这样的情况下,步骤(1)将会失败,此时CachedThreadPool会创建一个新的线程来执行任务,execute()方法执行完成。

(3)在步骤(2)中创建的新线程将任务执行完成后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒,如果60秒内主线程提交了一个新任务,那么这个空闲线程将会执行主线程提交的新任务,否则,这个空闲线程将被终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的 CachedThreadPool是不会使用任何资源的。

根据前面的分析我们知道SynchronousQueue是一个没有容量的阻塞队列(其实个人认为是相对应时间而已的没有容量,因为时间到空闲线程就会被移除)。每个插入操作必须等到一个线程与之对应。CachedThreadPool使用SynchronousQueue,把主线程的任务传递给空闲线程执行。流程如下:

CachedThreadPool使用的案例代码如下

	public class CachedThreadPool {     
	    public static void main(String[] args) {     
	        ExecutorService exec = Executors.newCachedThreadPool();     
	        for(int i = 0; i < 10; i++) {     
	            exec.execute(new LiftOff());     
	        }     
	        exec.shutdown();         
	}

2.3 SingleThreadExecutor

SingleThreadExecutor模式只会创建一个线程。它和FixedThreadPool比较类似,不过线程数是一个。如果多个任务被提交给SingleThreadExecutor的话,那么这些任务会被保存在一个队列中,并且会按照任务提交的顺序,一个先执行完成再执行另外一个线程。SingleThreadExecutor模式可以保证只有一个任务会被执行。这种特点可以被用来处理共享资源的问题而不需要考虑同步的问题。

创建方式:

ExecutorService singleThreadExecutor=Executors.newSingleThreadExecutor();

构造方法如下:

public static ExecutorService newSingleThreadExecutor() {  
	 return new FinalizableDelegatedExecutorService  
	         (new ThreadPoolExecutor(1, 1,  
	                                 0L, TimeUnit.MILLISECONDS,  
                                        new LinkedBlockingQueue<Runnable>()));  
	          }  

从静态方法可以看出SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,其他参数则与FixedThreadPool相同。SingleThreadExecutor使用的工作队列也是无界队列LinkedBlockingQueue。由于SingleThreadExecutor采用无界队列的对线程池的影响与FixedThreadPool一样,这里就不过多描述了。同样的我们先来看看其运行流程:

 

分析:

 

(1)如果当前线程数少于corePoolSize即线程池中没有线程运行,则创建一个新的线程来执行任务。

(2)在线程池的线程数量等于corePoolSize时,将任务加入到LinkedBlockingQueue。

(3)线程执行完成(1)中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行。

SingleThreadExecutor使用的案例代码如下:

public class SingleThreadExecutor {     
	 public static void main(String[] args) {     
	      ExecutorService exec = Executors.newSingleThreadExecutor();     
	        for (int i = 0; i < 2; i++) {     
            	    exec.execute(new LiftOff());     
	        }     
	    }     
	}

2.4 各自的适用场景

FixedThreadPool:适用于为了满足资源管理需求,而需要限制当前线程的数量的应用场景,它适用于负载比较重的服务器。

SingleThreadExecutor:适用于需要保证执行顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的场景。

CachedThreadPool:大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者负载较轻的服务器。

3.ScheduledThreadPoolExecutor浅析 

3.1 ScheduledThreadPoolExecutor执行机制分析

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后执行任务,或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但比Timer更强大,更灵活,Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。接下来我们先来了解一下ScheduledThreadPoolExecutor的运行机制:

 

 

 

 

分析:DelayQueue是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中无意义。ScheduledThreadPoolExecutor的执行主要分为以下两个部分

(1)当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduleFutureTask。

(2)线程池中的线程从DelayQueue中获取ScheduleFutureTask,然后执行任务。

3.2 如何创建ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor通常使用工厂类Executors来创建,Executors可以创建两种类型的ScheduledThreadPoolExecutor,如下:

(1)ScheduledThreadPoolExecutor:可以执行并行任务也就是多条线程同时执行。

(2)SingleThreadScheduledExecutor:可以执行单条线程。

创建ScheduledThreadPoolExecutor的方法构造如下:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)  
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)  

创建SingleThreadScheduledExecutor的方法构造如下

 

	public static ScheduledExecutorService newSingleThreadScheduledExecutor()  
	public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)  

3.3 ScheduledThreadPoolExecutorSingleThreadScheduledExecutor的适用场景

ScheduledThreadPoolExecutor:适用于多个后台线程执行周期性任务,同时为了满足资源管理的需求而需要限制后台线程数量的应用场景。

SingleThreadScheduledExecutor:适用于需要单个后台线程执行周期任务,同时需要保证任务顺序执行的应用场景。

3.4 ScheduledThreadPoolExecutor使用案例

我们创建一个Runnable的对象,然后使用ScheduledThreadPoolExecutor的Scheduled()来执行延迟任务,输出执行时间即可:

我们先来介绍一下该类延迟执行的方法:

public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);

参数解析:

command:就是一个实现Runnable接口的类

delay:延迟多久后执行。

unit:用于指定keepAliveTime参数的时间单位,这是一个枚举,常用的有TimeUnit.MILLISECONDS(毫秒),TimeUnit.SECONDS(秒)以及TimeUnit.MINUTES(分钟)等。

这里要注意这个方法会返回ScheduledFuture实例,可以用于获取线程状态信息和延迟时间。

public class WorkerThread implements Runnable{  
   		 @Override  
    public void run() {  
         System.out.println(Thread.currentThread().getName()+" Start. Time = "+getNowDate());  
	         threadSleep();  
	         System.out.println(Thread.currentThread().getName()+" End. Time = "+getNowDate());  
	          
	    }  
    /** 
	     * 睡3秒 
     */  
    public void threadSleep(){  
	        try {  
            Thread.sleep(3000);  
	        } catch (InterruptedException e) {  
	            // TODO Auto-generated catch block  
	            e.printStackTrace();  
	        }  
	    }  
     /** 
      * 获取现在时间 
	      *  
      * @return 返回时间类型 yyyy-MM-dd HH:mm:ss 
      */  
    public static String getNowDate() {  
	          Date currentTime = new Date();  
          SimpleDateFormat formatter;   
	            formatter = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss");   
            String ctime = formatter.format(currentTime);   
	          return ctime;  
	         }  
	}

执行类:

public class ScheduledThreadPoolTest {

		public static void main(String[] args) {  
			        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);  
			         try {  
			        //schedule to run after sometime  
	        System.out.println("Current Time = "+getNowDate());  
			        for(int i=0; i<3; i++){  
		            Thread.sleep(1000);  
			            WorkerThread worker = new WorkerThread();  
			            //延迟10秒后执行  
			            scheduledThreadPool.schedule(worker, 10, TimeUnit.SECONDS);  
			        }  
			            Thread.sleep(3000);  
		        } catch (InterruptedException e) {  
		            e.printStackTrace();  
		        scheduledThreadPool.shutdown();  
		        while(!scheduledThreadPool.isTerminated()){  
		            //wait for all tasks to finish  
		        }  
		        System.out.println("Finished all threads");  
		    }		/**
		 * 获取现在时间
		 * 
		 * @return 返回时间类型 yyyy-MM-dd HH:mm:ss
		 */
		public static String getNowDate() {
			Date currentTime = new Date();
			SimpleDateFormat formatter;
			formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
			String ctime = formatter.format(currentTime);
			return ctime;
		}
	}

 

线程任务确实在10秒延迟后才开始执行。这就是schedule()方法的使用。下面我们再介绍2个可用于周期性执行任务的方法。

 

 

scheduleAtFixedRate方法的作用是预定在初始的延迟结束后,周期性地执行给定的任务,周期长度为period,其中initialDelay为初始延迟。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

scheduleWithFixedDelay方法的作用是预定在初始的延迟结束后周期性地执行给定任务,在一次调用完成和下一次调用开始之间有长度为delay的延迟,其中initialDelay为初始延迟(简单说是是等上一个任务结束后,在等固定的时间,然后执行。即:执行完上一个任务后再执行)。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit); 

实例代码:

public class ScheduledTask {
		public ScheduledThreadPoolExecutor se = new ScheduledThreadPoolExecutor(
				5);

		public static void main(String[] args) {
			new ScheduledTask();
		}

		public void fixedPeriodSchedule() {
			// 设定可以循环执行的runnable,初始延迟为0,这里设置的任务的间隔为5秒
			for (int i = 0; i < 5; i++) {
				se.scheduleAtFixedRate(new FixedSchedule(), 0, 5,
						TimeUnit.SECONDS);
			}
		}

		public ScheduledTask() {
			fixedPeriodSchedule();
		}

		class FixedSchedule implements Runnable {
			public void run() {
				System.out.println("当前线程:" + Thread.currentThread().getName()
						+ "  当前时间:" + new Date(System.currentTimeMillis()));
			}
		}
	}

运行结果:

至于scheduleWithFixedDelay方法,大家就把代码稍微修改一下执行试试就行,这里就不重复了。而SingleThreadScheduledExecutor的使用的方法基本是类似,只不过是单线程罢了,这里也不再描述了。

如何捕获线程池中线程运行时的异常

主要有下面几个解决方案

  • 在我们提供的Runnable的run方法中捕获任务代码可能抛出的所有异常,包括未检测异常
  • 使用ExecutorService.submit执行任务,利用返回的Future对象的get方法接收抛出的异常,然后进行处理
  • 重写ThreadPoolExecutor.afterExecute方法,处理传递到afterExecute方法中的异常
  • 为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常 (只能够捕获execute()方法)

线程代码不能抛出任何checked异常。所有的线程中的checked异常都只能被线程本身消化掉。 这样本身也是符合线程的设计理念的,线程本身就是被看作独立的执行片断,它应该对自己负责,所以由它来消化所有的checked异常是很正常的。

但是,线程代码中是可以抛出错误(Error)和运行级别异常(RuntimeException)的。Error俺们可以忽略,因为通常Error是应该留给vm的,而RuntimeException确是比较正常的,如果在运行过程中满足了某种条件导致线程必须中断,可以选择使用抛出运行级别异常来处理,当线程代码抛出运行级别异常之后,线程会中断。这点java中解释得很清楚:
@see Thread
All threads that are not daemon threads have died, either by returning from the call to the run method or “by throwing an exception that propagates beyond the run method”.
但是对于invoke此线程的主线程会产生什么影响呢?主线程不受这个影响,不会处理这个RuntimeException,而且根本不能catch到这个异常。会继续执行自己的代码 :)
所以得到结论:线程方法的异常只能自己来处理。

 

1.通过给thread设置一个UncaughtExceptionHandler,可以确保在该线程出现异常时能通过回调UncaughtExceptionHandler接口的public void uncaughtException(Thread t, Throwable e) 方法来处理异常,

@FunctionalInterface
public interface UncaughtExceptionHandler {
    /**
     * Method invoked when the given thread terminates due to the
     * given uncaught exception.
     * <p>Any exception thrown by this method will be ignored by the
     * Java Virtual Machine.
     * @param t the thread
     * @param e the exception
     */
    void uncaughtException(Thread t, Throwable e);
}

UncaughtExceptionHandler是一个FunctionalInterface ,只有一个抽象方法,该回调接口会被Thread中的dispatchUncaughtException调用

/**
 * Dispatch an uncaught exception to the handler. This method is
 * intended to be called only by the JVM.
 */
private void dispatchUncaughtException(Throwable e) {
    getUncaughtExceptionHandler().uncaughtException(this, e);
}

当线程在运行过程中出现异常时,JVM会调用dispatchUncaughtException方法,该方法会将对应的线程实例以及异常信息传递给回调接口

如在使用线程池的时候,我们可以通过自定义的线程工厂来实现处理异常,具体代码如下:

//自定义线程工厂类,创建线程的时候设置线程的UncaughtExeceptionHandler即可
public class HandlerThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread( r,"Thread" + threadNumber.getAndIncrement());
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println("线程名字===="+t.getName());
                System.out.println("捕获异常" + e.toString());
            }
        });
        return t;
    }

}

在使用线程池的时候,传入我们自定义的ThreadFactory
public class ExeceptionTest implements Runnable {
    @Override
    public void run() {
        throw new RuntimeException("抛异常");

    }


    public static void main(String[] args){
        //创建线程池 指定线程池创建线程的 ThreadFactory 并设置线程名字
        ExecutorService service = Executors.newCachedThreadPool(new HandlerThreadFactory());
        service.execute(new ExeceptionTest());

    }

}

执行结果:
线程名字====Thread1
捕获异常java.lang.RuntimeException: 抛异常

2.直接重写线程池中的protected void afterExecute(Runnable r, Throwable t) { }方法

  • 在我们提供的Runnable的run方法中捕获任务代码可能抛出的所有异常,包括未检测异常
  • 使用ExecutorService.submit执行任务,利用返回的Future对象的get方法接收抛出的异常,然后进行处理
  • 重写ThreadPoolExecutor.afterExecute方法,处理传递到afterExecute方法中的异常
  • 为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常 (不推荐)
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_37598682/article/details/80166017

智能推荐

【史上最易懂】马尔科夫链-蒙特卡洛方法:基于马尔科夫链的采样方法,从概率分布中随机抽取样本,从而得到分布的近似_马尔科夫链期望怎么求-程序员宅基地

文章浏览阅读1.3k次,点赞40次,收藏19次。虽然你不能直接计算每个房间的人数,但通过马尔科夫链的蒙特卡洛方法,你可以从任意状态(房间)开始采样,并最终收敛到目标分布(人数分布)。然后,根据一个规则(假设转移概率是基于房间的人数,人数较多的房间具有较高的转移概率),你随机选择一个相邻的房间作为下一个状态。比如在巨大城堡,里面有很多房间,找到每个房间里的人数分布情况(每个房间被访问的次数),但是你不能一次进入所有的房间并计数。但是,当你重复这个过程很多次时,你会发现你更有可能停留在人数更多的房间,而在人数较少的房间停留的次数较少。_马尔科夫链期望怎么求

linux以root登陆命令,su命令和sudo命令,以及限制root用户登录-程序员宅基地

文章浏览阅读3.9k次。一、su命令su命令用于切换当前用户身份到其他用户身份,变更时须输入所要变更的用户帐号与密码。命令su的格式为:su [-] username1、后面可以跟 ‘-‘ 也可以不跟,普通用户su不加username时就是切换到root用户,当然root用户同样可以su到普通用户。 ‘-‘ 这个字符的作用是,加上后会初始化当前用户的各种环境变量。下面看下加‘-’和不加‘-’的区别:root用户切换到普通..._限制su root登陆

精通VC与Matlab联合编程(六)_精通vc和matlab联合编程 六-程序员宅基地

文章浏览阅读1.2k次。精通VC与Matlab联合编程(六)作者:邓科下载源代码浅析VC与MATLAB联合编程浅析VC与MATLAB联合编程浅析VC与MATLAB联合编程浅析VC与MATLAB联合编程浅析VC与MATLAB联合编程  Matlab C/C++函数库是Matlab扩展功能重要的组成部分,包含了大量的用C/C++语言重新编写的Matlab函数,主要包括初等数学函数、线形代数函数、矩阵操作函数、数值计算函数_精通vc和matlab联合编程 六

Asp.Net MVC2中扩展ModelMetadata的DescriptionAttribute。-程序员宅基地

文章浏览阅读128次。在MVC2中默认并没有实现DescriptionAttribute(虽然可以找到这个属性,通过阅读MVC源码,发现并没有实现方法),这很不方便,特别是我们使用EditorForModel的时候,我们需要对字段进行简要的介绍,下面来扩展这个属性。新建类 DescriptionMetadataProvider然后重写DataAnnotationsModelMetadataPro..._asp.net mvc 模型description

领域模型架构 eShopOnWeb项目分析 上-程序员宅基地

文章浏览阅读1.3k次。一.概述  本篇继续探讨web应用架构,讲基于DDD风格下最初的领域模型架构,不同于DDD风格下CQRS架构,二者架构主要区别是领域层的变化。 架构的演变是从领域模型到C..._eshoponweb

Springboot中使用kafka_springboot kafka-程序员宅基地

文章浏览阅读2.6w次,点赞23次,收藏85次。首先说明,本人之前没用过zookeeper、kafka等,尚硅谷十几个小时的教程实在没有耐心看,现在我也不知道分区、副本之类的概念。用kafka只是听说他比RabbitMQ快,我也是昨天晚上刚使用,下文中若有讲错的地方或者我的理解与它的本质有偏差的地方请包涵。此文背景的环境是windows,linux流程也差不多。 官网下载kafka,选择Binary downloads Apache Kafka 解压在D盘下或者什么地方,注意不要放在桌面等绝对路径太长的地方 打开conf_springboot kafka

随便推点

VS2008+水晶报表 发布后可能无法打印的解决办法_水晶报表 不能打印-程序员宅基地

文章浏览阅读1k次。编好水晶报表代码,用的是ActiveX模式,在本机运行,第一次运行提示安装ActiveX控件,安装后,一切正常,能正常打印,但发布到网站那边运行,可能是一闪而过,连提示安装ActiveX控件也没有,甚至相关的功能图标都不能正常显示,再点"打印图标"也是没反应解决方法是: 1.先下载"PrintControl.cab" http://support.businessobjects.c_水晶报表 不能打印

一. UC/OS-Ⅱ简介_ucos-程序员宅基地

文章浏览阅读1.3k次。绝大部分UC/OS-II的源码是用移植性很强的ANSI C写的。也就是说某产品可以只使用很少几个UC/OS-II调用,而另一个产品则使用了几乎所有UC/OS-II的功能,这样可以减少产品中的UC/OS-II所需的存储器空间(RAM和ROM)。UC/OS-II是为嵌入式应用而设计的,这就意味着,只要用户有固化手段(C编译、连接、下载和固化), UC/OS-II可以嵌入到用户的产品中成为产品的一部分。1998年uC/OS-II,目前的版本uC/OS -II V2.61,2.72。1.UC/OS-Ⅱ简介。_ucos

python自动化运维要学什么,python自动化运维项目_运维学python该学些什么-程序员宅基地

文章浏览阅读614次,点赞22次,收藏11次。大家好,本文将围绕python自动化运维需要掌握的技能展开说明,python自动化运维从入门到精通是一个很多人都想弄明白的事情,想搞清楚python自动化运维快速入门 pdf需要先了解以下几个事情。这篇文章主要介绍了一个有趣的事情,具有一定借鉴价值,需要的朋友可以参考下。希望大家阅读完这篇文章后大有收获,下面让小编带着大家一起了解一下。_运维学python该学些什么

解决IISASP调用XmlHTTP出现msxml3.dll (0x80070005) 拒绝访问的错误-程序员宅基地

文章浏览阅读524次。2019独角兽企业重金招聘Python工程师标准>>> ..._hotfix for msxml 4.0 service pack 2 - kb832414

python和易语言的脚本哪门更实用?_易语言还是python适合辅助-程序员宅基地

文章浏览阅读546次。python和易语言的脚本哪门更实用?_易语言还是python适合辅助

redis watch使用场景_详解redis中的锁以及使用场景-程序员宅基地

文章浏览阅读134次。详解redis中的锁以及使用场景,指令,事务,分布式,命令,时间详解redis中的锁以及使用场景易采站长站,站长之家为您整理了详解redis中的锁以及使用场景的相关内容。分布式锁什么是分布式锁?分布式锁是控制分布式系统之间同步访问共享资源的一种方式。为什么要使用分布式锁?​ 为了保证共享资源的数据一致性。什么场景下使用分布式锁?​ 数据重要且要保证一致性如何实现分布式锁?主要介绍使用redis来实..._redis setnx watch

推荐文章

热门文章

相关标签