Java生产者消费者模型实践一

考查Java的并发编程时,手写“生产者-消费者模型”是一个经典问题。有如下几个考点:

  • 对Java并发模型的理解
  • 对Java并发编程接口的熟练程度
  • bug free
  • coding style

JDK版本:oracle java 1.8.0_102

本文主要归纳了4种写法,阅读后,最好在白板上练习几遍,检查自己是否掌握。这4种写法或者编程接口不同,或者并发粒度不同,但本质是相同的——都是在使用或实现BlockingQueue。

生产者-消费者模型

网上有很多生产者-消费者模型的定义和实现。本文研究最常用的有界生产者-消费者模型,简单概括如下:

  • 生产者持续生产,直到缓冲区满,阻塞;缓冲区不满后,继续生产
  • 消费者持续消费,直到缓冲区空,阻塞;缓冲区不空后,继续消费
  • 生产者可以有多个,消费者也可以有多个

可通过如下条件验证模型实现的正确性:

  • 同一产品的消费行为一定发生在生产行为之后
  • 任意时刻,缓冲区大小不小于0,不大于限制容量

该模型的应用和变种非常多,不赘述。

准备

面试时可语言说明以下准备代码。关键部分需要实现,如AbsConsumer。

下面会涉及多种生产者-消费者模型的实现,可以先抽象出关键的接口,并实现一些抽象类:

package com.github.xuchengen.concurrent;

/**
 * 生产者接口
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public interface Producer {

    /**
     * 生产者负责生产
     *
     * @throws InterruptedException 线程意外终止异常
     */
    void produce() throws InterruptedException;

}
package com.github.xuchengen.concurrent;

/**
 * 消费者接口
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public interface Consumer {

    /**
     * 消费者负责消费
     *
     * @throws InterruptedException 线程意外终止异常
     */
    void consume() throws InterruptedException;

}
package com.github.xuchengen.concurrent;

/**
 * 抽象生产者
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public abstract class AbsProducer implements Producer, Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }
        }
    }
}
package com.github.xuchengen.concurrent;

/**
 * 抽象生产者
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public abstract class AbsConsumer implements Consumer, Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }
        }
    }

}

不同的模型实现中,生产者、消费者的具体实现也不同,所以需要为模型定义抽象工厂方法:

package com.github.xuchengen.concurrent;

/**
 * 模型
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public interface Model {

    /**
     * 实例化一个消费者
     *
     * @return Runnable
     */
    Runnable newRunnableConsumer();

    /**
     * 实例化一个生产者
     *
     * @return Runnable
     */
    Runnable newRunnableProducer();

}

我们将Task作为生产和消费的单位:

package com.github.xuchengen.concurrent;

/**
 * 任务
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public class Task {

    /**
     * 任务号
     */
    public int no;

    public Task(int no) {
        this.no = no;
    }
}

如果需求还不明确(这符合大部分工程工作的实际情况),建议边实现边抽象,不要“面向未来编程”