作者归档:徐承恩

Java设计模式之静态代理模式

代理模式是一种设计模式,提供了对目标对象额外的访问方式,即通过代理对象访问目标对象,这样可以在不修改原目标对象的前提下,提供额外的功能操作,扩展目标对象的功能。

简言之,代理模式就是设置一个中间代理来控制访问原目标对象,以达到增强原对象的功能和简化访问方式。

代理模式UML类图

静态代理

静态代理需要代理对象和目标对象实现一样的接口。

优点:

1、可以在不修改目标对象的前提下扩展目标对象的功能。

缺点:

1、冗余。由于代理对象要实现与目标对象一致的接口,会产生过多的代理类。

2、不易维护。一旦接口增加方法,目标对象与代理对象都要进行修改。

静态代理示例代码

AccountDao接口类

package com.github.xuchengen.proxy.statics;

/**
 * 账户表数据访问层接口
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2020/4/1 2:03 下午
 */
public interface AccountDao {

    /**
     * 保存
     */
    void save();

}

AccountDaoImpl接口实现类

package com.github.xuchengen.proxy.statics;

/**
 * 账户表数据访问层接口实现
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2020/4/1 2:04 下午
 */
public class AccountDaoImpl implements AccountDao {

    @Override
    public void save() {
        System.out.println("保存账户数据");
    }

}

AccountDaoProxy代理类

package com.github.xuchengen.proxy.statics;

/**
 * 账户表数据访问层代理
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2020/4/1 2:07 下午
 */
public class AccountDaoProxy implements AccountDao {

    private AccountDao accountDao;

    /**
     * 构造方法
     *
     * @param accountDao 账户表数据访问层接口
     */
    public AccountDaoProxy(AccountDao accountDao) {
        this.accountDao = accountDao;
    }

    @Override
    public void save() {
        try {
            System.out.println("开启事务");
            accountDao.save();
            System.out.println("提交事务");
        } catch (Exception e) {
            System.out.println("事务回滚");
        }
    }
}

StaticProxyTest测试类

package com.github.xuchengen.proxy.statics;

/**
 * 静态代理测试
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2020/4/1 2:11 下午
 */
public class StaticProxyTest {

    public static void main(String[] args) {

        AccountDao accountDao = new AccountDaoImpl();

        AccountDaoProxy accountDaoProxy = new AccountDaoProxy(accountDao);

        accountDaoProxy.save();

    }

}

控制台输出

开启事务
保存账户数据
提交事务

MySQL存储引擎总结

MySQL是一种关系型数据库,它有4种存储引擎分别是MyISAM、InnoDB、Memory、Archive。

MySQL支持那些存储引擎

1、MyISAM存储引擎
2、InnoDB存储引擎
3、Memory存储引擎
4、Archive存储引擎

InnoDB存储引擎

InnoDB是事务型数据库的首选引擎,支持事务安全表(ACID),支持行锁定和外键,InnoDB是默认的MySQL引擎。
InnoDB主要特性有:
1、InnoDB给MySQL提供了具有提交、回滚和崩溃恢复能力的事物安全(ACID兼容)存储引擎。InnoDB锁定在行级并且也在SELECT语句中提供一个类似Oracle的非锁定读。这些功能增加了多用户部署和性能。在SQL查询中,可以自由地将InnoDB类型的表和其他MySQL的表类型混合起来,甚至在同一个查询中也可以混合。
2、InnoDB是为处理巨大数据量的最大性能设计。它的CPU效率可能是任何其他基于磁盘的关系型数据库引擎所不能匹敌的。
3、InnoDB存储引擎完全与MySQL服务器整合,InnoDB存储引擎为在主内存中缓存数据和索引而维持它自己的缓冲池。InnoDB将它的表和索引在一个逻辑表空间中,表空间可以包含数个文件(或原始磁盘文件)。这与MyISAM表不同,比如在MyISAM表中每个表被存放在分离的文件中。InnoDB表可以是任何尺寸,即使在文件尺寸被限制为2GB的操作系统上。
4、InnoDB支持外键完整性约束,存储表中的数据时,每张表的存储都按主键顺序存放,如果没有显示在表定义时指定主键,InnoDB会为每一行生成一个6字节的ROWID,并以此作为主键。

MyISAM存储引擎

MyISAM基于ISAM存储引擎,并对其进行扩展。它是在Web、数据仓储和其他应用环境下最常使用的存储引擎之一。MyISAM拥有较高的插入、查询速度,但不支持事务。
MyISAM主要特性有:
1、大文件(达到63位文件长度)在支持大文件的文件系统和操作系统上被支持。
2、当把删除和更新及插入操作混合使用的时候,动态尺寸的行产生更少碎片。这要通过合并相邻被删除的块,以及若下一个块被删除,就扩展到下一块自动完成。
3、每个MyISAM表最大索引数是64,这可以通过重新编译来改变。每个索引最大的列数是16。
4、NULL被允许在索引的列中,这个值占每个键的0~1个字节。
5、可以把数据文件和索引文件放在不同目录(InnoDB是放在一个目录里面的)。

存储引擎的选择

不同的存储引擎都有各自的特点,以适应不同的需求,如下表所示:
功能MyISAMInnoDBMemoryArchive
存储限制256TB64TBRAMNone
支持事务NoYesNoNo
支持全文索引YesNoNoNo
支持数索引YesYesYesNo
支持哈希索引NoNoYesNo
支持数据缓存NoYesN/ANo
支持外键NoYesNoNo
如果要提供提交、回滚、崩溃恢复能力的事务安全(ACID兼容)能力,并要求实现并发控制,InnoDB是一个好的选择。

InnoDB 和 MyISAM之间的区别

1、InnoDB支持事物,而MyISAM不支持事物。
2、InnoDB支持行级锁,而MyISAM支持表级锁。
3、InnoDB支持MVCC, 而MyISAM不支持。
4、InnoDB支持外键,而MyISAM不支持。
5、InnoDB不支持全文索引,而MyISAM支持。

MyISAM

如果数据表主要用来插入和查询记录,则MyISAM(但是不支持事务)引擎能提供较高的处理效率

Memory

如果只是临时存放数据,数据量不大,并且不需要较高的数据安全性,可以选择将数据保存在内存中的Memory引擎,MySQL中使用该引擎作为临时表,存放查询的中间结果。数据的处理速度很快但是安全性不高。

Archive

如果只有INSERT和SELECT操作,可以选择Archive,Archive支持高并发的插入操作,但是本身不是事务安全的。Archive非常适合存储归档数据,如记录日志信息可以使用Archive
使用哪一种引擎需要灵活选择,一个数据库中多个表可以使用不同引擎以满足各种性能和实际需求,使用合适的存储引擎,将会提高整个数据库的性能。

ThreadLocal深度解析

ThreadLocal:为共享变量在每个线程中创建一个副本,每个线程可以访问自己内部的副本变量。

ThreadLocal是什么

ThreadLocal是一个本地线程副本变量工具类。主要用于将私有线程和该线程存放的副本对象做一个映射,各个线程之间的变量互不干扰,在高并发场景下,可以实现无状态的调用。

数据结构图

从上面的结构图,我们已经窥见ThreadLocal的核心机制:

  • 每个Thread线程内部都有一个Map。
  • Map里面存储线程本地对象(key)和线程的变量副本(value)
  • 但是,Thread内部的Map是由ThreadLocal维护的,由ThreadLocal负责向map获取和设置线程的变量值。

所以对于不同的线程,每次获取副本值时,别的线程并不能获取到当前线程的副本值,形成了副本的隔离,互不干扰。

Thread线程内部的Map在类中描述如下:

public class Thread implements Runnable {
    /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;
}

深入解析ThreadLocal

ThreadLocal类提供如下几个核心方法:

public T get()
public void set(T value)
public void remove()

get()方法用于获取当前线程的副本变量值。
set()方法用于保存当前线程的副本变量值。
initialValue()为当前线程初始副本变量值。
remove()方法移除当前前程的副本变量值。

get()方法

/**
 * Returns the value in the current thread's copy of this
 * thread-local variable.  If the variable has no value for the
 * current thread, it is first initialized to the value returned
 * by an invocation of the {@link #initialValue} method.
 *
 * @return the current thread's value of this thread-local
 */
public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null)
            return (T)e.value;
    }
    return setInitialValue();
}

ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

private T setInitialValue() {
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
    return value;
}

protected T initialValue() {
    return null;
}

步骤:
1.获取当前线程的ThreadLocalMap对象threadLocals
2.从map中获取线程存储的K-V Entry节点。
3.从Entry节点获取存储的Value副本值返回。
4.map为空的话返回初始值null,即线程变量副本为null,在使用时需要注意判断NullPointerException。

set()方法

/**
 * Sets the current thread's copy of this thread-local variable
 * to the specified value.  Most subclasses will have no need to
 * override this method, relying solely on the {@link #initialValue}
 * method to set the values of thread-locals.
 *
 * @param value the value to be stored in the current thread's copy of
 *        this thread-local.
 */
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

步骤:
1.获取当前线程的成员变量map
2.map非空,则重新将ThreadLocal和新的value副本放入到map中。
3.map空,则对线程的成员变量ThreadLocalMap进行初始化创建,并将ThreadLocal和value副本放入map中。

remove()方法

/**
 * Removes the current thread's value for this thread-local
 * variable.  If this thread-local variable is subsequently
 * {@linkplain #get read} by the current thread, its value will be
 * reinitialized by invoking its {@link #initialValue} method,
 * unless its value is {@linkplain #set set} by the current thread
 * in the interim.  This may result in multiple invocations of the
 * <tt>initialValue</tt> method in the current thread.
 *
 * @since 1.5
 */
public void remove() {
 ThreadLocalMap m = getMap(Thread.currentThread());
 if (m != null)
     m.remove(this);
}

ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

remove方法比较简单,不做赘述。

ThreadLocalMap

ThreadLocalMap是ThreadLocal的内部类,没有实现Map接口,用独立的方式实现了Map的功能,其内部的Entry也独立实现。

在ThreadLocalMap中,也是用Entry来保存K-V结构数据的。但是Entry中key只能是ThreadLocal对象,这点被Entry的构造方法已经限定死了。

static class Entry extends WeakReference<ThreadLocal> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal k, Object v) {
        super(k);
        value = v;
    }
}

Entry继承自WeakReference(弱引用,生命周期只能存活到下次GC前),但只有Key是弱引用类型的,Value并非弱引用。

ThreadLocalMap的成员变量:

static class ThreadLocalMap {
    /**
     * The initial capacity -- MUST be a power of two.
     */
    private static final int INITIAL_CAPACITY = 16;

    /**
     * The table, resized as necessary.
     * table.length MUST always be a power of two.
     */
    private Entry[] table;

    /**
     * The number of entries in the table.
     */
    private int size = 0;

    /**
     * The next size value at which to resize.
     */
    private int threshold; // Default to 0
}

Hash冲突怎么解决

ThreadLocalMap和HashMap的最大的不同在于,ThreadLocalMap结构非常简单,没有next引用,也就是说ThreadLocalMap中解决Hash冲突的方式并非链表的方式,而是采用线性探测的方式,所谓线性探测,就是根据初始key的hashcode值确定元素在table数组中的位置,如果发现这个位置上已经有其他key值的元素被占用,则利用固定的算法寻找一定步长的下个位置,依次判断,直至找到能够存放的位置。

ThreadLocalMap解决Hash冲突的方式就是简单的步长加1或减1,寻找下一个相邻的位置。

/**
 * Increment i modulo len.
 */
private static int nextIndex(int i, int len) {
    return ((i + 1 < len) ? i + 1 : 0);
}

/**
 * Decrement i modulo len.
 */
private static int prevIndex(int i, int len) {
    return ((i - 1 >= 0) ? i - 1 : len - 1);
}

显然ThreadLocalMap采用线性探测的方式解决Hash冲突的效率很低,如果有大量不同的ThreadLocal对象放入map中时发送冲突,或者发生二次冲突,则效率很低。

建议:每个线程只存一个变量,这样的话所有的线程存放到map中的Key都是相同的ThreadLocal,如果一个线程要保存多个变量,就需要创建多个ThreadLocal,多个ThreadLocal放入Map中时会极大的增加Hash冲突的可能。

ThreadLocalMap的问题

由于ThreadLocalMap的key是弱引用,而Value是强引用。这就导致了一个问题,ThreadLocal在没有外部对象强引用时,发生GC时弱引用Key会被回收,而Value不会回收,如果创建ThreadLocal的线程一直持续运行,那么这个Entry对象中的value就有可能一直得不到回收,发生内存泄露。

如何避免泄漏
既然Key是弱引用,那么我们要做的事,就是在调用ThreadLocal的get()、set()方法时完成后再调用remove方法,将Entry节点和Map的引用关系移除,这样整个Entry对象在GC Roots分析后就变成不可达了,下次GC的时候就可以被回收。

如果使用ThreadLocal的set方法之后,没有显示的调用remove方法,就有可能发生内存泄露,所以养成良好的编程习惯十分重要,使用完ThreadLocal之后,记得调用remove方法。

ThreadLocal<Session> threadLocal = new ThreadLocal<Session>();
try {
    threadLocal.set(new Session(1, "徐叔科技"));
    // 其它业务逻辑
} finally {
    threadLocal.remove();
}

应用场景

还记得Hibernate的session获取场景吗?

private static final ThreadLocal<Session> threadLocal = new ThreadLocal<Session>();

//获取Session
public static Session getCurrentSession(){
    Session session =  threadLocal.get();
    //判断Session是否为空,如果为空,将创建一个session,并设置到本地线程变量中
    try {
        if(session == null && !session.isOpen()){
            if(sessionFactory == null){
                rbuildSessionFactory();// 创建Hibernate的SessionFactory
            }else{
                session = sessionFactory.openSession();
            }
        }
        threadLocal.set(session);
    } catch (Exception e) {
        // TODO: handle exception
    }

    return session;
}

为什么?每个线程访问数据库都应当是一个独立的Session会话,如果多个线程共享同一个Session会话,有可能其他线程关闭连接了,当前线程再执行提交时就会出现会话已关闭的异常,导致系统异常。此方式能避免线程争抢Session,提高并发下的安全性。

使用ThreadLocal的典型场景正如上面的数据库连接管理,线程会话管理等场景,只适用于独立变量副本的情况,如果变量为全局共享的,则不适用在高并发下使用。

总结

  • 每个ThreadLocal只能保存一个变量副本,如果想要上线一个线程能够保存多个副本以上,就需要创建多个ThreadLocal。
  • ThreadLocal内部的ThreadLocalMap键为弱引用,会有内存泄漏的风险。
  • 适用于无状态,副本变量独立后不影响业务逻辑的高并发场景。如果如果业务逻辑强依赖于副本变量,则不适合用ThreadLocal解决,需要另寻解决方案。

IPv4地址转Int之Java实现

讲个故事

面试官:IPv4地址可以转为Long类型的数字知道吧?你写一下这个转换的代码!

对计算机基础逐渐模糊的小徐一脸懵逼,毕竟工作中很少会用到,只记得IP地址和整数是可以相互转换的,但是从来没有自己实现过。于是在大脑中飞速计算。过了一会,思路出现了:IP地址分为四段,每段都是0~255之间的数,每段可以用8位来装下它,4×8=32位,也就是可以将IP地址转为32位的整数。咦?面试官居然让转成Long,但是Long有64位啊!一个int就搞定了,为什么要转为Long呢?但是自己没实现过,心里没底,再一犯嘀咕,然后回答不出来。面试没通过!!!

之后小徐一直心心念念这个问题,回来立马自己写了代码实现了一下,果然是int就搞定了,之前的思路一点都没错!!!

面试有时考验的不只是技术,还有自信心!!!

 

IP地址是一个32位的二进制数,通常被分割为4个“8位二进制数”(也就是4个字节)。IP地址通常用“点分十进制”表示成(a.b.c.d)的形式,其中,a,b,c,d都是0~255之间的十进制整数。

例:点分十进IP地址(100.4.5.6),实际上是32位二进制数(01100100.00000100.00000101.00000110)。

 

为什么8位能存储0~255之间的数字?

一个字节8个位,每个位就只有0跟1两种情况,8个位能表示2的8次方即256,范围0-255(带负值的话范围在:-128~127);

8位最小二进制为00000000转十进制即为0

8位最大二进制为11111111转十进制即为255

所以8位可存储0~255之间的数

IP地址本身就是一个32位的二进制数,只是通常被以a.b.c.d的形式表示而已。

那么为什么要将IP转为数字呢?

其实就是时间换空间的一种方式。

String类型的IP占用7个字节到15个字节0.0.0.0~255.255.255.255,而int只需要4个字节。

IP字符串转换为Int

/**
 * 将 ip 字符串转换为 int 类型的数字
 * <p>
 * 思路就是将 ip 的每一段数字转为 8 位二进制数,并将它们放在结果的适当位置上
 *
 * @param ipString ip字符串,如 127.0.0.1
 * @return ip字符串对应的 int 值
 */
public static int ip2Int(String ipString) {
    // 取 ip 的各段
    String[] ipSlices = ipString.split("\\.");
    int rs = 0;
    for (int i = 0; i < ipSlices.length; i++) {
        // 将 ip 的每一段解析为 int,并根据位置左移 8 位
        int intSlice = Integer.parseInt(ipSlices[i]) << 8 * i;
        // 或运算
        rs = rs | intSlice;
    }
    return rs;
}

有一个技巧,就是 或运算。就是将每段的 int 值左移到恰当的位置后跟保存结果的 int 值进行或运算。

255.255.255.255 这个地址为例,上面的或运算过程如下:

00000000 00000000 00000000 00000000
00000000 00000000 00000000 11111111
------------或运算------------
00000000 00000000 00000000 11111111
00000000 00000000 11111111 00000000
------------或运算------------
00000000 00000000 11111111 11111111
00000000 11111111 00000000 00000000
------------或运算------------
00000000 11111111 11111111 11111111
11111111 00000000 00000000 00000000
-----------最终结果------------
11111111 11111111 11111111 11111111

那么如何将 int 再转为字符串的表示法呢?

Int转换为IP字符串

思路是一样的,将 int 值的 32 位分为 4 个 8 位数字,然后这 4 个 8 位的数字用 0~255 的数字进行表示,用点号分隔即可。我们也基于位运算,7行代码即可实现:

/**
 * 将 int 转换为 ip 字符串
 *
 * @param ipInt 用 int 表示的 ip 值
 * @return ip字符串,如 127.0.0.1
 */
public static String int2Ip(int ipInt) {
    String[] ipString = new String[4];
    for (int i = 0; i < 4; i++) {
        // 每 8 位为一段,这里取当前要处理的最高位的位置
        int pos = i * 8;
        // 取当前处理的 ip 段的值
        int and = ipInt & (255 << pos);
        // 将当前 ip 段转换为 0 ~ 255 的数字,注意这里必须使用无符号右移
        ipString[i] = String.valueOf(and >>> pos);
    }
    return String.join(".", ipString);
}

这里使用与运算来取每次处理的 ip 片段。取最高的 8 位时,涉及到符号的处理,因此在将每段 8 位转为 0~255 的数字时必须使用无符号右移运算,否则最后处理的部分因为符号问题会不准确。

测试一下

public static void main(String[] args) {
    String[] ips4Test = new String[]{"0.0.0.0", "127.0.0.1", 
    	"192.168.1.1", "255.0.0.255", "255.255.255.255"};
    for (String ip : ips4Test) {
        test(ip);
    }
}

public static void test(String ip) {
    int ipInt = ip2Int(ip);
    String ipString = int2Ip(ipInt);
    System.out.println("用于测试的ip地址: " + ip + ", int表示: " + ipInt + ", 二进制: " + Long.toBinaryString(ipInt)
            + ", 转回String: " + ipString + ",与测试 ip 地址是否相等: " + ip.equals(ipString));
}

打印结果:

用于测试的ip地址: 0.0.0.0, int表示: 0, 二进制: 0, 转回String: 0.0.0.0,与测试 ip 地址是否相等: true
用于测试的ip地址: 127.0.0.1, int表示: 16777343, 二进制: 1000000000000000001111111, 转回String: 127.0.0.1,与测试 ip 地址是否相等: true
用于测试的ip地址: 192.168.1.1, int表示: 16885952, 二进制: 1000000011010100011000000, 转回String: 192.168.1.1,与测试 ip 地址是否相等: true
用于测试的ip地址: 255.0.0.255, int表示: -16776961, 二进制: 1111111111111111111111111111111111111111000000000000000011111111, 转回String: 255.0.0.255,与测试 ip 地址是否相等: true
用于测试的ip地址: 255.255.255.255, int表示: -1, 二进制: 1111111111111111111111111111111111111111111111111111111111111111, 转回String: 255.255.255.255,与测试 ip 地址是否相等: true

注意:这里相互转换的算法是配套的,不同的转换算法计算的 int 值可能会不一样,因为虽然都是处理 ip 的 4 个部分,但是它们的结合顺序可以不一样,因此以怎样的顺序搭配转为 int,就应该以相同的顺序解析为 String。

Spring事务隔离级别和传播特性

传播行为

事务的第一个方面是传播行为。传播行为定义关于客户端和被调用方法的事务边界。Spring定义了7中传播行为。如下表所示:

传播行为意义
PROPAGATION_MANDATORY表示该方法必须运行在一个事务中。如果当前没有事务正在发生,将抛出一个异常
PROPAGATION_NESTED表示如果当前正有一个事务在进行中,则该方法应当运行在一个嵌套式事务中。被嵌套的事务可以独立于封装事务进行提交或回滚。如果封装事务不存在,行为就像PROPAGATION_REQUIRES一样。
PROPAGATION_NEVER表示当前的方法不应该在一个事务中运行。如果一个事务正在进行,则会抛出一个异常。
PROPAGATION_NOT_SUPPORTED表示该方法不应该在一个事务中运行。如果一个现有事务正在进行中,它将在该方法的运行期间被挂起。
PROPAGATION_SUPPORTS表示当前方法不需要事务性上下文,但是如果有一个事务已经在运行的话,它也可以在这个事务里运行。
PROPAGATION_REQUIRES_NEW表示当前方法必须在它自己的事务里运行。一个新的事务将被启动,而且如果有一个现有事务在运行的话,则将在这个方法运行期间被挂起。
PROPAGATION_REQUIRES表示当前方法必须在一个事务中运行。如果一个现有事务正在进行中,该方法将在那个事务中运行,否则就要开始一个新事务。

传播规则回答了这样一个问题,就是一个新的事务应该被启动还是被挂起,或者是一个方法是否应该在事务性上下文中运行。

隔离级别

声明式事务的第二个方面是隔离级别。隔离级别定义一个事务可能受其他并发事务活动活动影响的程度。另一种考虑一个事务的隔离级别的方式,是把它想象为那个事务对于事物处理数据的自私程度。

在一个典型的应用程序中,多个事务同时运行,经常会为了完成他们的工作而操作同一个数据。并发虽然是必需的,但是会导致一下问题:

  • 脏读(Dirty read)– 脏读发生在一个事务读取了被另一个事务改写但尚未提交的数据时。如果这些改变在稍后被回滚了,那么第一个事务读取的数据就会是无效的。
  • 不可重复读(Nonrepeatable read)– 不可重复读发生在一个事务执行相同的查询两次或两次以上,但每次查询结果都不相同时。这通常是由于另一个并发事务在两次查询之间更新了数据。
  • 幻影读(Phantom reads)– 幻影读和不可重复读相似。当一个事务(T1)读取几行记录后,另一个并发事务(T2)插入了一些记录时,幻影读就发生了。在后来的查询中,第一个事务(T1)就会发现一些原来没有的额外记录。

在理想状态下,事务之间将完全隔离,从而可以防止这些问题发生。然而,完全隔离会影响性能,因为隔离经常牵扯到锁定在数据库中的记录(而且有时是锁定完整的数据表)。侵占性的锁定会阻碍并发,要求事务相互等待来完成工作。

考虑到完全隔离会影响性能,而且并不是所有应用程序都要求完全隔离,所以有时可以在事务隔离方面灵活处理。因此,就会有好几个隔离级别。

隔离级别含义
ISOLATION_DEFAULT使用后端数据库默认的隔离级别。
ISOLATION_READ_UNCOMMITTED允许读取尚未提交的更改。可能导致脏读、幻影读或不可重复读。
ISOLATION_READ_COMMITTED允许从已经提交的并发事务读取。可防止脏读,但幻影读和不可重复读仍可能会发生。
ISOLATION_REPEATABLE_READ对相同字段的多次读取的结果是一致的,除非数据被当前事务本身改变。可防止脏读和不可重复读,但幻影读仍可能发生。
ISOLATION_SERIALIZABLE完全服从ACID的隔离级别,确保不发生脏读、不可重复读和幻影读。这在所有隔离级别中也是最慢的,因为它通常是通过完全锁定当前事务所涉及的数据表来完成的。

只读(Read Only)

声明式事务的第三个特性是它是否是一个只读事务。如果一个事务只对后端数据库执行读操作,那么该数据库就可能利用那个事务的只读特性,采取某些优化 措施。通过把一个事务声明为只读,可以给后端数据库一个机会来应用那些它认为合适的优化措施。由于只读的优化措施是在一个事务启动时由后端数据库实施的, 因此,只有对于那些具有可能启动一个新事务的传播行为(PROPAGATION_REQUIRES_NEW、PROPAGATION_REQUIRED、 ROPAGATION_NESTED)的方法来说,将事务声明为只读才有意义。

此外,如果使用Hibernate作为持久化机制,那么把一个事务声明为只读,将使Hibernate的flush模式被设置为FLUSH_NEVER。这就告诉Hibernate避免和数据库进行不必要的对象同步,从而把所有更新延迟到事务的结束。

事务超时(Timeout)

为了使一个应用程序很好地执行,它的事务不能运行太长时间。因此,声明式事务的下一个特性就是它的超时。

假设事务的运行时间变得格外的长,由于事务可能涉及对后端数据库的锁定,所以长时间运行的事务会不必要地占用数据库资源。这时就可以声明一个事务在特定秒数后自动回滚,不必等它自己结束。

由于超时时钟在一个事务启动的时候开始的,因此,只有对于那些具有可能启动一个新事务的传播行为(PROPAGATION_REQUIRES_NEW、PROPAGATION_REQUIRED、ROPAGATION_NESTED)的方法来说,声明事务超时才有意义。

回滚(Rollback)

在默认设置下,事务只在出现运行时异常(runtime exception)时回滚,而在出现受检查异常(checked exception)时不回滚(这一行为和EJB中的回滚行为是一致的)。

不过,也可以声明在出现特定受检查异常时像运行时异常一样回滚。同样,也可以声明一个事务在出现特定的异常时不回滚,即使那些异常是运行时一场。

理解脏读、不可重复读、幻读

脏读

脏读就是指当一个事务正在访问数据,并且对数据进行了修改,而这种修改还没有提交到数据库中,这时,另外一个事务也访问这个数据,然后使用了这个数据。

不可重复读

不可重复读是指在一个事务内,多次读同一数据。在这个事务还没有结束时,另外一个事务也访问该同一数据。那么,在第一个事务中的两 次读数据之间,由于第二个事务的修改,那么第一个事务两次读到的的数据可能是不一样的。这样就发生了在一个事务内两次读到的数据是不一样的,因此称为是不 可重复读。

例如,一个编辑人员两次读取同一文档,但在两次读取之间,作者重写了该文档。当编辑人员第二次读取文档时,文档已更改。原始读取不可重复。如果 只有在作者全部完成编写后编辑人员才可以读取文档,则可以避免该问题。

幻读

幻读是指当事务不是独立执行时发生的一种现象,例如第一个事务对一个表中的数据进行了修改,这种修改涉及到表中的全部数据行。 同时,第二个事务也修改这个表中的数据,这种修改是向表中插入一行新数据。那么,以后就会发生操作第一个事务的用户发现表中还有没有修改的数据行,就好象发生了幻觉一样。

例如,一个编辑人员更改作者提交的文档,但当生产部门将其更改内容合并到该文档的主复本时,发现作者已将未编辑的新材料添加到该文档中。 如果在编辑人员和生产部门完成对原始文档的处理之前,任何人都不能将新材料添加到文档中,则可以避免该问题。

基于元数据的 Spring 声明性事务 :

隔离属性一共支持五种事务设置,具体介绍如下:

  1. DEFAULT 使用数据库设置的隔离级别(默认) ,由 DBA 默认的设置来决定隔离级别 。
  2. READ_UNCOMMITTED 会出现脏读、不可重复读、幻读(隔离级别最低,并发性能高)。
  3. READ_COMMITTED  会出现不可重复读、幻读问题(锁定正在读取的行)。
  4. REPEATABLE_READ 会出幻读(锁定所读取的所有行)。
  5. SERIALIZABLE 保证所有的情况不会发生(锁表)。

 

不可重复读的重点是修改

同样的条件,读取过的数据,再次读取出来发现值不一样了。

幻读的重点在于新增或者删除

同样的条件,第1次和第2次读出来的记录数不一样。

Java生产者消费者模型实践五

回顾前面做一些实验,你会发现,实现一的并发性能高于实现二、三。暂且不关心BlockingQueue的具体实现,来分析看如何优化实现三(与实现二的思路相同,性能相当)的性能。

分析实现三的瓶颈

最好的查证方法是记录方法执行时间,这样可以直接定位到真正的瓶颈。但此问题较简单,我们直接用“瞪眼法”分析。

实现三的并发瓶颈很明显,因为在锁 BUFFER_LOCK 看来,任何消费者线程与生产者线程都是一样的。换句话说,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)操作缓冲区 buffer。

而实际上,如果缓冲区是一个队列的话,“生产者将产品入队”与“消费者将产品出队”两个操作之间没有同步关系,可以在队首出队的同时,在队尾入队。理想性能可提升至实现三的两倍

去掉这个瓶颈

那么思路就简单了:需要两个锁 CONSUME_LOCKPRODUCE_LOCKCONSUME_LOCK控制消费者线程并发出队,PRODUCE_LOCK控制生产者线程并发入队;相应需要两个条件变量NOT_EMPTYNOT_FULLNOT_EMPTY负责控制消费者线程的状态(阻塞、运行),NOT_FULL负责控制生产者线程的状态(阻塞、运行)。以此让优化消费者与消费者(或生产者与生产者)之间是串行的;消费者与生产者之间是并行的。

package com.github.xuchengen.concurrent.impl;

import com.github.xuchengen.concurrent.AbsConsumer;
import com.github.xuchengen.concurrent.AbsProducer;
import com.github.xuchengen.concurrent.Model;
import com.github.xuchengen.concurrent.Task;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 基于锁条件实现
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/12
 */
public class LockConditionModel2 implements Model {

    private final Lock CONSUME_LOCK = new ReentrantLock();

    private final Condition NOT_EMPTY = CONSUME_LOCK.newCondition();

    private final Lock PRODUCE_LOCK = new ReentrantLock();

    private final Condition NOT_FULL = PRODUCE_LOCK.newCondition();

    private final Buffer<Task> buffer = new Buffer<>();

    private AtomicInteger bufLen = new AtomicInteger(0);

    private final int cap;

    private final AtomicInteger increTaskNo = new AtomicInteger(0);

    public LockConditionModel2(int cap) {
        this.cap = cap;
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    private class ConsumerImpl extends AbsConsumer {
        @Override
        public void consume() throws InterruptedException {
            int newBufSize = -1;
            CONSUME_LOCK.lockInterruptibly();
            try {
                while (bufLen.get() == 0) {
                    System.out.println("buffer is empty...");
                    NOT_EMPTY.await();
                }

                Task task = buffer.poll();

                assert task != null;
                // 固定时间范围的消费,模拟相对稳定的服务器处理过程
                Thread.sleep(500 + (long) (Math.random() * 500));
                System.out.println("consume: " + task.no);

                newBufSize = bufLen.decrementAndGet();
                if (newBufSize > 0) {
                    // 非空唤醒继续消费
                    NOT_EMPTY.signalAll();
                }
            } finally {
                CONSUME_LOCK.unlock();
            }
            if (newBufSize < cap) {
                PRODUCE_LOCK.lockInterruptibly();
                try {
                    // 未满唤醒生产者继续生产
                    NOT_FULL.signalAll();
                } finally {
                    PRODUCE_LOCK.unlock();
                }
            }
        }
    }


    private class ProducerImpl extends AbsProducer {
        @Override
        public void produce() throws InterruptedException {
            // 不定期生产,模拟随机的用户请求
            Thread.sleep((long) (Math.random() * 1000));
            int newBufSize = -1;
            PRODUCE_LOCK.lockInterruptibly();
            try {
                while (bufLen.get() == cap) {
                    System.out.println("buffer is full...");
                    NOT_FULL.await();
                }
                Task task = new Task(increTaskNo.getAndIncrement());
                buffer.offer(task);
                newBufSize = bufLen.incrementAndGet();
                System.out.println("produce: " + task.no);
                if (newBufSize < cap) {
                    // 未满唤醒继续消费
                    NOT_FULL.signalAll();
                }
            } finally {
                PRODUCE_LOCK.unlock();
            }
            if (newBufSize > 0) {
                CONSUME_LOCK.lockInterruptibly();
                try {
                    // 非空唤醒消费者继续消费
                    NOT_EMPTY.signalAll();
                } finally {
                    CONSUME_LOCK.unlock();
                }
            }
        }
    }

    private static class Buffer<E> {
        private Node head;
        private Node tail;

        Buffer() {
            // dummy node
            head = tail = new Node(null);
        }

        public void offer(E e) {
            tail.next = new Node(e);
            tail = tail.next;
        }

        public E poll() {
            head = head.next;
            E e = head.item;
            head.item = null;
            return e;
        }

        private class Node {
            E item;
            Node next;

            Node(E item) {
                this.item = item;
            }
        }
    }

    public static void main(String[] args) {
        Model model = new LockConditionModel2(3);
        for (int i = 0; i < 2; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }

}

需要注意的是,由于需要同时在UnThreadSafe的缓冲区 buffer 上进行消费与生产,我们不能使用实现二、三中使用的队列了,需要自己实现一个简单的缓冲区 Buffer。Buffer要满足以下条件:

  • 在头部出队,尾部入队
  • 在poll()方法中只操作head
  • 在offer()方法中只操作tail

还能进一步优化吗

我们已经优化掉了消费者与生产者之间的瓶颈,还能进一步优化吗?

如果可以,必然是继续优化消费者与消费者(或生产者与生产者)之间的并发性能。然而,消费者与消费者之间必须是串行的,因此,并发模型上已经没有地方可以继续优化了。

仔细分析下,实现四中的signalAll都可以换成signal。这里为了屏蔽复杂性,回避了这个优化。

不过在具体的业务场景中,一般还能够继续优化。如:

  • 并发规模中等,可考虑使用CAS代替重入锁
  • 模型上不能优化,但一个消费行为或许可以进一步拆解、优化,从而降低消费的延迟
  • 一个队列的并发性能达到了极限,可采用“多个队列”(如分布式消息队列等)

这4种写法的本质相同——都是在使用或实现BlockingQueue。实现一直接使用BlockingQueue,实现四实现了简单的BlockingQueue,而实现二、三则实现了退化版的BlockingQueue(性能降低一半)。

Java生产者消费者模型实践四

我们要保证理解wait && notify机制。实现时可以使用Object类提供的wait()方法与notifyAll()方法,但更推荐的方式是使用java.util.concurrent包提供的Lock && Condition

package com.github.xuchengen.concurrent.impl;

import com.github.xuchengen.concurrent.AbsConsumer;
import com.github.xuchengen.concurrent.AbsProducer;
import com.github.xuchengen.concurrent.Model;
import com.github.xuchengen.concurrent.Task;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 基于锁条件实现
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/12
 */
public class LockConditionModel1 implements Model {

    private final Lock BUFFER_LOCK = new ReentrantLock();

    private final Condition BUFFER_COND = BUFFER_LOCK.newCondition();

    private final Queue<Task> buffer = new LinkedList<>();

    private final int cap;

    private final AtomicInteger increTaskNo = new AtomicInteger(0);

    public LockConditionModel1(int cap) {
        this.cap = cap;
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    private class ConsumerImpl extends AbsConsumer {
        @Override
        public void consume() throws InterruptedException {
            BUFFER_LOCK.lockInterruptibly();
            try {
                while (buffer.size() == 0) {
                    // 当缓冲中元素为空则阻塞该线程与此同时会唤醒生产者线程进行生产
                    BUFFER_COND.await();
                }
                Task task = buffer.poll();
                assert task != null;
                // 固定时间范围的消费,模拟相对稳定的服务器处理过程
                Thread.sleep(500 + (long) (Math.random() * 500));
                System.out.println("consume: " + task.no);
                // 消费后唤醒生产者线程进行生产
                BUFFER_COND.signalAll();
            } finally {
                BUFFER_LOCK.unlock();
            }
        }
    }

    private class ProducerImpl extends AbsProducer {
        @Override
        public void produce() throws InterruptedException {
            // 不定期生产,模拟随机的用户请求
            Thread.sleep((long) (Math.random() * 1000));
            BUFFER_LOCK.lockInterruptibly();
            try {
                while (buffer.size() == cap) {
                    // 当缓冲中元素已满则阻塞该线程与此同时会唤醒消费者线程进行消费
                    BUFFER_COND.await();
                }
                Task task = new Task(increTaskNo.getAndIncrement());
                buffer.offer(task);
                System.out.println("produce: " + task.no);
                // 生产后唤醒消费者线程进行消费
                BUFFER_COND.signalAll();
            } finally {
                BUFFER_LOCK.unlock();
            }
        }
    }

    public static void main(String[] args) {
        Model model = new LockConditionModel1(3);
        for (int i = 0; i < 2; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }

}

该写法的思路与实现二的思路完全相同,仅仅将锁与条件变量换成了Lock和Condition。

Java生产者消费者模型实践三

如果不能将并发与容量控制都封装在缓冲区中,就只能由消费者与生产者完成。最简单的方案是使用朴素的wait && notify机制。

package com.github.xuchengen.concurrent.impl;

import com.github.xuchengen.concurrent.AbsConsumer;
import com.github.xuchengen.concurrent.AbsProducer;
import com.github.xuchengen.concurrent.Model;
import com.github.xuchengen.concurrent.Task;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 基于对象锁wait和notify实现
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public class WaitNotifyModel implements Model {

    private final Object BUFFER_LOCK = new Object();

    private final Queue<Task> buffer = new LinkedList<>();

    private final int cap;

    private final AtomicInteger increTaskNo = new AtomicInteger(0);

    public WaitNotifyModel(int cap) {
        this.cap = cap;
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    private class ConsumerImpl extends AbsConsumer {
        @Override
        public void consume() throws InterruptedException {
            synchronized (BUFFER_LOCK) {
                while (buffer.size() == 0) {
                    // 当缓冲中元素为空则阻塞该线程与此同时会唤醒生产者线程进行生产
                    BUFFER_LOCK.wait();
                }
                Task task = buffer.poll();
                assert task != null;
                // 固定时间范围的消费,模拟相对稳定的服务器处理过程
                Thread.sleep(500 + (long) (Math.random() * 500));
                System.out.println("consume: " + task.no);
                // 消费后唤醒生产者线程进行生产
                BUFFER_LOCK.notifyAll();
            }
        }
    }

    private class ProducerImpl extends AbsProducer {
        @Override
        public void produce() throws InterruptedException {
            // 不定期生产,模拟随机的用户请求
            Thread.sleep((long) (Math.random() * 1000));
            synchronized (BUFFER_LOCK) {
                while (buffer.size() == cap) {
                    // 当缓冲中元素已满则阻塞该线程与此同时会唤醒消费者线程进行消费
                    BUFFER_LOCK.wait();
                }
                Task task = new Task(increTaskNo.getAndIncrement());
                buffer.offer(task);
                System.out.println("produce: " + task.no);
                // 生产后唤醒消费者线程进行消费
                BUFFER_LOCK.notifyAll();
            }
        }
    }

    public static void main(String[] args) {
        Model model = new WaitNotifyModel(3);
        for (int i = 0; i < 2; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }

}

Java生产者消费者模型实践二

BlockingQueue的写法最简单。核心思想是,把并发和容量控制封装在缓冲区中。而BlockingQueue的性质天生满足这个要求。

package com.github.xuchengen.concurrent.impl;

import com.github.xuchengen.concurrent.AbsConsumer;
import com.github.xuchengen.concurrent.AbsProducer;
import com.github.xuchengen.concurrent.Model;
import com.github.xuchengen.concurrent.Task;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 基于阻塞队列的缓冲实现
 * 作者:徐承恩
 * 邮箱:xuchengen@gmail.com
 * 日期:2019/12/11
 */
public class BlockingQueueModel implements Model {

    private final BlockingQueue<Task> queue;

    private final AtomicInteger increTaskNo = new AtomicInteger();

    public BlockingQueueModel(int cap) {
        this.queue = new LinkedBlockingQueue<>(cap);
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    private class ConsumerImpl extends AbsConsumer {

        @Override
        public void consume() throws InterruptedException {
            Task task = queue.take();
            // 固定时间范围的消费,模拟相对稳定的服务器处理过程
            Thread.sleep(500 + (long) (Math.random() * 500));
            System.out.println("consume: " + task.no);
        }
    }

    private class ProducerImpl extends AbsProducer {

        @Override
        public void produce() throws InterruptedException {
            // 不定期生产,模拟随机的用户请求
            Thread.sleep((long) (Math.random() * 1000));
            Task task = new Task(increTaskNo.getAndIncrement());
            System.out.println("produce: " + task.no);
            queue.put(task);
        }
    }

    public static void main(String[] args) {

        Model model = new BlockingQueueModel(3);

        //2个消费者
        for (int i = 0; i < 2; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }

        //5个生产者
        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }

}

由于操作“出队/入队+日志输出”不是原子的,所以上述日志的绝对顺序与实际的出队/入队顺序有出入,但对于同一个任务号task.no,其consume日志一定出现在其produce日志之后,即:同一任务的消费行为一定发生在生产行为之后。缓冲区的容量留给读者验证。符合两个验证条件。

BlockingQueue写法的核心只有两行代码,并发和容量控制都封装在了BlockingQueue中,正确性由BlockingQueue保证。面试中首选该写法,自然美观简单。