阻塞队列

结构:

Collection 接口
	|
Queue 接口
	|
BlockingQueue接口
	|
实现类
ArrayBlockingQueue  底层数组
LinkedBlockingQueue  底层链表:默认长度21亿
SynchronousQueue  里面只能存放一个

阻塞队列常用API

抛出异常特殊值阻塞超时
插入add(e)offer(e)put(e)offer(e,time,unit)
移除remove()poll()take()poll(time,unit)
检查element()peek()//

示例代码,以ArrayBlockingQueue为例

public class Demo01 {
  // add和remove会抛出异常
  @Test
  public void test01(){
    BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
    queue.add("a");
    queue.add("b");
    queue.add("c");

    queue.remove();
    queue.remove();
    queue.remove();
    queue.remove();
  }

  // put 会阻塞
  @Test
  public void test02() throws InterruptedException {
    BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
    queue.put("a");
    queue.put("b");
    queue.put("c");
    queue.take();
    queue.put("d");

    System.out.println(queue);
  }
}

线程池

核心类

Executor 接口
	|
ExecutorService接口
	|
实现类ThreadPoolExecutor

常用线程池

newFixedThreadPool:使用指定大小的线程池

newSingleThreadExecutor:只有一个

newCachedThreadPool:一次处理N个请求的线程池

示范代码

public class ThreadPoolDemo {
  public static void main(String[] args) throws InterruptedException {
    // 指定线程数
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    // 只有一个
    ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
    // 自己能忙过来就自己干,忙不过来就请帮手
    ExecutorService threadPool3 = Executors.newCachedThreadPool();
    for (int i = 0; i < 10; i++) {
      threadPool3.execute(new Runnable() {
        @Override
        public void run() {
          System.out.println(Thread.currentThread().getName()+"正在执行任务");
        }
      });
      Thread.sleep(500);
    }
  }
}

重要参数

这三个线程池,

  1. 底层都是用的ThreadPoolExecutor
  2. 并且都是7个参数
  3. 并且调动方法都一样
  4. 使用的LinkedBlockingQueue 底层链表,默认长度21亿

参数包括:

  1. int corePoolSize
  2. int maxiumPoolSize
  3. long keepAliveTime
  4. TimeUnit unit
  5. BlockingQueue<Runnable> workQueue
  6. ThreadFactory threadFactory
  7. RejectedExecutionHandler handler

corePoolSize:线程池核心池大小,也就是常驻的核心线程数

maxiumPoolSize:最大池大小,也就是最大线程数

workQueue:放任务的,候客区,用于排队等候队列。

keepAliveTime:多于的空闲线程的存活时间,当前线程池数量超过corePoolSize时,当高峰期过去,空闲时间达到keepAliveTime,多余空闲线程会被销毁,相当于从maxiumPoolSize缩容回corePoolSize,减少资源消耗

unit:keepAliveTime的单位

RejectedExecutionHandler :有四种拒绝策略

  1. AbortPolicy(默认):直接抛出RejectedExecutionException异常组织系统正常运行
  2. CallerRunsPolicy:调用者运行一种调用机制,该策略不会抛出异常,而是将某些任务退回给调用者,从而降低流量
  3. DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入到队列中再尝试提交任务
  4. DiscardPolicy:直接丢弃任务,不予处理,如果允许任务丢失,这是最好的解决方案。

实际开发中使用谁?都不用,自己的

工作的线程的数量是怎么算的?经验

  1. CPU型 这些多线程程序在进行大量的运算 线程数=CPU核数+1
  2. IO型 没有大量的计算 CPU的2倍或3倍

示例代码

public class ThreadPoolDemo {
  public static void main(String[] args) throws InterruptedException {
    // 指定线程数
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    // 只有一个
    ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
    // 自己能忙过来就自己干,忙不过来就请帮手
    ExecutorService threadPool3 = Executors.newCachedThreadPool();

    // 工作中自定义线程池
    ExecutorService threadPoolExecutor = new ThreadPoolExecutor(2,4,1L,
                                                                TimeUnit.MILLISECONDS,new LinkedBlockingDeque<>(3),
                                                                Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
    for (int i = 0; i < 7; i++) {
      threadPoolExecutor.execute(new Runnable() {
        @Override
        public void run() {
          System.out.println(Thread.currentThread().getName()+"正在执行任务");
        }
      });
    }
  }
}

手写线程池

public class FixedThreadPool {

  // 阻塞队列
  private BlockingQueue<Runnable> taskQueue;
  // 集合
  private List<Worker> works;

  public FixedThreadPool(int poolSize,int taskNum){
    taskQueue = new LinkedBlockingDeque<>(taskNum);
    works = new ArrayList<>();

    for (int i = 0; i < poolSize; i++) {
      Worker w = new Worker(this);
      w.start();
      works.add(w);
    }
  }

  public boolean execute(Runnable r){
    return taskQueue.offer(r);
  }

  private static class Worker extends Thread{

    FixedThreadPool pool = null;
    public Worker(FixedThreadPool pool){
      this.pool = pool;
    }

    public void run(){
      while(true){
        Runnable task = null;
        try {
          task = pool.taskQueue.take();
          if(task!=null){
            task.run();
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  }
}
public class ThreadPoolDemo {
  public static void main(String[] args) {

    // 自己手写线程池
    FixedThreadPool threadPool4 = new FixedThreadPool(5,8);
    for (int i = 0; i < 10; i++) {
      threadPool4.execute(new Runnable() {
        @Override
        public void run() {
          System.out.println(Thread.currentThread().getName()+"正在执行任务");
        }
      });
    }
  }
}

线程池执行任务,调用execute方法,任务提交到阻塞队列中,taskQueue.offer(r),提交之前new了poolSize,准备好线程数,w.start();准备好了5个线程,这些线程进入run方法的while循环中,从阻塞队列里拿task,拿出来的就是提交的task

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐