基于延迟队列打造精准的订单超时关闭

订单的超时取消很多系统采用定时任务,实际上达不到要求。我用的是延迟队列,但缺点是只实现了基于jvm的,分布式采用的是修改之前去查询订单状态,以及分布式锁获取的方式来控制,这样获得锁的,先去查订单是否已经取消掉,如果没有,就改成去掉。但这种方式虽然效果不错,我对这个半吊子的思路还是不满意的,后面有时间再优化一些,做成分布式的。

整个功能实际就是利用延迟队列的特性。延迟队列有个时间属性,一旦到达这个时间节点,会立即从队列中弹出任务,这时候,轮询守护线程就可以执行这个任务了。如果没有到达这个时间节点,无论轮询守护线程怎么轮询,都查不到对应的任务弹出。从jdk1.5开始,java就提供了Delayed接口,用于延迟队列。该接口是继承于Comparable<T>的。至于延迟队列里的任务是什么样子,则由自己定义,我为了便于多线程跑,使用了继承了Runnable的任务,所以定义成这个样子。

执行流程:

  1. 系统启动,初始化延迟队列。随即会启动守护线程,用于轮询延迟队列里的任务到期弹出。
  2. 将现有数据库中处于待支付状态的订单插入延迟队列。
  3. 一旦有新的订单加入,则放入延迟队列中。
  4. 一旦有任务到期,则守护线程会接到弹出的任务,并执行该任务。
  5. 该任务会从更新数据库中的订单状态。
  6. 一旦有用户取消订单/支付订单,则从延迟队列中查询到该订单的延迟任务,手动删除该任务。

这个是使用的spring boot的线程池的,所以需要配置一个线程池。

延迟队列代码:

package com.xxxx.xxxx.config.taskThreadPool;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @Author: xiehao
 * @Date: 2018-08-11
 * @Description 延迟队列中放入的task,里面包含到时间之后要处理的任务,该任务类实现了Runnable,用于交给线程池管理
 */
public class DelayCancelOrderTask<T extends Runnable> implements Delayed {

   /**
    * 到期时间
    */
   private final long            time;

   /**
    * 任务对象
    */
   private final T processor;
   /**
    * 原子类
    */
   private static final AtomicLong atomic = new AtomicLong(0);

   private final long sequence;

   public DelayCancelOrderTask(long timeout, T processor){
      this.time = System.nanoTime() + timeout;
      this.processor = processor;
      this.sequence = atomic.getAndIncrement();
   }
   @Override
   public long getDelay(TimeUnit unit) {
      return unit.convert(this.time - System.nanoTime(),TimeUnit.NANOSECONDS);
   }

   @Override
   public int compareTo(Delayed o) {
      if (o == this){
         return 0;
      }
      if (o instanceof DelayCancelOrderTask){
         DelayCancelOrderTask<?> other = (DelayCancelOrderTask<?>)o;
         long diff = this.time - other.time;
         if (diff > 0) {
            return 1;
         } else if (diff < 0) {
            return -1;
         }else if(this.sequence < other.sequence){
            return -1;
         } else {
            return 0;
         }
      }
      long diffrent = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
      return (diffrent == 0)?0:((diffrent < 0) ? -1 : 1);
   }
   public T getProcessor(){
      return  this.processor;
   }
   @Override
   public int hashCode(){
      return processor.hashCode();
   }
   @Override
   public boolean equals(Object object)
   {
      if (object != null)
      {
         return object.hashCode() == hashCode() ? true : false;
      }
      return false;
   }

}

延迟队列中放置的任务:

package com.xxxx.xxxx.config.taskThreadPool;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

import com.xxxx.xxxx.domain.vo.MonitorOrderVO;
import com.xxxx.xxxx.enums.EnumWorkOrderSource;
import com.xxxx.xxxx.utils.MessageProducer;
import org.springframework.scheduling.annotation.Async;

import com.xxxx.xxxx.domain.pojo.MonitorOrder;
import com.xxxx.xxxx.enums.EnumB2BOrderStatus;
import com.xxxx.xxxx.service.MonitorOrderService;
import com.xxxx.xxxx.utils.BeanFactoryGetter;

/**
 * @Author: xiehao
 * @Date: 2018-08-11
 * @Description 超时取消的时间到了,这里要执行对订单的具体取消操作
 */
public class PayTimeoutCancelOrderProcessor implements Runnable {
   private String b2bTaskNo;
   public PayTimeoutCancelOrderProcessor(String b2bTaskNo){
      this.b2bTaskNo = b2bTaskNo;
   }
   @Override
   @Async
   public void run(){
      //因service无法注入,只能从bean工厂中拉取
      MonitorOrderService monitorOrderService = (MonitorOrderService)BeanFactoryGetter.getBean("monitorOrderService");
      MonitorOrder monitorOrder = new MonitorOrder();
      monitorOrder.setB2bTaskNo(b2bTaskNo);
      monitorOrder.setCurrentStatus(EnumB2BOrderStatus.OVERTIME_PAYMENT.getKey());
      monitorOrder.setCurrentStatusTime(LocalDateTime.now());
      monitorOrderService.uptCurrentStatusByTaskNoForOverTimeCancel(monitorOrder);
      //推送直接用户订单消息。现在还没有推送系统正式上线,所以先暂时注释掉
   }
}

延迟队列的管理类,有轮询守护线程:

package com.xxxx.xxxx.config.taskThreadPool;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * @Author: xiehao
 * @Date: 2018-08-11
 * @Description 超时取消的管理类,管理延迟队列,放出一个线程轮询监控延迟队列;有一些方法用来往延迟队列中压入task,有方法用来从队列中remove task。还有初始化方法,开始执行轮询监控线程
 */
@Component
public class DelayCancelOrderTaskManager {
   private static final Logger logger = LoggerFactory.getLogger(DelayCancelOrderTaskManager.class);
   //延迟队列,一切都是围绕它来运转的
   private DelayQueue<DelayCancelOrderTask<?>> delayQueue;
   //超时取消的管理类是个单例,因为系统启动要有方法往队列中插入延迟task,所以搞成饿汉模式
   private static DelayCancelOrderTaskManager instance = new DelayCancelOrderTaskManager();
   // 守护线程,用于轮询延时队列
   private Thread               daemonThread;
   //该方法初始管理类时调用,初始化延迟队列,同时初始化轮询线程
   private DelayCancelOrderTaskManager(){
      delayQueue = new DelayQueue<DelayCancelOrderTask<?>>();
      this.init();
   }
   public static DelayCancelOrderTaskManager getInstance(){
      return instance;
   }
   //初始化轮询监控守护线程
   public void init(){
      //lambda表达式,->的意思其实是静态方法,初始化随即执行的。
      daemonThread = new Thread(()-> {
         try{
            System.out.println("daemonThread start");
            execute();
         }catch (Exception e){
            logger.error("轮询线程出错",e);
         }
      });
      daemonThread.setName("DelayQueueMonitorThread");
      daemonThread.start();
   }
   //初始化的时候开始执行
   private void execute(){
      //不断轮询
      while(true){
         //此处仅为打印日志方便
         Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
         logger.info("线程数--------------" + map.size());
         logger.info(System.currentTimeMillis()+" 队列中的个数:"+delayQueue.size());
         try{
            //从队列中取出可以取出的task。延时队列有个特点,不到时间取不出来,所以能取出来的,都是到时间即将执行的。
            DelayCancelOrderTask<?> delayCancelOrderTask = delayQueue.take();
            //task不为空,则开始执行
            if(delayCancelOrderTask != null){
               //获取task里面的处理线程,该线程会丢到线程池中处理
               Runnable payTimeoutCancelOrderProcessor = delayCancelOrderTask.getProcessor();
               if(payTimeoutCancelOrderProcessor == null){
                  continue;
               }
               //线程执行
               payTimeoutCancelOrderProcessor.run();
               //执行完毕,从队列中删除task
               this.removeTask(delayCancelOrderTask);
            }
         }catch (Exception e){
            logger.error("线程执行错误:",e);
         }
      }
   }

   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的超时时间为以秒为单位
    */
   public void putTaskInSeconds(Runnable task,long timeoutPeriod){
      long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.SECONDS);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }

   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的超时时间以为分钟为单位
    */
   public void putTaskInMinites(Runnable task,long timeoutPeriod){
      long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.MINUTES);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }
   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的超时时间以小时为单位
    */
   public void putTaskInHours(Runnable task,long timeoutPeriod){
      long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.HOURS);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }
   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的超时时间为自定义的单位
    */
   public void putTaskInOwnDefine(Runnable task,long timeoutPeriod,TimeUnit unit){
      long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,unit);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }

   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的时间为超时时间点
    */
   public void putTaskInTimeoutTime(Runnable task,LocalDateTime timeoutTime){
      Duration duration = Duration.between(LocalDateTime.now(),timeoutTime);
      long timeout = TimeUnit.NANOSECONDS.convert(duration.toNanos(),TimeUnit.NANOSECONDS);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }

   /**
    * @Author xiehao
    * @Date 2018/8/13 15:43
    * @Param
    * @Description 从队列中删除某个task。一般在用户自己取消订单的时候执行
    */
   public boolean removeTask(DelayCancelOrderTask<? extends Runnable> task){
      return delayQueue.remove(task);
   }

   /**
    * @Author xiehao
    * @Date 2018/8/13 15:44
    * @Param
    * @Description 判断队列中是否含有某个task
    */
   public boolean contains(DelayCancelOrderTask<? extends Runnable> task){
      return delayQueue.contains(task);
   }
   //获取队列个数。这个方法专门给打印日志核对数据用的。一般用不着它
   public Integer getDelayQueueSize(){
      System.out.println("队列中的个数:"+delayQueue.size());
      return delayQueue.size();
   }
}

在系统初始化的时候,需要将数据库中已经处于等待超时状态的订单打入延迟队列:

package com.xxxx.xxxx.config.runner;

import com.xxxx.xxxx.config.taskThreadPool.DelayCancelOrderTaskManager;
import com.xxxx.xxxx.config.taskThreadPool.PayTimeoutCancelOrderProcessor;
import com.xxxx.xxxx.domain.pojo.MonitorOrder;
import com.xxxx.xxxx.service.MonitorOrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @Author: xiehao
 * @Date: 2018-08-11
 * @Description 系统初始化时将处于待支付状态的单据丢到延时队列中,等待执行超时任务
 */

@Component
public class OrderPayTimeoutQueueRunner  implements CommandLineRunner {
   @Autowired
   private MonitorOrderService monitorOrderService;
   @Override
   public void run(String... strings) throws Exception {
      DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance();
      List<MonitorOrder> monitorOrderList = monitorOrderService.getPayNotTimeoutOrderList();
      for(MonitorOrder monitorOrder : monitorOrderList){
         PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(monitorOrder.getB2bTaskNo());
         delayCancelOrderTaskManager.putTaskInTimeoutTime(processor,monitorOrder.getOrderPayTimeout());
      }
   }
}

如果用户取消订单/中间执行支付操作后,则从延迟队列中去掉任务:

//取消预约单,则删除延时队列中的超时等待信号 xiehao start
MonitorOrder targetOrder = monitorOrderMapper.getMonitorOrderByTaskNo(taskNum);
DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance();
PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(targetOrder.getB2bTaskNo());
Duration duration = Duration.between(LocalDateTime.now(), targetOrder.getOrderPayTimeout());
long timeout = TimeUnit.NANOSECONDS.convert(duration.toNanos(), TimeUnit.NANOSECONDS);
DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout, processor);
delayCancelOrderTaskManager.removeTask(delayCancelOrderTask);
//取消预约单,则删除延时队列中的超时等待信号 xiehao end

如果用户中间下单,则加入延迟队列:

//加入延时队列,等待超时取消 xiehao
DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance();
PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(order.getB2bTaskNo());
delayCancelOrderTaskManager.putTaskInTimeoutTime(processor, order.getOrderPayTimeout());

工具类:

package com.xxxx.xxxx.utils;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;

/**
 * @Author: xiehao
 * @Date: 2018-08-04
 */

@Component
public class BeanFactoryGetter implements BeanFactoryAware {

   //Bean工厂必须是static类型,否则系统启动的时候将无法写入factory
   private static BeanFactory factory;

   @Override
   public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
      factory = beanFactory;
   }

   public static Object getBean(String beanName){
      if(StringUtils.isEmpty(beanName)){
         return null;
      }
      Object t= factory.getBean(beanName);
      return t;
   }
}