自定义线程池 03 - 拒绝策略

完整代码已上传gitee ,地址 :朱元杰的开源仓库 -- ThreadPool核心源码仿写

完整文章栏目地址在:Water的学习分享 - ThreadPool仿写

前文我们已经初步实现了 阻塞队列(任务队列)、线程池 ,完成了基本功能,接下来完善任务拒绝策略

任务队列的容量有限,当任务过多时,我们可以有多种选择

先看往任务放队列的代码:

MyThreadPool 类的

    // 执行任务
    public void execute(Runnable task) {
        // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
        // 如果任务数超过 coreSize 时,加入任务队列暂存
        synchronized (workers) {
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                log.info("新增 worker{}, {}", worker, task);
                workers.add(worker);
                worker.start();
            } else {
                taskQueue.put(task);
            }
        }
    }

在此我们直接调用 taskQueue.put(task); 去往任务队列中放任务,当队列已满时,会死等,这会导致太多任务生产者线程阻塞,在有些场景下是不允许的,我们需要更多的选择,这个选择就叫 “拒绝策略”

比如,我们可以:

  1. 死等
  2. 带超时等待
  3. 让生产者放弃任务执行
  4. 让生产者抛出异常
  5. 让生产者自己执行任务
  6. ... ...

还有很多种拒绝策略,如果我们通过 if/else 写死在线程池中,会导致可扩展性太差 因此到底使用何种拒绝策略,我们应该下放给 用户(任务生产者)自己选择,这就是 策略 设计模式

策略模式 就是把具体的操作抽象成一个接口,具体的实现由调用者自己定义

我们定义出这个接口 RejectPolicy

// 拒绝策略
@FunctionalInterface
interface RejectPolicy<T> {
    void reject(MyBlockingQueue<T> queue, T task);
}

并往线程池类 MyThreadPool 中添加属性 private RejectPolicy<Runnable> rejectPolicy;

修改 MyThreadPool 的构造方法如下:

    public MyThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new MyBlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }

那么用户定义线程池时,就可以使用 Lambda表达式 实现一个 rejectPolicy 对象,实现 rejectPolicy对象 就是实现 RejectPolicy 的 reject 方法,该方法中有两个参数 一个任务队列 一个要放入的任务,这个方法后面会交给 当前线程池 的 任务队列去执行,而传入的任务队列,就是自己,不懂别急,接着往下看

我们需要额外给 MyBlockingQueue 定义一个添加任务的方法,这个任务的逻辑应该是,任务队列不满时,直接放入任务队列,任务队列满时,调用 用户自定义的拒绝策略,代码实现如下:

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 判断队列是否满
            if (queue.size() == capcity) {
                rejectPolicy.reject(this, task);
            } else { // 有空闲
                log.debug("加入任务队列 {}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }

可以看到当 queue.size() == capcity 时(即队列已满),直接调用 rejectPolicy.reject(this, task); 其中传入了 this 和 task ,this 就是 MyBlockingQueue 对象本身,用户在实现 RejectPolicy 的 reject 时,拿到的 queue 就是现在使用的 任务队列,用户就可以根据自己的需求去调用 任务队列 的方法,也可以不使用 (比如 直接放弃任务执行的拒绝策略),具体的使用方法参考下一篇博客 地址:自定义线程池 04 - 使用演示

MyThreadPool 类在执行任务的方法 execute 中,不在直接调用 taskQueue.put(task); 方法 ,调用上面的方法,修改如下:

    // 执行任务
    public void execute(Runnable task) {
        // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
        // 如果任务数超过 coreSize 时,加入任务队列暂存
        synchronized (workers) {
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                log.info("新增 worker{}, {}", worker, task);
                workers.add(worker);
                worker.start();
            } else {
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }
end

评论

新增邮件回复功能,回复将会通过邮件形式提醒,请填写有效的邮件!