Java生产者消费者模型实践二

BlockingQueue的写法最简单。核心思想是,把并发和容量控制封装在缓冲区中。而BlockingQueue的性质天生满足这个要求。

package com.github.xuchengen.concurrent.impl;

import com.github.xuchengen.concurrent.AbsConsumer;
import com.github.xuchengen.concurrent.AbsProducer;
import com.github.xuchengen.concurrent.Model;
import com.github.xuchengen.concurrent.Task;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 基于阻塞队列的缓冲实现
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public class BlockingQueueModel implements Model {

    private final BlockingQueue<Task> queue;

    private final AtomicInteger increTaskNo = new AtomicInteger();

    public BlockingQueueModel(int cap) {
        this.queue = new LinkedBlockingQueue<>(cap);
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    private class ConsumerImpl extends AbsConsumer {

        @Override
        public void consume() throws InterruptedException {
            Task task = queue.take();
            // 固定时间范围的消费,模拟相对稳定的服务器处理过程
            Thread.sleep(500 + (long) (Math.random() * 500));
            System.out.println("consume: " + task.no);
        }
    }

    private class ProducerImpl extends AbsProducer {

        @Override
        public void produce() throws InterruptedException {
            // 不定期生产,模拟随机的用户请求
            Thread.sleep((long) (Math.random() * 1000));
            Task task = new Task(increTaskNo.getAndIncrement());
            System.out.println("produce: " + task.no);
            queue.put(task);
        }
    }

    public static void main(String[] args) {

        Model model = new BlockingQueueModel(3);

        //2个消费者
        for (int i = 0; i < 2; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }

        //5个生产者
        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }

}

由于操作“出队/入队+日志输出”不是原子的,所以上述日志的绝对顺序与实际的出队/入队顺序有出入,但对于同一个任务号task.no,其consume日志一定出现在其produce日志之后,即:同一任务的消费行为一定发生在生产行为之后。缓冲区的容量留给读者验证。符合两个验证条件。

BlockingQueue写法的核心只有两行代码,并发和容量控制都封装在了BlockingQueue中,正确性由BlockingQueue保证。面试中首选该写法,自然美观简单。