回顾前面做一些实验,你会发现,实现一的并发性能高于实现二、三。暂且不关心BlockingQueue的具体实现,来分析看如何优化实现三(与实现二的思路相同,性能相当)的性能。
分析实现三的瓶颈
最好的查证方法是记录方法执行时间,这样可以直接定位到真正的瓶颈。但此问题较简单,我们直接用“瞪眼法”分析。
实现三的并发瓶颈很明显,因为在锁 BUFFER_LOCK
看来,任何消费者线程与生产者线程都是一样的。换句话说,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)操作缓冲区 buffer。
而实际上,如果缓冲区是一个队列的话,“生产者将产品入队”与“消费者将产品出队”两个操作之间没有同步关系,可以在队首出队的同时,在队尾入队。理想性能可提升至实现三的两倍。
去掉这个瓶颈
那么思路就简单了:需要两个锁 CONSUME_LOCK
与PRODUCE_LOCK
,CONSUME_LOCK
控制消费者线程并发出队,PRODUCE_LOCK
控制生产者线程并发入队;相应需要两个条件变量NOT_EMPTY
与NOT_FULL
,NOT_EMPTY
负责控制消费者线程的状态(阻塞、运行),NOT_FULL
负责控制生产者线程的状态(阻塞、运行)。以此让优化消费者与消费者(或生产者与生产者)之间是串行的;消费者与生产者之间是并行的。
package com.github.xuchengen.concurrent.impl; import com.github.xuchengen.concurrent.AbsConsumer; import com.github.xuchengen.concurrent.AbsProducer; import com.github.xuchengen.concurrent.Model; import com.github.xuchengen.concurrent.Task; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 基于锁条件实现 * 作者:徐承恩 * 邮箱:xuchengen@gmail.com * 日期:2019/12/12 */ public class LockConditionModel2 implements Model { private final Lock CONSUME_LOCK = new ReentrantLock(); private final Condition NOT_EMPTY = CONSUME_LOCK.newCondition(); private final Lock PRODUCE_LOCK = new ReentrantLock(); private final Condition NOT_FULL = PRODUCE_LOCK.newCondition(); private final Buffer<Task> buffer = new Buffer<>(); private AtomicInteger bufLen = new AtomicInteger(0); private final int cap; private final AtomicInteger increTaskNo = new AtomicInteger(0); public LockConditionModel2(int cap) { this.cap = cap; } @Override public Runnable newRunnableConsumer() { return new ConsumerImpl(); } @Override public Runnable newRunnableProducer() { return new ProducerImpl(); } private class ConsumerImpl extends AbsConsumer { @Override public void consume() throws InterruptedException { int newBufSize = -1; CONSUME_LOCK.lockInterruptibly(); try { while (bufLen.get() == 0) { System.out.println("buffer is empty..."); NOT_EMPTY.await(); } Task task = buffer.poll(); assert task != null; // 固定时间范围的消费,模拟相对稳定的服务器处理过程 Thread.sleep(500 + (long) (Math.random() * 500)); System.out.println("consume: " + task.no); newBufSize = bufLen.decrementAndGet(); if (newBufSize > 0) { // 非空唤醒继续消费 NOT_EMPTY.signalAll(); } } finally { CONSUME_LOCK.unlock(); } if (newBufSize < cap) { PRODUCE_LOCK.lockInterruptibly(); try { // 未满唤醒生产者继续生产 NOT_FULL.signalAll(); } finally { PRODUCE_LOCK.unlock(); } } } } private class ProducerImpl extends AbsProducer { @Override public void produce() throws InterruptedException { // 不定期生产,模拟随机的用户请求 Thread.sleep((long) (Math.random() * 1000)); int newBufSize = -1; PRODUCE_LOCK.lockInterruptibly(); try { while (bufLen.get() == cap) { System.out.println("buffer is full..."); NOT_FULL.await(); } Task task = new Task(increTaskNo.getAndIncrement()); buffer.offer(task); newBufSize = bufLen.incrementAndGet(); System.out.println("produce: " + task.no); if (newBufSize < cap) { // 未满唤醒继续消费 NOT_FULL.signalAll(); } } finally { PRODUCE_LOCK.unlock(); } if (newBufSize > 0) { CONSUME_LOCK.lockInterruptibly(); try { // 非空唤醒消费者继续消费 NOT_EMPTY.signalAll(); } finally { CONSUME_LOCK.unlock(); } } } } private static class Buffer<E> { private Node head; private Node tail; Buffer() { // dummy node head = tail = new Node(null); } public void offer(E e) { tail.next = new Node(e); tail = tail.next; } public E poll() { head = head.next; E e = head.item; head.item = null; return e; } private class Node { E item; Node next; Node(E item) { this.item = item; } } } public static void main(String[] args) { Model model = new LockConditionModel2(3); for (int i = 0; i < 2; i++) { new Thread(model.newRunnableConsumer()).start(); } for (int i = 0; i < 5; i++) { new Thread(model.newRunnableProducer()).start(); } } }
需要注意的是,由于需要同时在UnThreadSafe的缓冲区 buffer 上进行消费与生产,我们不能使用实现二、三中使用的队列了,需要自己实现一个简单的缓冲区 Buffer。Buffer要满足以下条件:
- 在头部出队,尾部入队
- 在poll()方法中只操作head
- 在offer()方法中只操作tail
还能进一步优化吗
我们已经优化掉了消费者与生产者之间的瓶颈,还能进一步优化吗?
如果可以,必然是继续优化消费者与消费者(或生产者与生产者)之间的并发性能。然而,消费者与消费者之间必须是串行的,因此,并发模型上已经没有地方可以继续优化了。
仔细分析下,实现四中的signalAll都可以换成signal。这里为了屏蔽复杂性,回避了这个优化。
不过在具体的业务场景中,一般还能够继续优化。如:
- 并发规模中等,可考虑使用CAS代替重入锁
- 模型上不能优化,但一个消费行为或许可以进一步拆解、优化,从而降低消费的延迟
- 一个队列的并发性能达到了极限,可采用“多个队列”(如分布式消息队列等)
这4种写法的本质相同——都是在使用或实现BlockingQueue。实现一直接使用BlockingQueue,实现四实现了简单的BlockingQueue,而实现二、三则实现了退化版的BlockingQueue(性能降低一半)。