这篇文章主要介绍“SpringBoot线程池和Java线程池怎么使用”,在日常操作中,相信很多人在SpringBoot线程池和Java线程池怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”SpringBoot线程池和Java线程池怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
SpringBoot线程池和Java线程池的用法和实现原理
使用默认的线程池
方式一:通过@Async注解调用
public class AsyncTest {
    @Async
    public void async(String name) throws InterruptedException {
        System.out.println("async" + name + " " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }
}启动类上需要添加
@EnableAsync注解,否则不会生效。
@SpringBootApplication
//@EnableAsync
public class Test1Application {
   public static void main(String[] args) throws InterruptedException {
      ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
      AsyncTest bean = run.getBean(AsyncTest.class);
      for(int index = 0; index <= 10; ++index){
         bean.async(String.valueOf(index));
      }
   }
}方式二:直接注入 ThreadPoolTaskExecutor
此时可不加
@EnableAsync注解
@SpringBootTest
class Test1ApplicationTests {
   @Resource
   ThreadPoolTaskExecutor threadPoolTaskExecutor;
   @Test
   void contextLoads() {
      Runnable runnable = () -> {
         System.out.println(Thread.currentThread().getName());
      };
      for(int index = 0; index <= 10; ++index){
         threadPoolTaskExecutor.submit(runnable);
      }
   }
}线程池默认配置信息
SpringBoot线程池的常见配置:
spring: task: execution: pool: core-size: 8 max-size: 16 # 默认是 Integer.MAX_VALUE keep-alive: 60s # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止 allow-core-thread-timeout: true # 是否允许核心线程超时,默认true queue-capacity: 100 # 线程队列的大小,默认Integer.MAX_VALUE shutdown: await-termination: false # 线程关闭等待 thread-name-prefix: task- # 线程名称的前缀
SpringBoot 线程池的实现原理
TaskExecutionAutoConfiguration类中定义了
ThreadPoolTaskExecutor,该类的内部实现也是基于java原生的
ThreadPoolExecutor类。
initializeExecutor()方法在其父类中被调用,但是在父类中
RejectedExecutionHandler被定义为了
private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();,并通过
initialize()方法将
AbortPolicy传入
initializeExecutor()中。
注意在
TaskExecutionAutoConfiguration类中,
ThreadPoolTaskExecutor类的bean的名称为:
applicationTaskExecutor和
taskExecutor。
// TaskExecutionAutoConfiguration#applicationTaskExecutor()
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
      AsyncAnnotationBeanPostProcessor.DEFAUL
          T_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
   return builder.build();
}// ThreadPoolTaskExecutor#initializeExecutor()
@Override
protected ExecutorService initializeExecutor(
      ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
   BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
   ThreadPoolExecutor executor;
   if (this.taskDecorator != null) {
      executor = new ThreadPoolExecutor(
            this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
            queue, threadFactory, rejectedExecutionHandler) {
         @Override
         public void execute(Runnable command) {
            Runnable decorated = taskDecorator.decorate(command);
            if (decorated != command) {
               decoratedTaskMap.put(decorated, command);
            }
            super.execute(decorated);
         }
      };
   }
   else {
      executor = new ThreadPoolExecutor(
            this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
            queue, threadFactory, rejectedExecutionHandler);
   }
   if (this.allowCoreThreadTimeOut) {
      executor.allowCoreThreadTimeOut(true);
   }
   this.threadPoolExecutor = executor;
   return executor;
}// ExecutorConfigurationSupport#initialize()
public void initialize() {
   if (logger.isInfoEnabled()) {
      logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
   }
   if (!this.threadNamePrefixSet && this.beanName != null) {
      setThreadNamePrefix(this.beanName + "-");
   }
   this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}覆盖默认的线程池
覆盖默认的
taskExecutor对象,bean的返回类型可以是
ThreadPoolTaskExecutor也可以是
Executor。
@Configuration
public class ThreadPoolConfiguration {
    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }
}管理多个线程池
如果出现了多个线程池,例如再定义一个线程池
taskExecutor2,则直接执行会报错。此时需要指定bean的名称即可。
@Bean("taskExecutor2")
public ThreadPoolTaskExecutor taskExecutor2() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    //设置线程池参数信息
    taskExecutor.setCorePoolSize(10);
    taskExecutor.setMaxPoolSize(50);
    taskExecutor.setQueueCapacity(200);
    taskExecutor.setKeepAliveSeconds(60);
    taskExecutor.setThreadNamePrefix("myExecutor2--");
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.setAwaitTerminationSeconds(60);
    //修改拒绝策略为使用当前线程执行
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    //初始化线程池
    taskExecutor.initialize();
    return taskExecutor;
}引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。
@Resource ThreadPoolTaskExecutor taskExecutor2;
对于使用
@Async注解的多线程则在注解中指定bean的名字即可。
@Async("taskExecutor2")
    public void async(String name) throws InterruptedException {
        System.out.println("async" + name + " " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }线程池的四种拒绝策略
JAVA常用的四种线程池
ThreadPoolExecutor类的构造函数如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}newCachedThreadPool
不限制最大线程数(
maximumPoolSize=Integer.MAX_VALUE),如果有空闲的线程超过需要,则回收,否则重用已有的线程。
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
newFixedThreadPool
定长线程池,超出线程数的任务会在队列中等待。
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
newScheduledThreadPool
类似于
newCachedThreadPool,线程数无上限,但是可以指定
corePoolSize。可实现延迟执行、周期执行。
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}周期执行:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(()->{
   System.out.println("rate");
}, 1, 1, TimeUnit.SECONDS);延时执行:
scheduledThreadPool.schedule(()->{
   System.out.println("delay 3 seconds");
}, 3, TimeUnit.SECONDS);newSingleThreadExecutor
单线程线程池,可以实现线程的顺序执行。
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}Java 线程池中的四种拒绝策略
CallerRunsPolicy:线程池让调用者去执行。
AbortPolicy:如果线程池拒绝了任务,直接报错。
DiscardPolicy:如果线程池拒绝了任务,直接丢弃。
DiscardOldestPolicy:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。
CallerRunsPolicy
直接在主线程中执行了run方法。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
 
    public CallerRunsPolicy() { }
 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}效果类似于:
Runnable thread = ()->{
   System.out.println(Thread.currentThread().getName());
   try {
      Thread.sleep(0);
   } catch (InterruptedException e) {
      throw new RuntimeException(e);
   }
};
thread.run();AbortPolicy
直接抛出
RejectedExecutionException异常,并指示任务的信息,线程池的信息。、
public static class AbortPolicy implements RejectedExecutionHandler {
 
    public AbortPolicy() { }
 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}DiscardPolicy
什么也不做。
public static class DiscardPolicy implements RejectedExecutionHandler {
 
    public DiscardPolicy() { }
 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}DiscardOldestPolicy
e.getQueue().poll(): 取出队列最旧的任务。
e.execute(r): 当前任务入队。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
 
    public DiscardOldestPolicy() { }
 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}Java 线程复用的原理
java的线程池中保存的是
java.util.concurrent.ThreadPoolExecutor.Worker对象,该对象在 被维护在
private final HashSet<Worker> workers = new HashSet<Worker>();。
workQueue是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到
workQueue队列中。
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}work对象的执行依赖于
runWorker(),与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}