从0到1 - 带你了解线程池


1. 线程

线程是调度CPU的最小单位,线程模型分为 ULT 和 KLT。

JVM 使用的是KLT模型,java线程和操作系统os线程保持着一对一的映射关系,也就是说每创建一个java线程就意味着在操作系统里新建了一个os线程。

JVM是运行在用户空间的,如果需要创建和销毁线程,需要将线程从用户态切换到内核态(比较耗时,性能较低)。如果并发请求非常多,但是每个线程运行时间比较短。这样就会频繁的创建和销毁线程。可能会出现 创建和销毁线程实际业务处理花费 的时间和资源 还要多。

因此引入了线程池的概念

2. 线程池

线程池 顾名思义就是一个线程缓存。线程是稀缺资源,如果放任代码无限制的创建,不仅会消耗服务器资源,也会影响系统的稳定。因此 JAVA 提供了线程池对线程的统一创建、分配、调优和监控。

2.1 线程池如何使用

ThreadPoolExecutor tpe = new ThreadPoolExecutor(5,10,1000,
    TimeUnit.MILLISECONDS,  new ArrayBlockingQueue<Runnable>(10));
    tpe.submit(() -> {
        try {
            System.out.println("开始执行");
            Thread.sleep(1000);
            System.out.println("执行结束");
        } catch (InterruptedException exception) {
            exception.printStackTrace();
        }
    });
tpe.shutdown();    

2.2 线程池创建的主要参数

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
  • keepAliveTime:线程最大空闲时间
  • unit:时间单位
  • workQueue:阻塞队列,存放未执行的任务
  • Executors.defaultThreadFactory():线程创建工厂方法
  • defaultHandler:线程超过最大线程数时的拒绝策略

2.3 线程池工作流程

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get(); // 【1】
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) // 【2】
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command))  // 【3】
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) // 重复检验,防止并发问题
                reject(command); // 拒绝策略
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 【4】
            reject(command);
    }
  • 【1】 ctl :该参数包含线程池的两个重要参数:1. 线程池运行状态【32位的前3位】 、2. 线程池中运行的线程数(核心线程+非核心线程)。 重复获取的原因是:ctl.get()不是原子性的,防止并发访问时,数据的脏读。
    RUNNING    = -1 << COUNT_BITS; // 1110 0000 0000 0000 0000 0000 0000 0000
    SHUTDOWN   =  0 << COUNT_BITS; // 0000 0000 0000 0000 0000 0000 0000 0000
    STOP       =  1 << COUNT_BITS; // 0010 0000 0000 0000 0000 0000 0000 0000
    TIDYING    =  2 << COUNT_BITS; // 0100 0000 0000 0000 0000 0000 0000 0000
    TERMINATED =  3 << COUNT_BITS; // 0110 0000 0000 0000 0000 0000 0000 0000
  • workerCountOf(c):通过位运算获取线程池中运行的线程数

  • 【2】 addWorker(command, true):将任务封装成 worker对象,并初始化并执行线程。

  • 【3】 如果核心线程已经满了,则将任务加入到阻塞队列中等待。

  • 【4】 阻塞队列加入失败,创建非核心线程,如果失败,执行失败策略

2.3.1 worker对象

线程池并 不直接执行 我们提交的任务,而是将任务重新封装成 worker 对象。ThreadPool维护的 其实是一组worker对象。

worker类继承了AQS,使用AQS来实现独占锁的功能(不可重入)。

并实现了Runnable接口,所以worker对象本身也是一个线程,在 启动的时候会调用Worker类中的run方法。

类属性:

  • firstTask:用它来保存传入的任务
  • thread:在调用构造方法时通过 ThreadFactory 来创 建的线程,是用来处理任务的线程

2.3.2 addWorker 方法

execute方法中,会调用addWorker来创建并执行线程(核心+非核心)

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        /* 保证线程安全代码 */
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        /*上面代码是为了线程安全,判断线程池是否存活、核心线程是否空余等(多线程情况下,可能会有核心线程空余出来)*/
    
        /*新建线程并执行*/
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    /*判断线程池是存活的*/
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    /*执行任务*/
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

以上代码真正用于创建并执行任务的代码只有三行,其余代码都是为了保证并发安全。

w = new Worker(firstTask); // 创建worker,并绑定线程
final Thread t = w.thread;
t.start(); // 执行线程

t.start(); 执行后,worker对象会被调用并执行 worker.run() 方法

2.3.3 Worker类的 runWorker 方法

runWorker 方法的执行过程:

  • 先获取worker对象中的任务 firstTask
  • 如果 firstTask 为空,则循环中队列里拉取任务。
  • 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
  • 队列任务全部执行完成,执行processWorkerExit,然后销毁线程。
public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        /**
         * getTask:
         *         核心线程:通过take直接获取,如果队列为空,则阻塞等待获取。
         *         非核心线程:通过poll获取并设置超时时间,获取时如果超过keepAliveTime
         *                  则获取失败返回null
         */
        while (task != null || (task = getTask()) != null) {
            w.lock();
            /**
             * 判断线程处于stop状态时,线程一定是中断的
             * stop状态:不接受新的任务,也不从队列里拉取新的线程,并中断正在处理的线程
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // 空方法,允许子类重写
                Throwable thrown = null;
                try {
                    task.run(); // 真正执行任务
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // 空方法,允许子类重写
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

2.3.4 getTask 方法

核心线程:通过take直接获取,如果队列为空,则阻塞等待获取。
非核心线程:通过poll获取并设置超时时间,获取时如果超过keepAliveTime,则获取失败返回null

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 判断线程是否已经stop,如果是工作线程数-1,并返回null 
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 核心线程没有超时设置,
        // 非核心线程需要超时设置
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // wc > maximumPoolSize:当前线程数>最大线程数,最大线程数可能被其他线程修改。
        // timed && timedOut 为true 表示执行的是非核心线程,且已经超时(超时除了性能问题,就是队列已经为空)。
        // 以上两种情况说明:该非核心线程已经没有必要存在,可以销毁。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于 maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没 有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了 当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。

2.3.5 processWorkerExit 方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果runWorker方法没有发生异常,这段代码不执行
        if (completedAbruptly) 
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 从workers中移除,也就表示着从线程池中移除了一个工作线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // 根据线程池状态进行判断是否结束线程池
        tryTerminate();
        // 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
        // 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
        // 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。还存在非核心线程
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

2.3 拒绝策略

如果创建非核心线程失败,线程池有4中拒绝策略

  • CallerRunsPolicy:谁调用谁执行。

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      if (!e.isShutdown()) {
          r.run();
      }
    }
    
  • AbortPolicy:抛出 RejectedExecutionException 异常

  • DiscardPolicy:空实现,等待子类重写

  • DiscardOldestPolicy:从队列里移除一个,再加入

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

2.4 最佳线程数量设置

最佳线程数 = CPU核数*[1+(I/O耗时/CPU耗时)]


文章作者: zhouxh-z
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 zhouxh-z !
 上一篇
spring IOC源码解析 spring IOC源码解析
前言spring 是当前最广泛使用的开源框架,而spring framework 则是spring全家桶的基础。spring framework最重要的是 IOC 和 AOP。其中 IOC 又是Spring framework 的基础。今天
2020-12-31
下一篇 
【MySQL】删库跑路?了解下bin-log! 【MySQL】删库跑路?了解下bin-log!
操作生产数据库时每一个操作都需要反复审核。任意的小错误,都会导致线上“大灾难”!“从删库到跑路”,可以说是IT业内老梗了。 但是真的不小心误删了,真的就无法挽救了吗? 其实也没这么夸张,真实的生产数据库往往“热备”和“冷备”同时进行
2020-12-16
  目录