如果不能将并发与容量控制都封装在缓冲区中,就只能由消费者与生产者完成。最简单的方案是使用朴素的
wait && notify
机制。
package com.github.xuchengen.concurrent.impl; import com.github.xuchengen.concurrent.AbsConsumer; import com.github.xuchengen.concurrent.AbsProducer; import com.github.xuchengen.concurrent.Model; import com.github.xuchengen.concurrent.Task; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; /** * 基于对象锁wait和notify实现 * 作者:徐承恩 * 邮箱:xuchengen@gmail.com * 日期:2019/12/11 */ public class WaitNotifyModel implements Model { private final Object BUFFER_LOCK = new Object(); private final Queue<Task> buffer = new LinkedList<>(); private final int cap; private final AtomicInteger increTaskNo = new AtomicInteger(0); public WaitNotifyModel(int cap) { this.cap = cap; } @Override public Runnable newRunnableConsumer() { return new ConsumerImpl(); } @Override public Runnable newRunnableProducer() { return new ProducerImpl(); } private class ConsumerImpl extends AbsConsumer { @Override public void consume() throws InterruptedException { synchronized (BUFFER_LOCK) { while (buffer.size() == 0) { // 当缓冲中元素为空则阻塞该线程与此同时会唤醒生产者线程进行生产 BUFFER_LOCK.wait(); } Task task = buffer.poll(); assert task != null; // 固定时间范围的消费,模拟相对稳定的服务器处理过程 Thread.sleep(500 + (long) (Math.random() * 500)); System.out.println("consume: " + task.no); // 消费后唤醒生产者线程进行生产 BUFFER_LOCK.notifyAll(); } } } private class ProducerImpl extends AbsProducer { @Override public void produce() throws InterruptedException { // 不定期生产,模拟随机的用户请求 Thread.sleep((long) (Math.random() * 1000)); synchronized (BUFFER_LOCK) { while (buffer.size() == cap) { // 当缓冲中元素已满则阻塞该线程与此同时会唤醒消费者线程进行消费 BUFFER_LOCK.wait(); } Task task = new Task(increTaskNo.getAndIncrement()); buffer.offer(task); System.out.println("produce: " + task.no); // 生产后唤醒消费者线程进行消费 BUFFER_LOCK.notifyAll(); } } } public static void main(String[] args) { Model model = new WaitNotifyModel(3); for (int i = 0; i < 2; i++) { new Thread(model.newRunnableConsumer()).start(); } for (int i = 0; i < 5; i++) { new Thread(model.newRunnableProducer()).start(); } } }