BlockingQueue的写法最简单。核心思想是,把并发和容量控制封装在缓冲区中。而BlockingQueue的性质天生满足这个要求。
package com.github.xuchengen.concurrent.impl; import com.github.xuchengen.concurrent.AbsConsumer; import com.github.xuchengen.concurrent.AbsProducer; import com.github.xuchengen.concurrent.Model; import com.github.xuchengen.concurrent.Task; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** * 基于阻塞队列的缓冲实现 * 作者:徐承恩 * 邮箱:xuchengen@gmail.com * 日期:2019/12/11 */ public class BlockingQueueModel implements Model { private final BlockingQueue<Task> queue; private final AtomicInteger increTaskNo = new AtomicInteger(); public BlockingQueueModel(int cap) { this.queue = new LinkedBlockingQueue<>(cap); } @Override public Runnable newRunnableConsumer() { return new ConsumerImpl(); } @Override public Runnable newRunnableProducer() { return new ProducerImpl(); } private class ConsumerImpl extends AbsConsumer { @Override public void consume() throws InterruptedException { Task task = queue.take(); // 固定时间范围的消费,模拟相对稳定的服务器处理过程 Thread.sleep(500 + (long) (Math.random() * 500)); System.out.println("consume: " + task.no); } } private class ProducerImpl extends AbsProducer { @Override public void produce() throws InterruptedException { // 不定期生产,模拟随机的用户请求 Thread.sleep((long) (Math.random() * 1000)); Task task = new Task(increTaskNo.getAndIncrement()); System.out.println("produce: " + task.no); queue.put(task); } } public static void main(String[] args) { Model model = new BlockingQueueModel(3); //2个消费者 for (int i = 0; i < 2; i++) { new Thread(model.newRunnableConsumer()).start(); } //5个生产者 for (int i = 0; i < 5; i++) { new Thread(model.newRunnableProducer()).start(); } } }
由于操作“出队/入队+日志输出”不是原子的,所以上述日志的绝对顺序与实际的出队/入队顺序有出入,但对于同一个任务号task.no
,其consume日志一定出现在其produce日志之后,即:同一任务的消费行为一定发生在生产行为之后。缓冲区的容量留给读者验证。符合两个验证条件。
BlockingQueue写法的核心只有两行代码,并发和容量控制都封装在了BlockingQueue中,正确性由BlockingQueue保证。面试中首选该写法,自然美观简单。