自定义线程池 02 - 初步实现线程池

完整代码已上传gitee ,地址 :朱元杰的开源仓库 -- ThreadPool核心源码仿写

完整文章栏目地址在:Water的学习分享 - ThreadPool仿写

上一步我们已经实现阻塞队列(任务队列),接下来 初步实现 线程池

将线程池类定义为 MyThreadPool

线程池属性

线程池中需要有如下属性:

  • 任务队列 private MyBlockingQueue<Runnable> taskQueue MyBlockingQueue在上一步已经实现,用于存放任务
  • 线程集合 private HashSet<Worker> workers = new HashSet<>() 线程池中有多个线程,需要用集合存储,Worker是我们自己封装的线程类,后面会讲到
  • 核心线程数 private int coreSize 因为只进行简易仿写,此处假设最大线程数等于核心线程数
  • 超时时间 private long timeoutprivate TimeUnit timeUnit ,此处是线程获取任务的超时时间,添加任务的超时时间由生产者线程自己指定

Worker线程

在线程池的属性中,我们使用到的Worker线程类,是通过继承Thread类实现的,代码如下:

    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1) 当 task 不为空,执行任务
            // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
// while(task != null || (task = taskQueue.take()) != null) {
            while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                try {
                    log.debug("正在执行...{}", task);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("worker 被移除{}", this);
                workers.remove(this);
            }
        }
    }

此处,如果有任务则直接执行,执行完去任务队列调用 take(不带超时间)或 poll(带超时时间)去获取新任务,在 poll 情况下,如果超时时间到,那么将当前线程从线程集合中移除,注意该过程需要加synchronized同步执行

线程池构造方法

    public MyThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new MyBlockingQueue<>(queueCapcity);
    }

在构造方法中指定了 核心线程数超时时间外,还指定了 任务队列的长度

执行任务方法

调用执行任务方法来往线程池中添加任务

  • 若此时存在的线程数小于coreSize,直接创建 worker 对象执行,并将该 worker 对象 放入到 线程集合 workers 中
  • 若此时存在的线程数等于coreSize(这个时候每个线程都在执行着其他任务),将任务放到任务队列中
    // 执行任务
    public void execute(Runnable task) {
        // 
        // 如果任务数超过 coreSize 时,加入任务队列暂存
        synchronized (workers) {
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                log.info("新增 worker{}, {}", worker, task);
                workers.add(worker);
                worker.start();
            } else {
                taskQueue.put(task);
            }
        }
    }
end

评论

新增邮件回复功能,回复将会通过邮件形式提醒,请填写有效的邮件!