自定义线程池 01 - 阻塞队列

完整代码已上传gitee ,地址 :朱元杰的开源仓库 -- ThreadPool核心源码仿写

完整文章栏目地址在:Water的学习分享 - ThreadPool仿写

接下来将手动仿写一个线程池,第一步先仿写 阻塞队列

为什么需要阻塞队列 - 因为不能为每个任务都创建一个线程,当任务数量超过可用线程的数量,需要将任务放在阻塞队列中

阻塞队列属性

阻塞队列我们定义为一个类 MyBlockingQueue ,要有如下几个属性

  • 任务队列 private Deque<T> queue = new ArrayDeque<>(); 使用 ArrayDeque 因为性能好于 LinkList
  • private ReentrantLock lock = new ReentrantLock(); 防止多个线程同时获取头部任务,也防止多个线程同时添加任务而发生线程安全问题
  • 生产者(main)条件变量 private Condition fullWaitSet = lock.newCondition(); 阻塞队列有容量限制,当任务过多,生产者线程需阻塞等待
  • 消费者(线程池的线程)条件变量 private Condition emptyWaitSet = lock.newCondition(); 当阻塞队列为空,消费者也需要阻塞等待
  • 容量 private int capcity; 阻塞队列的容量

任务添加逻辑

    // 阻塞添加
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capcity) {
                try {
                    log.debug("等待加入任务队列 {} ...", task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

上锁,while判断队列是否已满,满了则让当前 添加线程 进入 fullWaitSet.await() ,不满则执行 queue.addLast(task) 添加任务到队列,在此之前 线程池中的 可能有 消费线程 因为任务队列没有任务而进入 emptyWaitSet.await() ,因此添加完任务后有必要调用 emptyWaitSet.signal() 去唤他们

任务获取逻辑

    // 阻塞获取
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

与任务添加的逻辑类似,不在赘述

阻塞优化 - 添加超时时间

只需将 await 方法替换为 awaitNanos 方法,就可以实现 带超时的阻塞添加和获取

带超时时间的阻塞获取如下

    // 带超时阻塞获取
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    // 返回值是剩余时间
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

给方法传入自定义的超时时间,toNanos 方法将时间统一转化为纳秒

emptyWaitSet.awaitNanos(nanos) 方法的返回值是 定义的等待时间nanos - 已等待的时间,因为存在虚假唤醒的可能

虚假唤醒 指 被唤醒的原因不是因为有新任务添加到阻塞队列中,也不是因为超时时间到,而是其他原因,因此唤醒后 阻塞队列中仍然可能为空,此时就要让他继续等待,不过只需等完剩余的超时时间,不能从头开始等待

如果超时时间耗尽,还没有新任务,就返回null,后面消费者线程获取null,就知道暂时没有任务需要执行,就会结束线程

带超时时间的阻塞添加如下

    // 带超时时间阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capcity) {
                try {
                    if (nanos <= 0) {
                        return false;
                    }
                    log.debug("等待加入任务队列 {} ...", task);
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

因为超时了就会失败,因此函数的返回值类型不能是void,应该设置为boolean,超时获取失败了就返回 false,其他逻辑与 超时阻塞获取 一致

end

评论

新增邮件回复功能,回复将会通过邮件形式提醒,请填写有效的邮件!