多线程等待问题

问题

在工作中遇到一个函数执行周期非常长,可以采用多线程来解决,优化处理速度

问题:需要判断多个服务是否在线,返回服务在线状态

解决

刚开始想到的是采用多线程来解决此问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public List<Data> SetStatus(List<Data> datas) {
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);

for (Data data : datas){
// 创建线程
Runnable runnable = new Runnable() {
@Override
public void run() {
// 执行需要耗时的函数
setStatus(data)
}
};
// 执行
executorService.execute(runnable);
}
// 关闭线程池
executorService.shutdown();
return datas;
}

但是上述方法问题存在于,线程池中的函数还没有全部执行完毕就返回结果了,导致返回的数据中有许多数据在线状态还没有检测完成

随后想到需要等线程池中的所有线程执行完毕之后才进行返回

解决方法:使用CountDownLatch计数,计数个数是我们需要处理的数据个数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public List<Data> SetStatus(List<Data> datas) {
// 创建计数器
CountDownLatch countDownLatch = new CountDownLatch(datas.size());
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);

for (Data data : datas){
// 创建线程
Runnable runnable = new Runnable() {
@Override
public void run() {
// 执行需要耗时的函数
setStatus(data);
// 在线程执行完成之后进行计数减一操作
countDownLatch.countDown();
}
};
// 执行
executorService.execute(runnable);
}
// 等待计数器值为零的时候在进行下面代码
countDownLatch.await()
// 关闭线程池
executorService.shutdown();
return datas;
}

上面方法二完美的解决了无法等待线程池全部完成之后在进行后面操作的问题

但是最后实行的时候会有问题,因为需要给每一个服务来判断该服务是否在线,需要去访问对应服务,这导致即使使用线程池,若服务不通依旧会很慢,所以最后解决办法是在上述方法的基础上采用两次访问接口的操作,第一次访问返回服务主体信息,并不会进行服务在线状态的判断,第二次访问是访问服务在线状态。这样不会导致前台页面出现长时间空白。

创建线程池的方式

转载:线程池的创建方式有几种

线程池的创建方式总共包含以下 7 种(其中 6 种是通过 Executors 创建的,1 种是通过ThreadPoolExecutor 创建的)

Executors

newFixedThreadPool

创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void fixedThreadPool() {
// 创建 2 个数据级的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);

// 创建任务
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
}
};

// 线程池执行任务(一次添加 4 个任务)
// 执行任务的方法有两种:submit 和 execute
threadPool.submit(runnable); // 执行方式 1:submit
threadPool.execute(runnable); // 执行方式 2:execute
threadPool.execute(runnable);
threadPool.execute(runnable);
}

CachedThreadPool

创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void cachedThreadPool() {
// 创建线程池
ExecutorService threadPool = Executors.newCachedThreadPool();
// 执行任务
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
});
}
}

SingleThreadExecutor

创建单个线程数的线程池,它可以保证先进先出的执行顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void singleThreadExecutor() {
// 创建线程池
ExecutorService threadPool = Executors.newSingleThreadExecutor();
// 执行任务
for (int i = 0; i < 10; i++) {
final int index = i;
threadPool.execute(() -> {
System.out.println(index + ":任务被执行");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
});
}
}

ScheduledThreadPool

创建一个可以执行延迟任务的线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void scheduledThreadPool() {
// 创建线程池
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
// 添加定时执行任务(1s 后执行)
System.out.println("添加任务,时间:" + new Date());
threadPool.schedule(() -> {
System.out.println("任务被执行,时间:" + new Date());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
}, 1, TimeUnit.SECONDS); // 任务推迟1s执行
}

SingleThreadScheduledExecutor

创建一个单线程的可以执行延迟任务的线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void SingleThreadScheduledExecutor() {
// 创建线程池
ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
// 添加定时执行任务(2s 后执行)
System.out.println("添加任务,时间:" + new Date());
threadPool.schedule(() -> {
System.out.println("任务被执行,时间:" + new Date());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
}, 2, TimeUnit.SECONDS);
}

newWorkStealingPool

创建一个抢占式执行的线程池(任务执行顺序不确定),注意此方法只有在 JDK 1.8+ 版本中才能使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void workStealingPool() {
// 创建线程池
ExecutorService threadPool = Executors.newWorkStealingPool();
// 执行任务
for (int i = 0; i < 10; i++) {
final int index = i;
threadPool.execute(() -> {
System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());
});
}
// 确保任务执行完成
while (!threadPool.isTerminated()) {
}
}

ThreadPoolExecutor

最原始的创建线程池的方式,它包含了 7 个参数可供设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void myThreadPoolExecutor() {
// 创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
// 执行任务
for (int i = 0; i < 10; i++) {
final int index = i;
threadPool.execute(() -> {
System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}

七个参数分别是:

1.corePoolSize

核心线程数,线程池中始终存活的线程数。

2.maximumPoolSize

最大线程数,线程池中允许的最大线程数,当线程池的任务队列满了之后可以创建的最大线程数。

3.keepAliveTime

最大线程数可以存活的时间,当线程中没有任务执行时,最大线程就会销毁一部分,最终保持核心线程数量的线程。

4.unit:

单位是和参数 3 存活时间配合使用的,合在一起用于设定线程的存活时间 ,参数 keepAliveTime 的时间单位有以下 7 种可选:

TimeUnit.DAYS:天

TimeUnit.HOURS:小时

TimeUnit.MINUTES:分

TimeUnit.SECONDS:秒

TimeUnit.MILLISECONDS:毫秒

TimeUnit.MICROSECONDS:微妙

TimeUnit.NANOSECONDS:纳秒

5.workQueue

一个阻塞队列,用来存储线程池等待执行的任务,均为线程安全,它包含以下 7 种类型:

ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。

LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。

SynchronousQueue:一个不存储元素的阻塞队列,即直接提交给线程不保持它们。

PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。

DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。

LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。

LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

较常用的是 LinkedBlockingQueue 和 Synchronous,线程池的排队策略与 BlockingQueue有关。

6.threadFactory

线程工厂,主要用来创建线程,默认为正常优先级、非守护线程

7.handler

拒绝策略,拒绝处理任务时的策略,系统提供了 4 种可选:

AbortPolicy:拒绝并抛出异常。

CallerRunsPolicy:使用当前调用的线程来执行此任务。

DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务。

DiscardPolicy:忽略并抛弃当前任务。

默认策略为 AbortPolicy。

线程池的执行流程

  • 当线程数小于核心线程数时,创建线程。
  • 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
  • 当线程数大于等于核心线程数,且任务队列已满若线程数小于最大线程数,创建线程若线程数等于最大线程数,抛出异常,拒绝任务

线程池的7种创建方式,强烈推荐你用它

等待线程池执行完毕方法

CountDownLatch

计数器方法:上面以写过

Future.get()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public List<Data> SetStatus(List<Data> datas) {
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<Future> futures = new ArrayList<>();
for (Data data : datas){
// 创建线程
Runnable runnable = new Runnable() {
@Override
public void run() {
// 执行需要耗时的函数
setStatus(data);
}
};
// 执行
Future<Data> submit = executorService.submit(runnable);
futures.add(submit);
}

for (Future future : futures) {
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 关闭线程池
executorService.shutdown();
return datas;
}