java提高之线程池
阻塞队列结构:Collection 接口|Queue 接口|BlockingQueue接口|实现类ArrayBlockingQueue底层数组LinkedBlockingQueue底层链表:默认长度21亿SynchronousQueue里面只能存放一个阻塞队列常用API抛出异常特殊值阻塞超时插入add(e)offer(e)...
阻塞队列
结构:
Collection 接口
|
Queue 接口
|
BlockingQueue接口
|
实现类
ArrayBlockingQueue 底层数组
LinkedBlockingQueue 底层链表:默认长度21亿
SynchronousQueue 里面只能存放一个
阻塞队列常用API
抛出异常 | 特殊值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查 | element() | peek() | / | / |
示例代码,以ArrayBlockingQueue为例
public class Demo01 {
// add和remove会抛出异常
@Test
public void test01(){
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
queue.add("a");
queue.add("b");
queue.add("c");
queue.remove();
queue.remove();
queue.remove();
queue.remove();
}
// put 会阻塞
@Test
public void test02() throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
queue.put("a");
queue.put("b");
queue.put("c");
queue.take();
queue.put("d");
System.out.println(queue);
}
}
线程池
核心类
Executor 接口
|
ExecutorService接口
|
实现类ThreadPoolExecutor
常用线程池
newFixedThreadPool:使用指定大小的线程池
newSingleThreadExecutor:只有一个
newCachedThreadPool:一次处理N个请求的线程池
示范代码
public class ThreadPoolDemo {
public static void main(String[] args) throws InterruptedException {
// 指定线程数
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// 只有一个
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
// 自己能忙过来就自己干,忙不过来就请帮手
ExecutorService threadPool3 = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
threadPool3.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"正在执行任务");
}
});
Thread.sleep(500);
}
}
}
重要参数
这三个线程池,
- 底层都是用的ThreadPoolExecutor
- 并且都是7个参数
- 并且调动方法都一样
- 使用的LinkedBlockingQueue 底层链表,默认长度21亿
参数包括:
int corePoolSize
int maxiumPoolSize
long keepAliveTime
TimeUnit unit
BlockingQueue<Runnable> workQueue
ThreadFactory threadFactory
RejectedExecutionHandler handler
corePoolSize:线程池核心池大小,也就是常驻的核心线程数
maxiumPoolSize:最大池大小,也就是最大线程数
workQueue:放任务的,候客区,用于排队等候队列。
keepAliveTime:多于的空闲线程的存活时间,当前线程池数量超过corePoolSize时,当高峰期过去,空闲时间达到keepAliveTime,多余空闲线程会被销毁,相当于从maxiumPoolSize缩容回corePoolSize,减少资源消耗
unit:keepAliveTime的单位
RejectedExecutionHandler :有四种拒绝策略
- AbortPolicy(默认):直接抛出RejectedExecutionException异常组织系统正常运行
- CallerRunsPolicy:调用者运行一种调用机制,该策略不会抛出异常,而是将某些任务退回给调用者,从而降低流量
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入到队列中再尝试提交任务
- DiscardPolicy:直接丢弃任务,不予处理,如果允许任务丢失,这是最好的解决方案。
实际开发中使用谁?都不用,自己的
工作的线程的数量是怎么算的?经验
- CPU型 这些多线程程序在进行大量的运算 线程数=CPU核数+1
- IO型 没有大量的计算 CPU的2倍或3倍
示例代码
public class ThreadPoolDemo {
public static void main(String[] args) throws InterruptedException {
// 指定线程数
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// 只有一个
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
// 自己能忙过来就自己干,忙不过来就请帮手
ExecutorService threadPool3 = Executors.newCachedThreadPool();
// 工作中自定义线程池
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(2,4,1L,
TimeUnit.MILLISECONDS,new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 7; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"正在执行任务");
}
});
}
}
}
手写线程池
public class FixedThreadPool {
// 阻塞队列
private BlockingQueue<Runnable> taskQueue;
// 集合
private List<Worker> works;
public FixedThreadPool(int poolSize,int taskNum){
taskQueue = new LinkedBlockingDeque<>(taskNum);
works = new ArrayList<>();
for (int i = 0; i < poolSize; i++) {
Worker w = new Worker(this);
w.start();
works.add(w);
}
}
public boolean execute(Runnable r){
return taskQueue.offer(r);
}
private static class Worker extends Thread{
FixedThreadPool pool = null;
public Worker(FixedThreadPool pool){
this.pool = pool;
}
public void run(){
while(true){
Runnable task = null;
try {
task = pool.taskQueue.take();
if(task!=null){
task.run();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public class ThreadPoolDemo {
public static void main(String[] args) {
// 自己手写线程池
FixedThreadPool threadPool4 = new FixedThreadPool(5,8);
for (int i = 0; i < 10; i++) {
threadPool4.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"正在执行任务");
}
});
}
}
}
线程池执行任务,调用execute方法,任务提交到阻塞队列中,taskQueue.offer(r)
,提交之前new了poolSize,准备好线程数,w.start();准备好了5个线程,这些线程进入run方法的while循环中,从阻塞队列里拿task,拿出来的就是提交的task
更多推荐
所有评论(0)