分类目录归档:JAVA

基于延迟队列打造精准的订单超时关闭

订单的超时取消很多系统采用定时任务,实际上达不到要求。我用的是延迟队列,但缺点是只实现了基于jvm的,分布式采用的是修改之前去查询订单状态,以及分布式锁获取的方式来控制,这样获得锁的,先去查订单是否已经取消掉,如果没有,就改成去掉。但这种方式虽然效果不错,我对这个半吊子的思路还是不满意的,后面有时间再优化一些,做成分布式的。

整个功能实际就是利用延迟队列的特性。延迟队列有个时间属性,一旦到达这个时间节点,会立即从队列中弹出任务,这时候,轮询守护线程就可以执行这个任务了。如果没有到达这个时间节点,无论轮询守护线程怎么轮询,都查不到对应的任务弹出。从jdk1.5开始,java就提供了Delayed接口,用于延迟队列。该接口是继承于Comparable<T>的。至于延迟队列里的任务是什么样子,则由自己定义,我为了便于多线程跑,使用了继承了Runnable的任务,所以定义成这个样子。

执行流程:

  1. 系统启动,初始化延迟队列。随即会启动守护线程,用于轮询延迟队列里的任务到期弹出。
  2. 将现有数据库中处于待支付状态的订单插入延迟队列。
  3. 一旦有新的订单加入,则放入延迟队列中。
  4. 一旦有任务到期,则守护线程会接到弹出的任务,并执行该任务。
  5. 该任务会从更新数据库中的订单状态。
  6. 一旦有用户取消订单/支付订单,则从延迟队列中查询到该订单的延迟任务,手动删除该任务。

这个是使用的spring boot的线程池的,所以需要配置一个线程池。

延迟队列代码:

package com.xxxx.xxxx.config.taskThreadPool;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @Author: xiehao
 * @Date: 2018-08-11
 * @Description 延迟队列中放入的task,里面包含到时间之后要处理的任务,该任务类实现了Runnable,用于交给线程池管理
 */
public class DelayCancelOrderTask<T extends Runnable> implements Delayed {

   /**
    * 到期时间
    */
   private final long            time;

   /**
    * 任务对象
    */
   private final T processor;
   /**
    * 原子类
    */
   private static final AtomicLong atomic = new AtomicLong(0);

   private final long sequence;

   public DelayCancelOrderTask(long timeout, T processor){
      this.time = System.nanoTime() + timeout;
      this.processor = processor;
      this.sequence = atomic.getAndIncrement();
   }
   @Override
   public long getDelay(TimeUnit unit) {
      return unit.convert(this.time - System.nanoTime(),TimeUnit.NANOSECONDS);
   }

   @Override
   public int compareTo(Delayed o) {
      if (o == this){
         return 0;
      }
      if (o instanceof DelayCancelOrderTask){
         DelayCancelOrderTask<?> other = (DelayCancelOrderTask<?>)o;
         long diff = this.time - other.time;
         if (diff > 0) {
            return 1;
         } else if (diff < 0) {
            return -1;
         }else if(this.sequence < other.sequence){
            return -1;
         } else {
            return 0;
         }
      }
      long diffrent = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
      return (diffrent == 0)?0:((diffrent < 0) ? -1 : 1);
   }
   public T getProcessor(){
      return  this.processor;
   }
   @Override
   public int hashCode(){
      return processor.hashCode();
   }
   @Override
   public boolean equals(Object object)
   {
      if (object != null)
      {
         return object.hashCode() == hashCode() ? true : false;
      }
      return false;
   }

}

延迟队列中放置的任务:

package com.xxxx.xxxx.config.taskThreadPool;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

import com.xxxx.xxxx.domain.vo.MonitorOrderVO;
import com.xxxx.xxxx.enums.EnumWorkOrderSource;
import com.xxxx.xxxx.utils.MessageProducer;
import org.springframework.scheduling.annotation.Async;

import com.xxxx.xxxx.domain.pojo.MonitorOrder;
import com.xxxx.xxxx.enums.EnumB2BOrderStatus;
import com.xxxx.xxxx.service.MonitorOrderService;
import com.xxxx.xxxx.utils.BeanFactoryGetter;

/**
 * @Author: xiehao
 * @Date: 2018-08-11
 * @Description 超时取消的时间到了,这里要执行对订单的具体取消操作
 */
public class PayTimeoutCancelOrderProcessor implements Runnable {
   private String b2bTaskNo;
   public PayTimeoutCancelOrderProcessor(String b2bTaskNo){
      this.b2bTaskNo = b2bTaskNo;
   }
   @Override
   @Async
   public void run(){
      //因service无法注入,只能从bean工厂中拉取
      MonitorOrderService monitorOrderService = (MonitorOrderService)BeanFactoryGetter.getBean("monitorOrderService");
      MonitorOrder monitorOrder = new MonitorOrder();
      monitorOrder.setB2bTaskNo(b2bTaskNo);
      monitorOrder.setCurrentStatus(EnumB2BOrderStatus.OVERTIME_PAYMENT.getKey());
      monitorOrder.setCurrentStatusTime(LocalDateTime.now());
      monitorOrderService.uptCurrentStatusByTaskNoForOverTimeCancel(monitorOrder);
      //推送直接用户订单消息。现在还没有推送系统正式上线,所以先暂时注释掉
   }
}

延迟队列的管理类,有轮询守护线程:

package com.xxxx.xxxx.config.taskThreadPool;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * @Author: xiehao
 * @Date: 2018-08-11
 * @Description 超时取消的管理类,管理延迟队列,放出一个线程轮询监控延迟队列;有一些方法用来往延迟队列中压入task,有方法用来从队列中remove task。还有初始化方法,开始执行轮询监控线程
 */
@Component
public class DelayCancelOrderTaskManager {
   private static final Logger logger = LoggerFactory.getLogger(DelayCancelOrderTaskManager.class);
   //延迟队列,一切都是围绕它来运转的
   private DelayQueue<DelayCancelOrderTask<?>> delayQueue;
   //超时取消的管理类是个单例,因为系统启动要有方法往队列中插入延迟task,所以搞成饿汉模式
   private static DelayCancelOrderTaskManager instance = new DelayCancelOrderTaskManager();
   // 守护线程,用于轮询延时队列
   private Thread               daemonThread;
   //该方法初始管理类时调用,初始化延迟队列,同时初始化轮询线程
   private DelayCancelOrderTaskManager(){
      delayQueue = new DelayQueue<DelayCancelOrderTask<?>>();
      this.init();
   }
   public static DelayCancelOrderTaskManager getInstance(){
      return instance;
   }
   //初始化轮询监控守护线程
   public void init(){
      //lambda表达式,->的意思其实是静态方法,初始化随即执行的。
      daemonThread = new Thread(()-> {
         try{
            System.out.println("daemonThread start");
            execute();
         }catch (Exception e){
            logger.error("轮询线程出错",e);
         }
      });
      daemonThread.setName("DelayQueueMonitorThread");
      daemonThread.start();
   }
   //初始化的时候开始执行
   private void execute(){
      //不断轮询
      while(true){
         //此处仅为打印日志方便
         Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
         logger.info("线程数--------------" + map.size());
         logger.info(System.currentTimeMillis()+" 队列中的个数:"+delayQueue.size());
         try{
            //从队列中取出可以取出的task。延时队列有个特点,不到时间取不出来,所以能取出来的,都是到时间即将执行的。
            DelayCancelOrderTask<?> delayCancelOrderTask = delayQueue.take();
            //task不为空,则开始执行
            if(delayCancelOrderTask != null){
               //获取task里面的处理线程,该线程会丢到线程池中处理
               Runnable payTimeoutCancelOrderProcessor = delayCancelOrderTask.getProcessor();
               if(payTimeoutCancelOrderProcessor == null){
                  continue;
               }
               //线程执行
               payTimeoutCancelOrderProcessor.run();
               //执行完毕,从队列中删除task
               this.removeTask(delayCancelOrderTask);
            }
         }catch (Exception e){
            logger.error("线程执行错误:",e);
         }
      }
   }

   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的超时时间为以秒为单位
    */
   public void putTaskInSeconds(Runnable task,long timeoutPeriod){
      long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.SECONDS);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }

   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的超时时间以为分钟为单位
    */
   public void putTaskInMinites(Runnable task,long timeoutPeriod){
      long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.MINUTES);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }
   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的超时时间以小时为单位
    */
   public void putTaskInHours(Runnable task,long timeoutPeriod){
      long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.HOURS);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }
   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的超时时间为自定义的单位
    */
   public void putTaskInOwnDefine(Runnable task,long timeoutPeriod,TimeUnit unit){
      long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,unit);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }

   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的时间为超时时间点
    */
   public void putTaskInTimeoutTime(Runnable task,LocalDateTime timeoutTime){
      Duration duration = Duration.between(LocalDateTime.now(),timeoutTime);
      long timeout = TimeUnit.NANOSECONDS.convert(duration.toNanos(),TimeUnit.NANOSECONDS);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }

   /**
    * @Author xiehao
    * @Date 2018/8/13 15:43
    * @Param
    * @Description 从队列中删除某个task。一般在用户自己取消订单的时候执行
    */
   public boolean removeTask(DelayCancelOrderTask<? extends Runnable> task){
      return delayQueue.remove(task);
   }

   /**
    * @Author xiehao
    * @Date 2018/8/13 15:44
    * @Param
    * @Description 判断队列中是否含有某个task
    */
   public boolean contains(DelayCancelOrderTask<? extends Runnable> task){
      return delayQueue.contains(task);
   }
   //获取队列个数。这个方法专门给打印日志核对数据用的。一般用不着它
   public Integer getDelayQueueSize(){
      System.out.println("队列中的个数:"+delayQueue.size());
      return delayQueue.size();
   }
}

在系统初始化的时候,需要将数据库中已经处于等待超时状态的订单打入延迟队列:

package com.xxxx.xxxx.config.runner;

import com.xxxx.xxxx.config.taskThreadPool.DelayCancelOrderTaskManager;
import com.xxxx.xxxx.config.taskThreadPool.PayTimeoutCancelOrderProcessor;
import com.xxxx.xxxx.domain.pojo.MonitorOrder;
import com.xxxx.xxxx.service.MonitorOrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @Author: xiehao
 * @Date: 2018-08-11
 * @Description 系统初始化时将处于待支付状态的单据丢到延时队列中,等待执行超时任务
 */

@Component
public class OrderPayTimeoutQueueRunner  implements CommandLineRunner {
   @Autowired
   private MonitorOrderService monitorOrderService;
   @Override
   public void run(String... strings) throws Exception {
      DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance();
      List<MonitorOrder> monitorOrderList = monitorOrderService.getPayNotTimeoutOrderList();
      for(MonitorOrder monitorOrder : monitorOrderList){
         PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(monitorOrder.getB2bTaskNo());
         delayCancelOrderTaskManager.putTaskInTimeoutTime(processor,monitorOrder.getOrderPayTimeout());
      }
   }
}

如果用户取消订单/中间执行支付操作后,则从延迟队列中去掉任务:

//取消预约单,则删除延时队列中的超时等待信号 xiehao start
MonitorOrder targetOrder = monitorOrderMapper.getMonitorOrderByTaskNo(taskNum);
DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance();
PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(targetOrder.getB2bTaskNo());
Duration duration = Duration.between(LocalDateTime.now(), targetOrder.getOrderPayTimeout());
long timeout = TimeUnit.NANOSECONDS.convert(duration.toNanos(), TimeUnit.NANOSECONDS);
DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout, processor);
delayCancelOrderTaskManager.removeTask(delayCancelOrderTask);
//取消预约单,则删除延时队列中的超时等待信号 xiehao end

如果用户中间下单,则加入延迟队列:

//加入延时队列,等待超时取消 xiehao
DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance();
PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(order.getB2bTaskNo());
delayCancelOrderTaskManager.putTaskInTimeoutTime(processor, order.getOrderPayTimeout());

工具类:

package com.xxxx.xxxx.utils;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;

/**
 * @Author: xiehao
 * @Date: 2018-08-04
 */

@Component
public class BeanFactoryGetter implements BeanFactoryAware {

   //Bean工厂必须是static类型,否则系统启动的时候将无法写入factory
   private static BeanFactory factory;

   @Override
   public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
      factory = beanFactory;
   }

   public static Object getBean(String beanName){
      if(StringUtils.isEmpty(beanName)){
         return null;
      }
      Object t= factory.getBean(beanName);
      return t;
   }
}

Linux中下载安装Oracle JDK并解决Tar命令解压缩问题

徐叔相信大多数互联网从业者都有使用Linux服务器,本篇徐叔介绍一下如何在Linux中下载并安装Oracle JDK(甲骨文)。

通常来讲我都是去甲骨文官方网站下载JDK,然而在Linux中我们直接使用curl或者wget命令下载甲骨文提供的JDK会莫名其妙的无法解压或者说解压后无法正常运行,这其中的原因我也不过多解释徐叔就直接贴命令。

下载JDK8

wget --no-cookie --no-check-certificate --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u171-b11/512cd62ec5174c3487ac17c61aaa89e8/jdk-8u171-linux-x64.tar.gz

重点是前面一段

wget --no-cookie --no-check-certificate --header "Cookie: oraclelicense=accept-securebackup-cookie"

解压缩安装包

# 解压安装包
tar -zxvf jdk-8u171-linux-x64.tar.gz
# 新建java文件夹
mkdir /usr/local/src/java
# 复制到java8文件夹中
cp -r jdk1.8.0_171/ /usr/local/src/java/java8

新建环境变量文件

vim /etc/profile.d/java.sh

设置环境变量

# java安装路径
JAVA_HOME=/usr/local/src/java/java8
CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar 
PATH=$JAVA_HOME/bin:$HOME/bin:$HOME/.local/bin:$PATH
source /etc/profile.d/java.sh

OK大功告成,容徐叔抽支闷烟…

Centos7安装ActiveMQ并进行安全配置

Apache ActiveMQApache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。

ActiveMQ特色:

继续阅读

浅谈ArrayList的removeAll方法

在开发过程中,遇到一个情况,就是从所有骑手Id中过滤没有标签的骑手Id(直接查询没有标签的骑手不容易实现)。

List<Integer> allRiderIdList = new ArrayList(); // 所有的骑手,大致有23W数据

List<Integer> hasAnyTagRiderId = new ArrayList(); // 有标签的骑手, 大致有21W数据

List<Integer> withoutAnyTagRiderList = allRiderIdList.removeAll(hasAnyTagRiderId);
逻辑很简单,就是取一个差集,这样子就拿到没有任何标签的骑手数据。
但是在实际开发过程中,removeAll这个动作很耗时,做测试大概要4分钟左右。查看ArrayList中removeAll的源码片段:
public boolean removeAll(Collection<?> c) {
    Objects.requireNonNull(c);
    return batchRemove(c, false);
}

private boolean batchRemove(Collection<?> c, boolean complement) {
    final Object[] elementData = this.elementData;
    int r = 0, w = 0;
    boolean modified = false;
    try {
        for (; r < size; r++) // 循环原来的list
            if (c.contains(elementData[r]) complement) // 这里调用contains方法
                elementData[w++] = elementData[r];
    } finally {
        ....
    }
    return modified;
}

在循环过程中调用contains方法做比较,查一下ArrayList的contains方法,源代码片段如下:

public boolean contains(Object o) {
    return indexOf(o) >= 0;
}

public int indexOf(Object o) {
    if (o null) {
        for (int i = 0; i < size; i++)
            if (elementData[i]==null)
                return i;
    } else {
        for (int i = 0; i < size; i++)
            if (o.equals(elementData[i]))
                return i;
    }
    return -1;
}
这可以看出来,在比较的过程中,又调用了一次循环。
所以removeAll两层for循环,复杂度O(m*n),所以在操作比较大的ArrayList时,这种方法是绝对不可取的。
下面看一下最终的实现方式:
private List<Integer> removeAll(List<Integer> src, List<Integer> target) {
   LinkedList<Integer> result = new LinkedList<>(src); //大集合用linkedlist
   HashSet<Integer> targetHash = new HashSet<>(target); //小集合用hashset
   Iterator<Integer> iter = result.iterator(); //采用Iterator迭代器进行数据的操作

   while(iter.hasNext()){ 
      if(targetHash.contains(iter.next())){
         iter.remove();
      }
   }
   return result;
}
同样数量级list, 整个过程只需要几十毫秒,简直天壤之别。
回过头来,比较一下两种实现方式,为什么差距这个大。
1、外层循环
     一个是普通的for循环,一个迭代器遍历元素,二者相差不大
2、内层数据比较
     前者通过index方法把整个数组顺序遍历了一遍;
     后者调用HashSet的contains方法,实际上是调用HashMap的containKey方法,查找时是通过hash表查找,复杂度为O(1)。
接下来我们简单看一下hash表。
hash表是一种特殊的数据结构,它同数组、链表以及二叉排序树等相比较有很明显的区别,它能够快速定位到想要查找的记录,而不是与表中存在的记录的关键字进行比较来进行查找。这个源于Hash表设计的特殊性,它采用了函数映射的思想将记录的存储位置与记录的关键字关联起来,从而能够很快速地进行查找。可以简单理解为,以空间换时间,牺牲空间复杂度来换取时间复杂度。
hash表采用一个映射函数 f : key —> address 将关键字映射到该记录在表中的存储位置,从而在想要查找该记录时,可以直接根据关键字和映射关系计算出该记录在表中的存储位置,通常情况下,这种映射关系称作为hash函数,而通过hash函数和关键字计算出来的存储位置(注意这里的存储位置只是表中的存储位置,并不是实际的物理地址)称作为hash地址。
上面的图大家应该都很熟悉,hash表的一种实现方式,是由数组+链表组成的。元素放入hash表的位置通过hash(key)%len获得,也就是元素的key的哈希值对数组长度取模得到。
另外hash表大小的确定也很关键,如果hash表的空间远远大于最后实际存储的记录个数,则造成了很大的空间浪费,如果选取小了的话,则容易造成冲突。在实际情况中,一般需要根据最终记录存储个数和关键字的分布特点来确定Hash表的大小。还有一种情况时可能事先不知道最终需要存储的记录个数,则需要动态维护Hash表的容量,此时可能需要重新计算Hash地址。
当然,关于hash表要说的话太多,先简单到此吧~~~
本篇文章转载自点我达官方技术博客

按指定大小分割List的一种算法

按指定大小分割List的一种算法。直接贴代码了,至于应用场景留给大家自己去思考。

public static List<List<?>> splitList(List<?> list, int len) {
    if (list == null || list.size() == 0 || len < 1) {
        return null;
    }
    List<List<?>> result = new ArrayList<List<?>>();

    int size = list.size();
    int count = (size + len - 1) / len;

    for (int i = 0; i < count; i++) {
        List<?> subList = list.subList(i * len, ((i + 1) * len > size ? size : len * (i + 1)));
        result.add(subList);
    }
    return result;
}

 

使用AOP加自定义注解输出日志

什么是AOP、切面编程?想必各位Java Coder早已不陌生,徐叔就不过多解释。直接上代码需要的小伙伴们可以参考。

一、自定义注解

/**
 * 打印日志注解<br>
 * 作者:徐承恩<br>
 * 日期:2016/12/7-17:11<br>
 */
@Retention(value = RetentionPolicy.RUNTIME)
@Target(value = ElementType.METHOD)
@Inherited
public @interface PrintLog {
}

二、添加AspectJ依赖并整合Spring

<!-- AOP注解编程组件 -->
<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjrt</artifactId>
    <version>1.8.9</version>
</dependency>
<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
    <version>1.8.9</version>
</dependency>

继续阅读