Java线程池教程
什么是线程池
Java线程池是一种用于优化线程管理的技术,它可以在应用程序启动时预先创建一组线程并保存在内存中,以避免频繁地创建和销毁线程。线程池通过提供一个控制并发线程数量的机制,可以有效地管理和控制应用程序的并发访问量,从而提高应用程序的性能和响应速度。
Java线程池通常由Java的Executor框架提供支持,该框架提供了一组接口和类,用于创建和管理线程池。其中,ThreadPoolExecutor类是一个常用的线程池实现,它支持自定义线程池的大小、线程存活时间、工作队列类型等参数,并且提供了丰富的控制和监控接口,方便开发人员进行配置和管理。
使用线程池可以带来很多好处,例如:
- 提高应用程序的性能和响应速度:通过预创建线程并保存在内存中,可以避免频繁地创建和销毁线程,从而提高应用程序的性能和响应速度。
- 降低资源消耗:线程池可以控制并发线程数量,避免过多地创建线程,从而降低系统资源消耗。
- 提高系统稳定性:线程池提供了线程管理和监控机制,可以帮助开发人员及时发现和解决线程相关的问题,从而提高系统的稳定性和可靠性。
- 方便扩展:线程池提供了丰富的配置和管理接口,可以根据实际需求进行定制和扩展,满足不同场景下的需求。
总之,Java线程池是一种非常有用的技术,可以提高应用程序的性能、响应速度、稳定性以及可扩展性。在实际开发中,应该根据实际需求选择合适的线程池实现并进行合理的配置和管理。
线程池 Executor提供的API
ExecutorService
接口:这是线程池的核心接口,它定义了线程池的基本操作,如提交任务、关闭线程池等。Executors
类:这个类提供了一些静态方法来创建不同类型的线程池,如固定大小的线程池、可缓存的线程池等。Future
接口:这个接口表示一个异步计算的结果,可以通过调用get
方法来获取计算结果。Callable
接口:这个接口与Runnable
接口类似,但它可以返回计算结果,并且可以抛出异常。ThreadPoolExecutor
类:这个类是一个可配置的线程池,它实现了ExecutorService
接口,并提供了更多的配置选项,如线程池大小、线程工厂、拒绝策略等。ScheduledThreadPoolExecutor
类:这个类是一个可定时执行的线程池,它继承自ThreadPoolExecutor
类,并提供了定时执行任务的功能。ThreadFactory
接口:这个接口用于创建新线程,可以通过实现这个接口来定义自己的线程工厂。RejectedExecutionHandler
接口:这个接口用于处理无法执行的任务,可以通过实现这个接口来定义自己的拒绝策略。
线程池的创建方式
Java线程池提供了以下几种常见的方法:
Executors.newFixedThreadPool(int)
:创建一个固定大小的线程池,该线程池可以创建指定数量的线程,用于执行提交的任务。Executors.newCachedThreadPool()
:创建一个可缓存的线程池,该线程池会缓存空闲的线程,以便重用。Executors.newSingleThreadExecutor()
:创建一个单线程化的线程池,该线程池只会创建一个线程,用于执行提交的任务。Executors.newScheduledThreadPool(int)
:创建一个定时线程池,该线程池可以定时执行任务,并可以按照指定的时间间隔调度任务。ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>)
:创建一个可配置的线程池,该线程池可以指定线程池大小、任务队列、线程工厂、拒绝策略等参数。
newFixedThreadPool
- FixedThreadPool的核心线程数和最大线程数都是指定值,也就是说当线程池中的线程数超过核心线程数后,任务都会被放到阻塞队列中
- keepAliveTime为 0,也就是超出核心 线程数量以外的线程空余存活时间
- 阻塞队列是 LinkedBlockingQueue,使用的是默认容量 Integer.MAX_VALUE, 相当于没有上限
newCachedThreadPool
- 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空 闲线程,若无可回收,则新建线程
- 没有核心线程,非核心线程无上限,完美诠释外包,用完六十秒后空闲就回收
ThreadPoolExecutor 的参数详解
参数名称 | 类型 | 描述 |
---|---|---|
corePoolSize |
int |
核心线程数,线程池中保持活动的最小线程数。 |
maximumPoolSize |
int |
最大线程数,线程池能够容纳的最大线程数。 |
keepAliveTime |
long |
非核心线程的存活时间,当线程数超过核心线程数时,多余的线程在终止之前等待新任务的最长时间。 |
TimeUnit |
java.util.concurrent.TimeUnit |
存活时间的单位,可以是NANOSECONDS 、MILLISECONDS 、SECONDS 、MINUTES 、HOURS 或DAYS 。 |
BlockingQueue<Runnable> |
java.util.concurrent.BlockingQueue<Runnable> |
用于存放任务的队列。当所有核心线程都在忙,且任务队列未满时,新任务会进入队列等待执行。当队列满了,且当前线程数小于最大线程数时,则创建新的线程执行任务。 |
ThreadFactory |
java.util.concurrent.ThreadFactory |
用于创建新线程的工厂,可以自定义线程的创建方式,如设置线程的名称、优先级等。 |
RejectedExecutionHandler |
java.util.concurrent.RejectedExecutionHandler |
拒绝策略处理器,当任务无法被执行时(如队列满了且线程数达到上限),会调用此策略的处理方法。默认情况下,如果处理器未被设置,则抛出RejectedExecutionException 异常。 |
如何使用线程池
示例1:创建线程池、添加任务、关闭线程池
Java线程池可以通过使用Executor框架提供的接口和类来创建和管理。其中,ThreadPoolExecutor类是一个常用的线程池实现,它支持自定义线程池的大小、线程存活时间、工作队列类型等参数,并且提供了丰富的控制和监控接口,方便开发人员进行配置和管理。
下面是一个简单的Java线程池的使用示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建一个固定大小的线程池
int poolSize = 10;
ExecutorService executor = Executors.newFixedThreadPool(poolSize);
// 提交任务到线程池
for (int i = 0; i < 100; i++) {
Runnable worker = new WorkerThread("" + i);
executor.execute(worker);
}
// 关闭线程池(平滑关闭)
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("所有线程运行完毕");
}
}
class WorkerThread implements Runnable {
private String command;
public WorkerThread(String command) {
this.command = command;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 开始处理:" + command);
processCommand();
System.out.println(Thread.currentThread().getName() + " 结束处理:" + command);
}
private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在这个示例中,我们首先使用Executors类的newFixedThreadPool方法创建了一个固定大小的线程池,大小为10。然后,我们使用executor的execute方法提交了100个任务到线程池中。每个任务都是一个实现了Runnable接口的WorkerThread对象。在WorkerThread的run方法中,我们模拟了一个耗时的操作,并通过System.out.println输出线程的名字和处理的任务。最后,我们使用executor的shutdown方法关闭线程池,并等待所有线程运行完毕后输出“所有线程运行完毕”。
示例2:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample1 {
public static void main(String[] args) {
// 创建一个单线程的线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
// 提交任务到线程池
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("执行任务");
}
});
// 关闭线程池(平滑关闭)
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("所有线程运行完毕");
}
}
示例3 如何使用Java 线程池执行定时任务
要使用Java线程池执行定时任务,可以按照以下步骤进行:
-
创建一个线程池对象。可以使用
Executors.newScheduledThreadPool()
方法创建一个定时任务线程池。例如:ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
-
创建一个
Runnable
或Callable
的任务对象,该任务对象表示要执行的定时任务。例如,创建一个Runnable
的任务对象:Runnable task = new Runnable() { @Override public void run() { // 定时任务逻辑 } };
-
使用定时任务线程池的
schedule()
方法来安排任务的执行。该方法接受一个任务对象和一个表示延迟执行的时间参数。例如,将任务延迟1秒后执行:executor.schedule(task, 1, TimeUnit.SECONDS);
-
可选地,如果需要定时执行任务,可以使用
scheduleAtFixedRate()
方法或scheduleWithFixedDelay()
方法。例如,每隔5秒执行一次任务:executor.scheduleAtFixedRate(task, 0, 5, TimeUnit.SECONDS);
-
当不再需要执行定时任务时,需要调用线程池的
shutdown()
方法来关闭线程池。例如:executor.shutdown();
完整的示例代码如下所示:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExample {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
Runnable task = new Runnable() {
@Override
public void run() {
// 定时任务逻辑
System.out.println("定时任务执行");
}
};
// 延迟1秒后执行任务
executor.schedule(task, 1, TimeUnit.SECONDS);
// 每隔5秒执行任务
executor.scheduleAtFixedRate(task, 0, 5, TimeUnit.SECONDS);
// 关闭线程池
executor.shutdown();
}
}
这样就可以使用Java线程池执行定时任务了。
线程池的实现原理分析
- ThreadPoolExecutor 是线程池的核心,提供了线程池的实现
- ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,并另外提供一些调度方法以支 持定时和周期任务
- Executers 是工具类,主要用来创建线程池对象
execute
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) // 新建一个线程执行任务 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // 核心池满了,但任务队列未满,添加任务 int recheck = ctl.get(); // 任务添加成功,再次检查是否需要添加新的线程,因为可能有线程被销毁了 if (! isRunning(recheck) && remove(command)) reject(command); // 之前的线程被销毁,新建一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 核心池已满,队列已满,新建一个工作线程 else if (!addWorker(command, false)) reject(command);// 如果失败,说明线程池被关闭或者已经满了,拒绝任务 }
ctl
在线程池中,
ctl
贯穿在线程池的整个生命周期中
ctl
是一个AtomicInteger
,主要保存线程池的数量和状态,这里采用 高三位 保存运行状态,低29位保存线程数量
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //goto语句
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程处于非运行状态,表示使用了 shutdown 了,所以拒绝添加任务
// shutdown状态,不允许添加任务,但可以执行原先就在队列中的任务
// 且 rs 不等于 SHUTDOWN 且 firstTask 不等于空
// 且 workQueue 为空,直接返回 false(表示不可添加 work 状态)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);// 获得工作线程数
// 如果工作线程数大于默认容量或者大于线程最大数量,直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))// 通过cas增加,cas失败则会重试
break retry;
c = ctl.get(); // 获取ctl锁
if (runStateOf(c) != rs) // 线程的状态发生了变化,继续重试
continue retry;
}
}
// 正式创建worker线程
boolean workerStarted = false; // 工作线程是否启动的标志
boolean workerAdded = false; // 工作线程是否添加成功的标志
Worker w = null;
try {
w = new Worker(firstTask); // 构建一个worker
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock; // 从worker对象中取得线程
mainLock.lock();// 避免并发
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || // 线程池是运行状态或者是SHUTDOWN并且firstTask=null
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 将新创建的 worker 添加到 workers 中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 更新线程池出现过的最大线程数
workerAdded = true; //表示工作线程创建成功了
}
} finally {
mainLock.unlock(); // 释放锁
}
if (workerAdded) {
t.start(); // 启动线程
workerStarted = true; // 表示启动成功
}
}
} finally {
if (! workerStarted) // 添加失败,递减工作线程,因为最先开始的时候,创建了工作线程
addWorkerFailed(w);
}
return workerStarted;// 返回结果
}
Worker 类
每个
worker
,都是一条线程,同时里面包含了一个firstTask
,即初始化时要被首先执行的任务
最终执行任务的,是runWorker()
worker类继承了AQS
,实现了Runnable
接口,firstTask
用于保存传入的任务,thread
是在调用构造方法时通过ThreadFactory
来创建的 线程,是用来处理任务的线程
在调用构造方法时,需要传入任务,通过getThreadFactory().newThread(this);
来新建 一个线程
newThread
方法传入的参数是this
,因为Worker
本身继承了Runnable
接口, 也就是一个线程,所以一个Worker
对象在启动的时候会调用Worker
类中的run()
由于
ReentrantLock
是允许重入的,但是tryAcquire
是不允许重入的,所以Worker
继承了AQS
,使用AQS
来实现独占锁的功能如果正在执行任务,则不应该中断线程
如果该线程不是独占锁的状态,也就是空闲的状态,说明没有处理任务,这时可以对该线程终止
线程池执行shutdown()
的时候会调用interruptIdleWorkers()
中断空闲的线程
interruptIdleWorkers
会使用tryLock()
判断线程是否空闲之所以设置为不可重入,是因为我们不希望任务在调用像
setCorePoolSize
这样的线程池控制方法时重新获取锁,这样会中断正在运行的线程
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 真正执行任务的线程,由ThreadFactury构建. */
final Thread thread;
/** 需要执行的任务. */
Runnable firstTask;
/** 完成的任务数,用于任务池统计 */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // 初始状态,防止在调用 runWorker(),也就是真正执行 task前中断 thread
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this); // 执行任务
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
runWorker
线程池中执行任务的 真正处理逻辑
- 如果
task
不为空,则开始执行task
- 如果
task
为空,则通过getTask()
再去取任务,并赋值给task
,如果取到的Runnable
不为空,则 执行该任务 - 执行完毕后,通过
while
循环继续getTask()
取任务 - 如果
getTask()
取到的任务依然是空,那么整个runWorker()
方法执行完毕
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 表示当前的线程允许中断,这里会将 state 设置为 0
boolean completedAbruptly = true;
try {
// 如果 task 为 null,则通过 getTask() 取得任务
while (task != null || (task = getTask()) != null) {
w.lock();// 为了 shutdown() 不中断执行中的任务,需要加锁
// 线程池为 stop 时,不接受新的任务
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();// 执行 task 的run()方法,也就是实际传入的线程执行逻辑
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 执行完成后,将task设置为null,下一次通过 getTask() 获取新的任务
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// worker执行完毕后,将当前的worker从workers中删除
// 根据布尔值 allowCoreThreadTimeOut 决定是否补充新的worker进workers
processWorkerExit(w, completedAbruptly);
}
}
getTask
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 对线程池状态的判断
// workQueue 为空并且是 shutdown 状态,则返回 null,结束 wroker 的循环
// 状态为 STOP,则是调用了shutdownNow,所以直接停止 worker 的循环
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null; // 返回 null,则当前的 worker 执行结束
}
int wc = workerCountOf(c);
// timed 用于判断是否需要做超时控制
// allowCoreThreadTimeOut 默认是 true,也就是核心线程不允许超时
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSi
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 根据 timed 判断
// true 通过 poll 方法进行超时控制
// false 通过 task 获取队列中的任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null) // 任务不为空,则交给worker处理
return r;
timedOut = true;// 如果 r==null,则设置为 timeOut,下一次自旋会回收
} catch (InterruptedException retry) {
timedOut = false;// 重试
}
}
}
拒绝策略
可以通过实现
RejectedExecutionHandler
接口,自定义拒绝策略
类 | 说明 |
---|---|
AbortPolicy | 直接抛出异常,默认策略; |
CallerRunsPolicy | 调用者所在的线程来执行任务 |
DiscardOldestPolicy | 丢弃阻塞队列中靠最前的任务,并执行当前任务 |
DiscardPolicy | 直接丢弃任务 |
如何合理的配置线程池大小
类型 | 说明 |
---|---|
IO 密集型 | ((线程池设定的线程等待时间+线程CPU时间)/线程CPU时间)*CPU数目 |
CPU 密集型 | cpu 核心数+1 |
线程池中的线程初始化
ThreadPoolExecutor tpe=(ThreadPoolExecutor)service;
tpe.prestartCoreThread();// 初始化一个核心线程
tpe.prestartAllCoreThreads(); // 初始化所有核心线程
线程池的关闭
方法名称 | 说明 |
---|---|
shutdown() | 不会立刻关闭,会等到任务队列中的任务全部执行完后终止 |
shutdownNow() | 立刻终止线程池,并且尝试打断正在执行的任务,清空缓存队列,返回为执行的任务 |
线程池容量的动态调整
方法名称 | 说明 |
---|---|
setCorePoolSize() | 设置核心池大小 |
setMaximumPoolSize() | 设置线 程池最大能创建的线程数目大小 |
任务缓存队列及排队策略
类型 | 说明 |
---|---|
ArrayBlockingQueue | 基于数组的先进先出队列,此队列创建时必须指定大小 |
LinkedBlockingQueue | 基于链表的先进先出队列,如果创建时没有指定此队列大小,则默 认为 Integer.MAX_VALUE |
SynchronousQueue | 不会保存提交的任务,而是将直接新建一个线程来执行新来的任务 |
线程池排队策略设置
在Java中,线程池的任务缓存队列一般是由线程池自己管理的,我们不需要手动去设置。具体使用哪个队列是由线程池的内部实现来决定的。
当我们提交一个任务给线程池时,线程池会根据其内部策略来决定是立即执行这个任务,还是将它放到任务队列中等待执行。具体的策略可能因线程池的实现而异,但通常会考虑线程池的大小(即当前有多少可用的线程)以及任务队列的大小等因素。
Java线程池的排队策略可以在创建线程池时通过ThreadPoolExecutor
的构造函数进行设置。ThreadPoolExecutor
有一个构造函数接收一个BlockingQueue
对象作为参数,这个对象用于存储等待执行的任务。
Java线程池的排队策略主要有以下几种:
- 直接提交策略(
DiscreteThreadScheduler
): 任务被提交后立即执行,适用于对延迟要求较高的场景。 - 优先队列策略(
PriorityThreadScheduler
): 任务按优先级排队,高优先级任务先于低优先级任务执行,适用于对优先级要求较高的场景。 - 轮询策略(
RoundRobinThreadScheduler
): 任务按轮询方式执行,适用于需要均衡负载的场景。 - 限时策略(
TimedThreadScheduler
): 任务在等待一定时间后开始执行,适用于需要在规定时间内完成任务或者需要避免长时间等待的场景。
可以通过以下方式创建具有特定排队策略的线程池:
一些节本的策略设置
// 使用直接提交策略创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new DiscreteThreadScheduler());
// 使用优先队列策略创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new PriorityThreadScheduler());
// 使用轮询策略创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new RoundRobinThreadScheduler());
// 使用限时策略创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new TimedThreadScheduler());
需要注意的是,以上代码中的corePoolSize
、maximumPoolSize
、keepAliveTime
等参数也需要根据实际需求进行设置。
使用 ArrayBlockingQueue 策略
对于ThreadPoolExecutor
,我们可以通过构造函数来设置一些参数,如任务队列的容量,但具体的任务存储和管理是由线程池自己来负责的。
例如:
// 创建一个具有指定核心线程数、最大线程数、空闲线程存活时间的线程池,
// 并使用一个具有指定容量的任务队列来存储待执行的任务。
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueCapacity)
);
这里使用的ArrayBlockingQueue
是一个阻塞队列,可以作为任务队列。当我们向队列中添加任务时,如果队列已满,那么会阻塞直到队列有空位;当我们从队列中获取任务时,如果队列为空,那么会阻塞直到队列中有新的任务。这样能保证线程池中的任务总是按顺序执行,不会因为同时有大量的任务提交而被“打乱”。
总的来说,Java线程池会自动管理任务队列,我们通常不需要手动去设置。如果你需要使用自定义的任务队列,那么需要确保这个队列能正确地处理并发访问和阻塞情况。