月度归档:2019年12月

IPv4地址转Int之Java实现

讲个故事

面试官:IPv4地址可以转为Long类型的数字知道吧?你写一下这个转换的代码!

对计算机基础逐渐模糊的小徐一脸懵逼,毕竟工作中很少会用到,只记得IP地址和整数是可以相互转换的,但是从来没有自己实现过。于是在大脑中飞速计算。过了一会,思路出现了:IP地址分为四段,每段都是0~255之间的数,每段可以用8位来装下它,4×8=32位,也就是可以将IP地址转为32位的整数。咦?面试官居然让转成Long,但是Long有64位啊!一个int就搞定了,为什么要转为Long呢?但是自己没实现过,心里没底,再一犯嘀咕,然后回答不出来。面试没通过!!!

之后小徐一直心心念念这个问题,回来立马自己写了代码实现了一下,果然是int就搞定了,之前的思路一点都没错!!!

面试有时考验的不只是技术,还有自信心!!!

 

IP地址是一个32位的二进制数,通常被分割为4个“8位二进制数”(也就是4个字节)。IP地址通常用“点分十进制”表示成(a.b.c.d)的形式,其中,a,b,c,d都是0~255之间的十进制整数。

例:点分十进IP地址(100.4.5.6),实际上是32位二进制数(01100100.00000100.00000101.00000110)。

 

为什么8位能存储0~255之间的数字?

一个字节8个位,每个位就只有0跟1两种情况,8个位能表示2的8次方即256,范围0-255(带负值的话范围在:-128~127);

8位最小二进制为00000000转十进制即为0

8位最大二进制为11111111转十进制即为255

所以8位可存储0~255之间的数

IP地址本身就是一个32位的二进制数,只是通常被以a.b.c.d的形式表示而已。

那么为什么要将IP转为数字呢?

其实就是时间换空间的一种方式。

String类型的IP占用7个字节到15个字节0.0.0.0~255.255.255.255,而int只需要4个字节。

IP字符串转换为Int

/**
 * 将 ip 字符串转换为 int 类型的数字
 * <p>
 * 思路就是将 ip 的每一段数字转为 8 位二进制数,并将它们放在结果的适当位置上
 *
 * @param ipString ip字符串,如 127.0.0.1
 * @return ip字符串对应的 int 值
 */
public static int ip2Int(String ipString) {
    // 取 ip 的各段
    String[] ipSlices = ipString.split("\\.");
    int rs = 0;
    for (int i = 0; i < ipSlices.length; i++) {
        // 将 ip 的每一段解析为 int,并根据位置左移 8 位
        int intSlice = Integer.parseInt(ipSlices[i]) << 8 * i;
        // 或运算
        rs = rs | intSlice;
    }
    return rs;
}

有一个技巧,就是 或运算。就是将每段的 int 值左移到恰当的位置后跟保存结果的 int 值进行或运算。

255.255.255.255 这个地址为例,上面的或运算过程如下:

00000000 00000000 00000000 00000000
00000000 00000000 00000000 11111111
------------或运算------------
00000000 00000000 00000000 11111111
00000000 00000000 11111111 00000000
------------或运算------------
00000000 00000000 11111111 11111111
00000000 11111111 00000000 00000000
------------或运算------------
00000000 11111111 11111111 11111111
11111111 00000000 00000000 00000000
-----------最终结果------------
11111111 11111111 11111111 11111111

那么如何将 int 再转为字符串的表示法呢?

Int转换为IP字符串

思路是一样的,将 int 值的 32 位分为 4 个 8 位数字,然后这 4 个 8 位的数字用 0~255 的数字进行表示,用点号分隔即可。我们也基于位运算,7行代码即可实现:

/**
 * 将 int 转换为 ip 字符串
 *
 * @param ipInt 用 int 表示的 ip 值
 * @return ip字符串,如 127.0.0.1
 */
public static String int2Ip(int ipInt) {
    String[] ipString = new String[4];
    for (int i = 0; i < 4; i++) {
        // 每 8 位为一段,这里取当前要处理的最高位的位置
        int pos = i * 8;
        // 取当前处理的 ip 段的值
        int and = ipInt & (255 << pos);
        // 将当前 ip 段转换为 0 ~ 255 的数字,注意这里必须使用无符号右移
        ipString[i] = String.valueOf(and >>> pos);
    }
    return String.join(".", ipString);
}

这里使用与运算来取每次处理的 ip 片段。取最高的 8 位时,涉及到符号的处理,因此在将每段 8 位转为 0~255 的数字时必须使用无符号右移运算,否则最后处理的部分因为符号问题会不准确。

测试一下

public static void main(String[] args) {
    String[] ips4Test = new String[]{"0.0.0.0", "127.0.0.1", 
    	"192.168.1.1", "255.0.0.255", "255.255.255.255"};
    for (String ip : ips4Test) {
        test(ip);
    }
}

public static void test(String ip) {
    int ipInt = ip2Int(ip);
    String ipString = int2Ip(ipInt);
    System.out.println("用于测试的ip地址: " + ip + ", int表示: " + ipInt + ", 二进制: " + Long.toBinaryString(ipInt)
            + ", 转回String: " + ipString + ",与测试 ip 地址是否相等: " + ip.equals(ipString));
}

打印结果:

用于测试的ip地址: 0.0.0.0, int表示: 0, 二进制: 0, 转回String: 0.0.0.0,与测试 ip 地址是否相等: true
用于测试的ip地址: 127.0.0.1, int表示: 16777343, 二进制: 1000000000000000001111111, 转回String: 127.0.0.1,与测试 ip 地址是否相等: true
用于测试的ip地址: 192.168.1.1, int表示: 16885952, 二进制: 1000000011010100011000000, 转回String: 192.168.1.1,与测试 ip 地址是否相等: true
用于测试的ip地址: 255.0.0.255, int表示: -16776961, 二进制: 1111111111111111111111111111111111111111000000000000000011111111, 转回String: 255.0.0.255,与测试 ip 地址是否相等: true
用于测试的ip地址: 255.255.255.255, int表示: -1, 二进制: 1111111111111111111111111111111111111111111111111111111111111111, 转回String: 255.255.255.255,与测试 ip 地址是否相等: true

注意:这里相互转换的算法是配套的,不同的转换算法计算的 int 值可能会不一样,因为虽然都是处理 ip 的 4 个部分,但是它们的结合顺序可以不一样,因此以怎样的顺序搭配转为 int,就应该以相同的顺序解析为 String。

Spring事务隔离级别和传播特性

传播行为

事务的第一个方面是传播行为。传播行为定义关于客户端和被调用方法的事务边界。Spring定义了7中传播行为。如下表所示:

传播行为意义
PROPAGATION_MANDATORY表示该方法必须运行在一个事务中。如果当前没有事务正在发生,将抛出一个异常
PROPAGATION_NESTED表示如果当前正有一个事务在进行中,则该方法应当运行在一个嵌套式事务中。被嵌套的事务可以独立于封装事务进行提交或回滚。如果封装事务不存在,行为就像PROPAGATION_REQUIRES一样。
PROPAGATION_NEVER表示当前的方法不应该在一个事务中运行。如果一个事务正在进行,则会抛出一个异常。
PROPAGATION_NOT_SUPPORTED表示该方法不应该在一个事务中运行。如果一个现有事务正在进行中,它将在该方法的运行期间被挂起。
PROPAGATION_SUPPORTS表示当前方法不需要事务性上下文,但是如果有一个事务已经在运行的话,它也可以在这个事务里运行。
PROPAGATION_REQUIRES_NEW表示当前方法必须在它自己的事务里运行。一个新的事务将被启动,而且如果有一个现有事务在运行的话,则将在这个方法运行期间被挂起。
PROPAGATION_REQUIRES表示当前方法必须在一个事务中运行。如果一个现有事务正在进行中,该方法将在那个事务中运行,否则就要开始一个新事务。

传播规则回答了这样一个问题,就是一个新的事务应该被启动还是被挂起,或者是一个方法是否应该在事务性上下文中运行。

隔离级别

声明式事务的第二个方面是隔离级别。隔离级别定义一个事务可能受其他并发事务活动活动影响的程度。另一种考虑一个事务的隔离级别的方式,是把它想象为那个事务对于事物处理数据的自私程度。

在一个典型的应用程序中,多个事务同时运行,经常会为了完成他们的工作而操作同一个数据。并发虽然是必需的,但是会导致一下问题:

  • 脏读(Dirty read)– 脏读发生在一个事务读取了被另一个事务改写但尚未提交的数据时。如果这些改变在稍后被回滚了,那么第一个事务读取的数据就会是无效的。
  • 不可重复读(Nonrepeatable read)– 不可重复读发生在一个事务执行相同的查询两次或两次以上,但每次查询结果都不相同时。这通常是由于另一个并发事务在两次查询之间更新了数据。
  • 幻影读(Phantom reads)– 幻影读和不可重复读相似。当一个事务(T1)读取几行记录后,另一个并发事务(T2)插入了一些记录时,幻影读就发生了。在后来的查询中,第一个事务(T1)就会发现一些原来没有的额外记录。

在理想状态下,事务之间将完全隔离,从而可以防止这些问题发生。然而,完全隔离会影响性能,因为隔离经常牵扯到锁定在数据库中的记录(而且有时是锁定完整的数据表)。侵占性的锁定会阻碍并发,要求事务相互等待来完成工作。

考虑到完全隔离会影响性能,而且并不是所有应用程序都要求完全隔离,所以有时可以在事务隔离方面灵活处理。因此,就会有好几个隔离级别。

隔离级别含义
ISOLATION_DEFAULT使用后端数据库默认的隔离级别。
ISOLATION_READ_UNCOMMITTED允许读取尚未提交的更改。可能导致脏读、幻影读或不可重复读。
ISOLATION_READ_COMMITTED允许从已经提交的并发事务读取。可防止脏读,但幻影读和不可重复读仍可能会发生。
ISOLATION_REPEATABLE_READ对相同字段的多次读取的结果是一致的,除非数据被当前事务本身改变。可防止脏读和不可重复读,但幻影读仍可能发生。
ISOLATION_SERIALIZABLE完全服从ACID的隔离级别,确保不发生脏读、不可重复读和幻影读。这在所有隔离级别中也是最慢的,因为它通常是通过完全锁定当前事务所涉及的数据表来完成的。

只读(Read Only)

声明式事务的第三个特性是它是否是一个只读事务。如果一个事务只对后端数据库执行读操作,那么该数据库就可能利用那个事务的只读特性,采取某些优化 措施。通过把一个事务声明为只读,可以给后端数据库一个机会来应用那些它认为合适的优化措施。由于只读的优化措施是在一个事务启动时由后端数据库实施的, 因此,只有对于那些具有可能启动一个新事务的传播行为(PROPAGATION_REQUIRES_NEW、PROPAGATION_REQUIRED、 ROPAGATION_NESTED)的方法来说,将事务声明为只读才有意义。

此外,如果使用Hibernate作为持久化机制,那么把一个事务声明为只读,将使Hibernate的flush模式被设置为FLUSH_NEVER。这就告诉Hibernate避免和数据库进行不必要的对象同步,从而把所有更新延迟到事务的结束。

事务超时(Timeout)

为了使一个应用程序很好地执行,它的事务不能运行太长时间。因此,声明式事务的下一个特性就是它的超时。

假设事务的运行时间变得格外的长,由于事务可能涉及对后端数据库的锁定,所以长时间运行的事务会不必要地占用数据库资源。这时就可以声明一个事务在特定秒数后自动回滚,不必等它自己结束。

由于超时时钟在一个事务启动的时候开始的,因此,只有对于那些具有可能启动一个新事务的传播行为(PROPAGATION_REQUIRES_NEW、PROPAGATION_REQUIRED、ROPAGATION_NESTED)的方法来说,声明事务超时才有意义。

回滚(Rollback)

在默认设置下,事务只在出现运行时异常(runtime exception)时回滚,而在出现受检查异常(checked exception)时不回滚(这一行为和EJB中的回滚行为是一致的)。

不过,也可以声明在出现特定受检查异常时像运行时异常一样回滚。同样,也可以声明一个事务在出现特定的异常时不回滚,即使那些异常是运行时一场。

理解脏读、不可重复读、幻读

脏读

脏读就是指当一个事务正在访问数据,并且对数据进行了修改,而这种修改还没有提交到数据库中,这时,另外一个事务也访问这个数据,然后使用了这个数据。

不可重复读

不可重复读是指在一个事务内,多次读同一数据。在这个事务还没有结束时,另外一个事务也访问该同一数据。那么,在第一个事务中的两 次读数据之间,由于第二个事务的修改,那么第一个事务两次读到的的数据可能是不一样的。这样就发生了在一个事务内两次读到的数据是不一样的,因此称为是不 可重复读。

例如,一个编辑人员两次读取同一文档,但在两次读取之间,作者重写了该文档。当编辑人员第二次读取文档时,文档已更改。原始读取不可重复。如果 只有在作者全部完成编写后编辑人员才可以读取文档,则可以避免该问题。

幻读

幻读是指当事务不是独立执行时发生的一种现象,例如第一个事务对一个表中的数据进行了修改,这种修改涉及到表中的全部数据行。 同时,第二个事务也修改这个表中的数据,这种修改是向表中插入一行新数据。那么,以后就会发生操作第一个事务的用户发现表中还有没有修改的数据行,就好象发生了幻觉一样。

例如,一个编辑人员更改作者提交的文档,但当生产部门将其更改内容合并到该文档的主复本时,发现作者已将未编辑的新材料添加到该文档中。 如果在编辑人员和生产部门完成对原始文档的处理之前,任何人都不能将新材料添加到文档中,则可以避免该问题。

基于元数据的 Spring 声明性事务 :

隔离属性一共支持五种事务设置,具体介绍如下:

  1. DEFAULT 使用数据库设置的隔离级别(默认) ,由 DBA 默认的设置来决定隔离级别 。
  2. READ_UNCOMMITTED 会出现脏读、不可重复读、幻读(隔离级别最低,并发性能高)。
  3. READ_COMMITTED  会出现不可重复读、幻读问题(锁定正在读取的行)。
  4. REPEATABLE_READ 会出幻读(锁定所读取的所有行)。
  5. SERIALIZABLE 保证所有的情况不会发生(锁表)。

 

不可重复读的重点是修改

同样的条件,读取过的数据,再次读取出来发现值不一样了。

幻读的重点在于新增或者删除

同样的条件,第1次和第2次读出来的记录数不一样。

Java生产者消费者模型实践五

回顾前面做一些实验,你会发现,实现一的并发性能高于实现二、三。暂且不关心BlockingQueue的具体实现,来分析看如何优化实现三(与实现二的思路相同,性能相当)的性能。

分析实现三的瓶颈

最好的查证方法是记录方法执行时间,这样可以直接定位到真正的瓶颈。但此问题较简单,我们直接用“瞪眼法”分析。

实现三的并发瓶颈很明显,因为在锁 BUFFER_LOCK 看来,任何消费者线程与生产者线程都是一样的。换句话说,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)操作缓冲区 buffer。

而实际上,如果缓冲区是一个队列的话,“生产者将产品入队”与“消费者将产品出队”两个操作之间没有同步关系,可以在队首出队的同时,在队尾入队。理想性能可提升至实现三的两倍

去掉这个瓶颈

那么思路就简单了:需要两个锁 CONSUME_LOCKPRODUCE_LOCKCONSUME_LOCK控制消费者线程并发出队,PRODUCE_LOCK控制生产者线程并发入队;相应需要两个条件变量NOT_EMPTYNOT_FULLNOT_EMPTY负责控制消费者线程的状态(阻塞、运行),NOT_FULL负责控制生产者线程的状态(阻塞、运行)。以此让优化消费者与消费者(或生产者与生产者)之间是串行的;消费者与生产者之间是并行的。

package com.github.xuchengen.concurrent.impl;

import com.github.xuchengen.concurrent.AbsConsumer;
import com.github.xuchengen.concurrent.AbsProducer;
import com.github.xuchengen.concurrent.Model;
import com.github.xuchengen.concurrent.Task;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 基于锁条件实现
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/12
 */
public class LockConditionModel2 implements Model {

    private final Lock CONSUME_LOCK = new ReentrantLock();

    private final Condition NOT_EMPTY = CONSUME_LOCK.newCondition();

    private final Lock PRODUCE_LOCK = new ReentrantLock();

    private final Condition NOT_FULL = PRODUCE_LOCK.newCondition();

    private final Buffer<Task> buffer = new Buffer<>();

    private AtomicInteger bufLen = new AtomicInteger(0);

    private final int cap;

    private final AtomicInteger increTaskNo = new AtomicInteger(0);

    public LockConditionModel2(int cap) {
        this.cap = cap;
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    private class ConsumerImpl extends AbsConsumer {
        @Override
        public void consume() throws InterruptedException {
            int newBufSize = -1;
            CONSUME_LOCK.lockInterruptibly();
            try {
                while (bufLen.get() == 0) {
                    System.out.println("buffer is empty...");
                    NOT_EMPTY.await();
                }

                Task task = buffer.poll();

                assert task != null;
                // 固定时间范围的消费,模拟相对稳定的服务器处理过程
                Thread.sleep(500 + (long) (Math.random() * 500));
                System.out.println("consume: " + task.no);

                newBufSize = bufLen.decrementAndGet();
                if (newBufSize > 0) {
                    // 非空唤醒继续消费
                    NOT_EMPTY.signalAll();
                }
            } finally {
                CONSUME_LOCK.unlock();
            }
            if (newBufSize < cap) {
                PRODUCE_LOCK.lockInterruptibly();
                try {
                    // 未满唤醒生产者继续生产
                    NOT_FULL.signalAll();
                } finally {
                    PRODUCE_LOCK.unlock();
                }
            }
        }
    }


    private class ProducerImpl extends AbsProducer {
        @Override
        public void produce() throws InterruptedException {
            // 不定期生产,模拟随机的用户请求
            Thread.sleep((long) (Math.random() * 1000));
            int newBufSize = -1;
            PRODUCE_LOCK.lockInterruptibly();
            try {
                while (bufLen.get() == cap) {
                    System.out.println("buffer is full...");
                    NOT_FULL.await();
                }
                Task task = new Task(increTaskNo.getAndIncrement());
                buffer.offer(task);
                newBufSize = bufLen.incrementAndGet();
                System.out.println("produce: " + task.no);
                if (newBufSize < cap) {
                    // 未满唤醒继续消费
                    NOT_FULL.signalAll();
                }
            } finally {
                PRODUCE_LOCK.unlock();
            }
            if (newBufSize > 0) {
                CONSUME_LOCK.lockInterruptibly();
                try {
                    // 非空唤醒消费者继续消费
                    NOT_EMPTY.signalAll();
                } finally {
                    CONSUME_LOCK.unlock();
                }
            }
        }
    }

    private static class Buffer<E> {
        private Node head;
        private Node tail;

        Buffer() {
            // dummy node
            head = tail = new Node(null);
        }

        public void offer(E e) {
            tail.next = new Node(e);
            tail = tail.next;
        }

        public E poll() {
            head = head.next;
            E e = head.item;
            head.item = null;
            return e;
        }

        private class Node {
            E item;
            Node next;

            Node(E item) {
                this.item = item;
            }
        }
    }

    public static void main(String[] args) {
        Model model = new LockConditionModel2(3);
        for (int i = 0; i < 2; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }

}

需要注意的是,由于需要同时在UnThreadSafe的缓冲区 buffer 上进行消费与生产,我们不能使用实现二、三中使用的队列了,需要自己实现一个简单的缓冲区 Buffer。Buffer要满足以下条件:

  • 在头部出队,尾部入队
  • 在poll()方法中只操作head
  • 在offer()方法中只操作tail

还能进一步优化吗

我们已经优化掉了消费者与生产者之间的瓶颈,还能进一步优化吗?

如果可以,必然是继续优化消费者与消费者(或生产者与生产者)之间的并发性能。然而,消费者与消费者之间必须是串行的,因此,并发模型上已经没有地方可以继续优化了。

仔细分析下,实现四中的signalAll都可以换成signal。这里为了屏蔽复杂性,回避了这个优化。

不过在具体的业务场景中,一般还能够继续优化。如:

  • 并发规模中等,可考虑使用CAS代替重入锁
  • 模型上不能优化,但一个消费行为或许可以进一步拆解、优化,从而降低消费的延迟
  • 一个队列的并发性能达到了极限,可采用“多个队列”(如分布式消息队列等)

这4种写法的本质相同——都是在使用或实现BlockingQueue。实现一直接使用BlockingQueue,实现四实现了简单的BlockingQueue,而实现二、三则实现了退化版的BlockingQueue(性能降低一半)。

Java生产者消费者模型实践四

我们要保证理解wait && notify机制。实现时可以使用Object类提供的wait()方法与notifyAll()方法,但更推荐的方式是使用java.util.concurrent包提供的Lock && Condition

package com.github.xuchengen.concurrent.impl;

import com.github.xuchengen.concurrent.AbsConsumer;
import com.github.xuchengen.concurrent.AbsProducer;
import com.github.xuchengen.concurrent.Model;
import com.github.xuchengen.concurrent.Task;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 基于锁条件实现
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/12
 */
public class LockConditionModel1 implements Model {

    private final Lock BUFFER_LOCK = new ReentrantLock();

    private final Condition BUFFER_COND = BUFFER_LOCK.newCondition();

    private final Queue<Task> buffer = new LinkedList<>();

    private final int cap;

    private final AtomicInteger increTaskNo = new AtomicInteger(0);

    public LockConditionModel1(int cap) {
        this.cap = cap;
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    private class ConsumerImpl extends AbsConsumer {
        @Override
        public void consume() throws InterruptedException {
            BUFFER_LOCK.lockInterruptibly();
            try {
                while (buffer.size() == 0) {
                    // 当缓冲中元素为空则阻塞该线程与此同时会唤醒生产者线程进行生产
                    BUFFER_COND.await();
                }
                Task task = buffer.poll();
                assert task != null;
                // 固定时间范围的消费,模拟相对稳定的服务器处理过程
                Thread.sleep(500 + (long) (Math.random() * 500));
                System.out.println("consume: " + task.no);
                // 消费后唤醒生产者线程进行生产
                BUFFER_COND.signalAll();
            } finally {
                BUFFER_LOCK.unlock();
            }
        }
    }

    private class ProducerImpl extends AbsProducer {
        @Override
        public void produce() throws InterruptedException {
            // 不定期生产,模拟随机的用户请求
            Thread.sleep((long) (Math.random() * 1000));
            BUFFER_LOCK.lockInterruptibly();
            try {
                while (buffer.size() == cap) {
                    // 当缓冲中元素已满则阻塞该线程与此同时会唤醒消费者线程进行消费
                    BUFFER_COND.await();
                }
                Task task = new Task(increTaskNo.getAndIncrement());
                buffer.offer(task);
                System.out.println("produce: " + task.no);
                // 生产后唤醒消费者线程进行消费
                BUFFER_COND.signalAll();
            } finally {
                BUFFER_LOCK.unlock();
            }
        }
    }

    public static void main(String[] args) {
        Model model = new LockConditionModel1(3);
        for (int i = 0; i < 2; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }

}

该写法的思路与实现二的思路完全相同,仅仅将锁与条件变量换成了Lock和Condition。

Java生产者消费者模型实践三

如果不能将并发与容量控制都封装在缓冲区中,就只能由消费者与生产者完成。最简单的方案是使用朴素的wait && notify机制。

package com.github.xuchengen.concurrent.impl;

import com.github.xuchengen.concurrent.AbsConsumer;
import com.github.xuchengen.concurrent.AbsProducer;
import com.github.xuchengen.concurrent.Model;
import com.github.xuchengen.concurrent.Task;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 基于对象锁wait和notify实现
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public class WaitNotifyModel implements Model {

    private final Object BUFFER_LOCK = new Object();

    private final Queue<Task> buffer = new LinkedList<>();

    private final int cap;

    private final AtomicInteger increTaskNo = new AtomicInteger(0);

    public WaitNotifyModel(int cap) {
        this.cap = cap;
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    private class ConsumerImpl extends AbsConsumer {
        @Override
        public void consume() throws InterruptedException {
            synchronized (BUFFER_LOCK) {
                while (buffer.size() == 0) {
                    // 当缓冲中元素为空则阻塞该线程与此同时会唤醒生产者线程进行生产
                    BUFFER_LOCK.wait();
                }
                Task task = buffer.poll();
                assert task != null;
                // 固定时间范围的消费,模拟相对稳定的服务器处理过程
                Thread.sleep(500 + (long) (Math.random() * 500));
                System.out.println("consume: " + task.no);
                // 消费后唤醒生产者线程进行生产
                BUFFER_LOCK.notifyAll();
            }
        }
    }

    private class ProducerImpl extends AbsProducer {
        @Override
        public void produce() throws InterruptedException {
            // 不定期生产,模拟随机的用户请求
            Thread.sleep((long) (Math.random() * 1000));
            synchronized (BUFFER_LOCK) {
                while (buffer.size() == cap) {
                    // 当缓冲中元素已满则阻塞该线程与此同时会唤醒消费者线程进行消费
                    BUFFER_LOCK.wait();
                }
                Task task = new Task(increTaskNo.getAndIncrement());
                buffer.offer(task);
                System.out.println("produce: " + task.no);
                // 生产后唤醒消费者线程进行消费
                BUFFER_LOCK.notifyAll();
            }
        }
    }

    public static void main(String[] args) {
        Model model = new WaitNotifyModel(3);
        for (int i = 0; i < 2; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }

}

Java生产者消费者模型实践二

BlockingQueue的写法最简单。核心思想是,把并发和容量控制封装在缓冲区中。而BlockingQueue的性质天生满足这个要求。

package com.github.xuchengen.concurrent.impl;

import com.github.xuchengen.concurrent.AbsConsumer;
import com.github.xuchengen.concurrent.AbsProducer;
import com.github.xuchengen.concurrent.Model;
import com.github.xuchengen.concurrent.Task;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 基于阻塞队列的缓冲实现
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public class BlockingQueueModel implements Model {

    private final BlockingQueue<Task> queue;

    private final AtomicInteger increTaskNo = new AtomicInteger();

    public BlockingQueueModel(int cap) {
        this.queue = new LinkedBlockingQueue<>(cap);
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    private class ConsumerImpl extends AbsConsumer {

        @Override
        public void consume() throws InterruptedException {
            Task task = queue.take();
            // 固定时间范围的消费,模拟相对稳定的服务器处理过程
            Thread.sleep(500 + (long) (Math.random() * 500));
            System.out.println("consume: " + task.no);
        }
    }

    private class ProducerImpl extends AbsProducer {

        @Override
        public void produce() throws InterruptedException {
            // 不定期生产,模拟随机的用户请求
            Thread.sleep((long) (Math.random() * 1000));
            Task task = new Task(increTaskNo.getAndIncrement());
            System.out.println("produce: " + task.no);
            queue.put(task);
        }
    }

    public static void main(String[] args) {

        Model model = new BlockingQueueModel(3);

        //2个消费者
        for (int i = 0; i < 2; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }

        //5个生产者
        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }

}

由于操作“出队/入队+日志输出”不是原子的,所以上述日志的绝对顺序与实际的出队/入队顺序有出入,但对于同一个任务号task.no,其consume日志一定出现在其produce日志之后,即:同一任务的消费行为一定发生在生产行为之后。缓冲区的容量留给读者验证。符合两个验证条件。

BlockingQueue写法的核心只有两行代码,并发和容量控制都封装在了BlockingQueue中,正确性由BlockingQueue保证。面试中首选该写法,自然美观简单。

Java生产者消费者模型实践一

考查Java的并发编程时,手写“生产者-消费者模型”是一个经典问题。有如下几个考点:

  • 对Java并发模型的理解
  • 对Java并发编程接口的熟练程度
  • bug free
  • coding style

JDK版本:oracle java 1.8.0_102

本文主要归纳了4种写法,阅读后,最好在白板上练习几遍,检查自己是否掌握。这4种写法或者编程接口不同,或者并发粒度不同,但本质是相同的——都是在使用或实现BlockingQueue。

生产者-消费者模型

网上有很多生产者-消费者模型的定义和实现。本文研究最常用的有界生产者-消费者模型,简单概括如下:

  • 生产者持续生产,直到缓冲区满,阻塞;缓冲区不满后,继续生产
  • 消费者持续消费,直到缓冲区空,阻塞;缓冲区不空后,继续消费
  • 生产者可以有多个,消费者也可以有多个

可通过如下条件验证模型实现的正确性:

  • 同一产品的消费行为一定发生在生产行为之后
  • 任意时刻,缓冲区大小不小于0,不大于限制容量

该模型的应用和变种非常多,不赘述。

准备

面试时可语言说明以下准备代码。关键部分需要实现,如AbsConsumer。

下面会涉及多种生产者-消费者模型的实现,可以先抽象出关键的接口,并实现一些抽象类:

package com.github.xuchengen.concurrent;

/**
 * 生产者接口
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public interface Producer {

    /**
     * 生产者负责生产
     *
     * @throws InterruptedException 线程意外终止异常
     */
    void produce() throws InterruptedException;

}
package com.github.xuchengen.concurrent;

/**
 * 消费者接口
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public interface Consumer {

    /**
     * 消费者负责消费
     *
     * @throws InterruptedException 线程意外终止异常
     */
    void consume() throws InterruptedException;

}
package com.github.xuchengen.concurrent;

/**
 * 抽象生产者
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public abstract class AbsProducer implements Producer, Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }
        }
    }
}
package com.github.xuchengen.concurrent;

/**
 * 抽象生产者
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public abstract class AbsConsumer implements Consumer, Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }
        }
    }

}

不同的模型实现中,生产者、消费者的具体实现也不同,所以需要为模型定义抽象工厂方法:

package com.github.xuchengen.concurrent;

/**
 * 模型
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public interface Model {

    /**
     * 实例化一个消费者
     *
     * @return Runnable
     */
    Runnable newRunnableConsumer();

    /**
     * 实例化一个生产者
     *
     * @return Runnable
     */
    Runnable newRunnableProducer();

}

我们将Task作为生产和消费的单位:

package com.github.xuchengen.concurrent;

/**
 * 任务
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public class Task {

    /**
     * 任务号
     */
    public int no;

    public Task(int no) {
        this.no = no;
    }
}

如果需求还不明确(这符合大部分工程工作的实际情况),建议边实现边抽象,不要“面向未来编程”

设计模式分类(创建型模式、结构型模式、行为模式)

创建型模式

创建型模式,就是创建对象的模式,抽象了实例化的过程。它帮助一个系统独立于如何创建、组合和表示它的那些对象。关注的是对象的创建,创建型模式将创建对象的过程进行了抽象,也可以理解为将创建对象的过程进行了封装,作为客户程序仅仅需要去使用对象,而不再关系创建对象过程中的逻辑。

社会化的分工越来越细,自然在软件设计方面也是如此,因此对象的创建和对象的使用分开也就成为了必然趋势。因为对象的创建会消耗掉系统的很多资源,所以单独对对象的创建进行研究,从而能够高效地创建对象就是创建型模式要探讨的问题。这里有6个具体的创建型模式可供研究,它们分别是:

  • 简单工厂模式(Simple Factory)
  • 工厂方法模式(Factory Method)
  • 抽象工厂模式(Abstract Factory)
  • 创建者模式(Builder)
  • 原型模式(Prototype)
  • 单例模式(Singleton)

简单工厂模式不是GoF总结出来的23种设计模式之一

结构型模式

结构型模式是为解决怎样组装现有的类,设计它们的交互方式,从而达到实现一定的功能目的。结构型模式包容了对很多问题的解决。例如:扩展性(外观、组成、代理、装饰)、封装(适配器、桥接)。

在解决了对象的创建问题之后,对象的组成以及对象之间的依赖关系就成了开发人员关注的焦点,因为如何设计对象的结构、继承和依赖关系会影响到后续程序的维护性、代码的健壮性、耦合性等。对象结构的设计很容易体现出设计人员水平的高低,这里有7个具体的结构型模式可供研究,它们分别是:

  • 外观模式/门面模式(Facade门面模式)
  • 适配器模式(Adapter)
  • 代理模式(Proxy)
  • 装饰模式(Decorator)
  • 桥梁模式/桥接模式(Bridge)
  • 组合模式(Composite)
  • 享元模式(Flyweight)

行为型模式

行为型模式涉及到算法和对象间职责的分配,行为模式描述了对象和类的模式,以及它们之间的通信模式,行为模式刻划了在程序运行时难以跟踪的复杂的控制流可分为行为类模式和行为对象模式。1. 行为类模式使用继承机制在类间分派行为。2. 行为对象模式使用对象聚合来分配行为。一些行为对象模式描述了一组对等的对象怎样相互协作以完成其中任何一个对象都无法单独完成的任务。

在对象的结构和对象的创建问题都解决了之后,就剩下对象的行为问题了,如果对象的行为设计的好,那么对象的行为就会更清晰,它们之间的协作效率就会提高,这里有11个具体的行为型模式可供研究,它们分别是:

  • 模板方法模式(Template Method)
  • 观察者模式(Observer)
  • 状态模式(State)
  • 策略模式(Strategy)
  • 职责链模式(Chain of Responsibility)
  • 命令模式(Command)
  • 访问者模式(Visitor)
  • 调停者模式(Mediator)
  • 备忘录模式(Memento)
  • 迭代器模式(Iterator)
  • 解释器模式(Interpreter)

三者之间的区别和联系

创建型模式提供生存环境,结构型模式提供生存理由,行为型模式提供如何生存。

  1. 创建型模式为其他两种模式使用提供了环境。
  2. 结构型模式侧重于接口的使用,它做的一切工作都是对象或是类之间的交互,提供一个门。
  3. 行为型模式顾名思义,侧重于具体行为,所以概念中才会出现职责分配和算法通信等内容。

设计原则

  1. 开闭原则: 对扩展开放,对修改关闭
  2. 里氏转换原则: 子类继承父类,单独完全可以运行
  3. 依赖倒转原则: 引用一个对象,如果这个对象有底层类型,直接引用底层类型
  4. 接口隔离原则: 每一个接口应该是一种角色
  5. 合成/聚合复用原则: 新的对象应使用一些已有的对象,使之成为新对象的一部分
  6. 迪米特原则: 一个对象应对其他对象有尽可能少的了解