回顾前面做一些实验,你会发现,实现一的并发性能高于实现二、三。暂且不关心BlockingQueue的具体实现,来分析看如何优化实现三(与实现二的思路相同,性能相当)的性能。
分析实现三的瓶颈
最好的查证方法是记录方法执行时间,这样可以直接定位到真正的瓶颈。但此问题较简单,我们直接用“瞪眼法”分析。
实现三的并发瓶颈很明显,因为在锁 BUFFER_LOCK
看来,任何消费者线程与生产者线程都是一样的。换句话说,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)操作缓冲区 buffer。
而实际上,如果缓冲区是一个队列的话,“生产者将产品入队”与“消费者将产品出队”两个操作之间没有同步关系,可以在队首出队的同时,在队尾入队。理想性能可提升至实现三的两倍。
去掉这个瓶颈
那么思路就简单了:需要两个锁 CONSUME_LOCK
与PRODUCE_LOCK
,CONSUME_LOCK
控制消费者线程并发出队,PRODUCE_LOCK
控制生产者线程并发入队;相应需要两个条件变量NOT_EMPTY
与NOT_FULL
,NOT_EMPTY
负责控制消费者线程的状态(阻塞、运行),NOT_FULL
负责控制生产者线程的状态(阻塞、运行)。以此让优化消费者与消费者(或生产者与生产者)之间是串行的;消费者与生产者之间是并行的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
package com.github.xuchengen.concurrent.impl; import com.github.xuchengen.concurrent.AbsConsumer; import com.github.xuchengen.concurrent.AbsProducer; import com.github.xuchengen.concurrent.Model; import com.github.xuchengen.concurrent.Task; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 基于锁条件实现 * 作者:徐承恩 * 邮箱:xuchengen@gmail.com * 日期:2019/12/12 */ public class LockConditionModel2 implements Model { private final Lock CONSUME_LOCK = new ReentrantLock(); private final Condition NOT_EMPTY = CONSUME_LOCK.newCondition(); private final Lock PRODUCE_LOCK = new ReentrantLock(); private final Condition NOT_FULL = PRODUCE_LOCK.newCondition(); private final Buffer<Task> buffer = new Buffer<>(); private AtomicInteger bufLen = new AtomicInteger(0); private final int cap; private final AtomicInteger increTaskNo = new AtomicInteger(0); public LockConditionModel2(int cap) { this.cap = cap; } @Override public Runnable newRunnableConsumer() { return new ConsumerImpl(); } @Override public Runnable newRunnableProducer() { return new ProducerImpl(); } private class ConsumerImpl extends AbsConsumer { @Override public void consume() throws InterruptedException { int newBufSize = -1; CONSUME_LOCK.lockInterruptibly(); try { while (bufLen.get() == 0) { System.out.println("buffer is empty..."); NOT_EMPTY.await(); } Task task = buffer.poll(); assert task != null; // 固定时间范围的消费,模拟相对稳定的服务器处理过程 Thread.sleep(500 + (long) (Math.random() * 500)); System.out.println("consume: " + task.no); newBufSize = bufLen.decrementAndGet(); if (newBufSize > 0) { // 非空唤醒继续消费 NOT_EMPTY.signalAll(); } } finally { CONSUME_LOCK.unlock(); } if (newBufSize < cap) { PRODUCE_LOCK.lockInterruptibly(); try { // 未满唤醒生产者继续生产 NOT_FULL.signalAll(); } finally { PRODUCE_LOCK.unlock(); } } } } private class ProducerImpl extends AbsProducer { @Override public void produce() throws InterruptedException { // 不定期生产,模拟随机的用户请求 Thread.sleep((long) (Math.random() * 1000)); int newBufSize = -1; PRODUCE_LOCK.lockInterruptibly(); try { while (bufLen.get() == cap) { System.out.println("buffer is full..."); NOT_FULL.await(); } Task task = new Task(increTaskNo.getAndIncrement()); buffer.offer(task); newBufSize = bufLen.incrementAndGet(); System.out.println("produce: " + task.no); if (newBufSize < cap) { // 未满唤醒继续消费 NOT_FULL.signalAll(); } } finally { PRODUCE_LOCK.unlock(); } if (newBufSize > 0) { CONSUME_LOCK.lockInterruptibly(); try { // 非空唤醒消费者继续消费 NOT_EMPTY.signalAll(); } finally { CONSUME_LOCK.unlock(); } } } } private static class Buffer<E> { private Node head; private Node tail; Buffer() { // dummy node head = tail = new Node(null); } public void offer(E e) { tail.next = new Node(e); tail = tail.next; } public E poll() { head = head.next; E e = head.item; head.item = null; return e; } private class Node { E item; Node next; Node(E item) { this.item = item; } } } public static void main(String[] args) { Model model = new LockConditionModel2(3); for (int i = 0; i < 2; i++) { new Thread(model.newRunnableConsumer()).start(); } for (int i = 0; i < 5; i++) { new Thread(model.newRunnableProducer()).start(); } } } |
需要注意的是,由于需要同时在UnThreadSafe的缓冲区 buffer 上进行消费与生产,我们不能使用实现二、三中使用的队列了,需要自己实现一个简单的缓冲区 Buffer。Buffer要满足以下条件:
- 在头部出队,尾部入队
- 在poll()方法中只操作head
- 在offer()方法中只操作tail
还能进一步优化吗
我们已经优化掉了消费者与生产者之间的瓶颈,还能进一步优化吗?
如果可以,必然是继续优化消费者与消费者(或生产者与生产者)之间的并发性能。然而,消费者与消费者之间必须是串行的,因此,并发模型上已经没有地方可以继续优化了。
仔细分析下,实现四中的signalAll都可以换成signal。这里为了屏蔽复杂性,回避了这个优化。
不过在具体的业务场景中,一般还能够继续优化。如:
- 并发规模中等,可考虑使用CAS代替重入锁
- 模型上不能优化,但一个消费行为或许可以进一步拆解、优化,从而降低消费的延迟
- 一个队列的并发性能达到了极限,可采用“多个队列”(如分布式消息队列等)
这4种写法的本质相同——都是在使用或实现BlockingQueue。实现一直接使用BlockingQueue,实现四实现了简单的BlockingQueue,而实现二、三则实现了退化版的BlockingQueue(性能降低一半)。
转载请注明:思码老徐 » Java生产者消费者模型实践五