1. 线程
线程是调度CPU的最小单位,线程模型分为 ULT 和 KLT。
JVM 使用的是KLT模型,java线程和操作系统os线程保持着一对一的映射关系,也就是说每创建一个java线程就意味着在操作系统里新建了一个os线程。
JVM是运行在用户空间的,如果需要创建和销毁线程,需要将线程从用户态切换到内核态(比较耗时,性能较低)。如果并发请求非常多,但是每个线程运行时间比较短。这样就会频繁的创建和销毁线程。可能会出现 创建和销毁线程 比 实际业务处理花费 的时间和资源 还要多。
因此引入了线程池的概念
2. 线程池
线程池 顾名思义就是一个线程缓存。线程是稀缺资源,如果放任代码无限制的创建,不仅会消耗服务器资源,也会影响系统的稳定。因此 JAVA 提供了线程池对线程的统一创建、分配、调优和监控。
2.1 线程池如何使用
ThreadPoolExecutor tpe = new ThreadPoolExecutor(5,10,1000,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
tpe.submit(() -> {
try {
System.out.println("开始执行");
Thread.sleep(1000);
System.out.println("执行结束");
} catch (InterruptedException exception) {
exception.printStackTrace();
}
});
tpe.shutdown();
2.2 线程池创建的主要参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
- keepAliveTime:线程最大空闲时间
- unit:时间单位
- workQueue:阻塞队列,存放未执行的任务
- Executors.defaultThreadFactory():线程创建工厂方法
- defaultHandler:线程超过最大线程数时的拒绝策略
2.3 线程池工作流程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get(); // 【1】
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 【2】
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) // 【3】
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 重复检验,防止并发问题
reject(command); // 拒绝策略
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 【4】
reject(command);
}
- 【1】 ctl :该参数包含线程池的两个重要参数:1. 线程池运行状态【32位的前3位】 、2. 线程池中运行的线程数(核心线程+非核心线程)。 重复获取的原因是:ctl.get()不是原子性的,防止并发访问时,数据的脏读。
RUNNING = -1 << COUNT_BITS; // 1110 0000 0000 0000 0000 0000 0000 0000
SHUTDOWN = 0 << COUNT_BITS; // 0000 0000 0000 0000 0000 0000 0000 0000
STOP = 1 << COUNT_BITS; // 0010 0000 0000 0000 0000 0000 0000 0000
TIDYING = 2 << COUNT_BITS; // 0100 0000 0000 0000 0000 0000 0000 0000
TERMINATED = 3 << COUNT_BITS; // 0110 0000 0000 0000 0000 0000 0000 0000
workerCountOf(c):通过位运算获取线程池中运行的线程数
【2】 addWorker(command, true):将任务封装成 worker对象,并初始化并执行线程。
【3】 如果核心线程已经满了,则将任务加入到阻塞队列中等待。
【4】 阻塞队列加入失败,创建非核心线程,如果失败,执行失败策略
2.3.1 worker对象
线程池并 不直接执行 我们提交的任务,而是将任务重新封装成 worker 对象。ThreadPool维护的 其实是一组worker对象。
worker类继承了AQS,使用AQS来实现独占锁的功能(不可重入)。
并实现了Runnable接口,所以worker对象本身也是一个线程,在 启动的时候会调用Worker类中的run方法。
类属性:
- firstTask:用它来保存传入的任务
- thread:在调用构造方法时通过 ThreadFactory 来创 建的线程,是用来处理任务的线程
2.3.2 addWorker 方法
execute方法中,会调用addWorker来创建并执行线程(核心+非核心)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
/* 保证线程安全代码 */
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
/*上面代码是为了线程安全,判断线程池是否存活、核心线程是否空余等(多线程情况下,可能会有核心线程空余出来)*/
/*新建线程并执行*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/*判断线程池是存活的*/
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
/*执行任务*/
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
以上代码真正用于创建并执行任务的代码只有三行,其余代码都是为了保证并发安全。
w = new Worker(firstTask); // 创建worker,并绑定线程
final Thread t = w.thread;
t.start(); // 执行线程
t.start(); 执行后,worker对象会被调用并执行 worker.run() 方法
2.3.3 Worker类的 runWorker 方法
runWorker 方法的执行过程:
- 先获取worker对象中的任务 firstTask
- 如果 firstTask 为空,则循环中队列里拉取任务。
- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
- 队列任务全部执行完成,执行processWorkerExit,然后销毁线程。
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
/**
* getTask:
* 核心线程:通过take直接获取,如果队列为空,则阻塞等待获取。
* 非核心线程:通过poll获取并设置超时时间,获取时如果超过keepAliveTime
* 则获取失败返回null
*/
while (task != null || (task = getTask()) != null) {
w.lock();
/**
* 判断线程处于stop状态时,线程一定是中断的
* stop状态:不接受新的任务,也不从队列里拉取新的线程,并中断正在处理的线程
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 空方法,允许子类重写
Throwable thrown = null;
try {
task.run(); // 真正执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 空方法,允许子类重写
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
2.3.4 getTask 方法
核心线程:通过take直接获取,如果队列为空,则阻塞等待获取。
非核心线程:通过poll获取并设置超时时间,获取时如果超过keepAliveTime,则获取失败返回null
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断线程是否已经stop,如果是工作线程数-1,并返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 核心线程没有超时设置,
// 非核心线程需要超时设置
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// wc > maximumPoolSize:当前线程数>最大线程数,最大线程数可能被其他线程修改。
// timed && timedOut 为true 表示执行的是非核心线程,且已经超时(超时除了性能问题,就是队列已经为空)。
// 以上两种情况说明:该非核心线程已经没有必要存在,可以销毁。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于 maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没 有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了 当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。
2.3.5 processWorkerExit 方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果runWorker方法没有发生异常,这段代码不执行
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 从workers中移除,也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根据线程池状态进行判断是否结束线程池
tryTerminate();
// 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
// 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
// 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。还存在非核心线程
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
2.3 拒绝策略
如果创建非核心线程失败,线程池有4中拒绝策略
CallerRunsPolicy:谁调用谁执行。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
AbortPolicy:抛出 RejectedExecutionException 异常
DiscardPolicy:空实现,等待子类重写
DiscardOldestPolicy:从队列里移除一个,再加入
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
2.4 最佳线程数量设置
最佳线程数 = CPU核数*[1+(I/O耗时/CPU耗时)]