订单的超时取消很多系统采用定时任务,实际上达不到要求。我用的是延迟队列,但缺点是只实现了基于jvm的,分布式采用的是修改之前去查询订单状态,以及分布式锁获取的方式来控制,这样获得锁的,先去查订单是否已经取消掉,如果没有,就改成去掉。但这种方式虽然效果不错,我对这个半吊子的思路还是不满意的,后面有时间再优化一些,做成分布式的。
整个功能实际就是利用延迟队列的特性。延迟队列有个时间属性,一旦到达这个时间节点,会立即从队列中弹出任务,这时候,轮询守护线程就可以执行这个任务了。如果没有到达这个时间节点,无论轮询守护线程怎么轮询,都查不到对应的任务弹出。从jdk1.5开始,java就提供了Delayed接口,用于延迟队列。该接口是继承于Comparable<T>的。至于延迟队列里的任务是什么样子,则由自己定义,我为了便于多线程跑,使用了继承了Runnable的任务,所以定义成这个样子。
执行流程:
- 系统启动,初始化延迟队列。随即会启动守护线程,用于轮询延迟队列里的任务到期弹出。
- 将现有数据库中处于待支付状态的订单插入延迟队列。
- 一旦有新的订单加入,则放入延迟队列中。
- 一旦有任务到期,则守护线程会接到弹出的任务,并执行该任务。
- 该任务会从更新数据库中的订单状态。
- 一旦有用户取消订单/支付订单,则从延迟队列中查询到该订单的延迟任务,手动删除该任务。
这个是使用的spring boot的线程池的,所以需要配置一个线程池。
延迟队列代码:
package com.xxxx.xxxx.config.taskThreadPool; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * @Author: xiehao * @Date: 2018-08-11 * @Description 延迟队列中放入的task,里面包含到时间之后要处理的任务,该任务类实现了Runnable,用于交给线程池管理 */ public class DelayCancelOrderTask<T extends Runnable> implements Delayed { /** * 到期时间 */ private final long time; /** * 任务对象 */ private final T processor; /** * 原子类 */ private static final AtomicLong atomic = new AtomicLong(0); private final long sequence; public DelayCancelOrderTask(long timeout, T processor){ this.time = System.nanoTime() + timeout; this.processor = processor; this.sequence = atomic.getAndIncrement(); } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.time - System.nanoTime(),TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { if (o == this){ return 0; } if (o instanceof DelayCancelOrderTask){ DelayCancelOrderTask<?> other = (DelayCancelOrderTask<?>)o; long diff = this.time - other.time; if (diff > 0) { return 1; } else if (diff < 0) { return -1; }else if(this.sequence < other.sequence){ return -1; } else { return 0; } } long diffrent = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return (diffrent == 0)?0:((diffrent < 0) ? -1 : 1); } public T getProcessor(){ return this.processor; } @Override public int hashCode(){ return processor.hashCode(); } @Override public boolean equals(Object object) { if (object != null) { return object.hashCode() == hashCode() ? true : false; } return false; } }
延迟队列中放置的任务:
package com.xxxx.xxxx.config.taskThreadPool; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import com.xxxx.xxxx.domain.vo.MonitorOrderVO; import com.xxxx.xxxx.enums.EnumWorkOrderSource; import com.xxxx.xxxx.utils.MessageProducer; import org.springframework.scheduling.annotation.Async; import com.xxxx.xxxx.domain.pojo.MonitorOrder; import com.xxxx.xxxx.enums.EnumB2BOrderStatus; import com.xxxx.xxxx.service.MonitorOrderService; import com.xxxx.xxxx.utils.BeanFactoryGetter; /** * @Author: xiehao * @Date: 2018-08-11 * @Description 超时取消的时间到了,这里要执行对订单的具体取消操作 */ public class PayTimeoutCancelOrderProcessor implements Runnable { private String b2bTaskNo; public PayTimeoutCancelOrderProcessor(String b2bTaskNo){ this.b2bTaskNo = b2bTaskNo; } @Override @Async public void run(){ //因service无法注入,只能从bean工厂中拉取 MonitorOrderService monitorOrderService = (MonitorOrderService)BeanFactoryGetter.getBean("monitorOrderService"); MonitorOrder monitorOrder = new MonitorOrder(); monitorOrder.setB2bTaskNo(b2bTaskNo); monitorOrder.setCurrentStatus(EnumB2BOrderStatus.OVERTIME_PAYMENT.getKey()); monitorOrder.setCurrentStatusTime(LocalDateTime.now()); monitorOrderService.uptCurrentStatusByTaskNoForOverTimeCancel(monitorOrder); //推送直接用户订单消息。现在还没有推送系统正式上线,所以先暂时注释掉 } }
延迟队列的管理类,有轮询守护线程:
package com.xxxx.xxxx.config.taskThreadPool; import java.time.Duration; import java.time.LocalDateTime; import java.util.Map; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; /** * @Author: xiehao * @Date: 2018-08-11 * @Description 超时取消的管理类,管理延迟队列,放出一个线程轮询监控延迟队列;有一些方法用来往延迟队列中压入task,有方法用来从队列中remove task。还有初始化方法,开始执行轮询监控线程 */ @Component public class DelayCancelOrderTaskManager { private static final Logger logger = LoggerFactory.getLogger(DelayCancelOrderTaskManager.class); //延迟队列,一切都是围绕它来运转的 private DelayQueue<DelayCancelOrderTask<?>> delayQueue; //超时取消的管理类是个单例,因为系统启动要有方法往队列中插入延迟task,所以搞成饿汉模式 private static DelayCancelOrderTaskManager instance = new DelayCancelOrderTaskManager(); // 守护线程,用于轮询延时队列 private Thread daemonThread; //该方法初始管理类时调用,初始化延迟队列,同时初始化轮询线程 private DelayCancelOrderTaskManager(){ delayQueue = new DelayQueue<DelayCancelOrderTask<?>>(); this.init(); } public static DelayCancelOrderTaskManager getInstance(){ return instance; } //初始化轮询监控守护线程 public void init(){ //lambda表达式,->的意思其实是静态方法,初始化随即执行的。 daemonThread = new Thread(()-> { try{ System.out.println("daemonThread start"); execute(); }catch (Exception e){ logger.error("轮询线程出错",e); } }); daemonThread.setName("DelayQueueMonitorThread"); daemonThread.start(); } //初始化的时候开始执行 private void execute(){ //不断轮询 while(true){ //此处仅为打印日志方便 Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces(); logger.info("线程数--------------" + map.size()); logger.info(System.currentTimeMillis()+" 队列中的个数:"+delayQueue.size()); try{ //从队列中取出可以取出的task。延时队列有个特点,不到时间取不出来,所以能取出来的,都是到时间即将执行的。 DelayCancelOrderTask<?> delayCancelOrderTask = delayQueue.take(); //task不为空,则开始执行 if(delayCancelOrderTask != null){ //获取task里面的处理线程,该线程会丢到线程池中处理 Runnable payTimeoutCancelOrderProcessor = delayCancelOrderTask.getProcessor(); if(payTimeoutCancelOrderProcessor == null){ continue; } //线程执行 payTimeoutCancelOrderProcessor.run(); //执行完毕,从队列中删除task this.removeTask(delayCancelOrderTask); } }catch (Exception e){ logger.error("线程执行错误:",e); } } } /** * @Author xiehao * @Date 2018/8/11 18:15 * @Param * @Description 传入的超时时间为以秒为单位 */ public void putTaskInSeconds(Runnable task,long timeoutPeriod){ long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.SECONDS); DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task); delayQueue.put(delayCancelOrderTask); } /** * @Author xiehao * @Date 2018/8/11 18:15 * @Param * @Description 传入的超时时间以为分钟为单位 */ public void putTaskInMinites(Runnable task,long timeoutPeriod){ long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.MINUTES); DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task); delayQueue.put(delayCancelOrderTask); } /** * @Author xiehao * @Date 2018/8/11 18:15 * @Param * @Description 传入的超时时间以小时为单位 */ public void putTaskInHours(Runnable task,long timeoutPeriod){ long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.HOURS); DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task); delayQueue.put(delayCancelOrderTask); } /** * @Author xiehao * @Date 2018/8/11 18:15 * @Param * @Description 传入的超时时间为自定义的单位 */ public void putTaskInOwnDefine(Runnable task,long timeoutPeriod,TimeUnit unit){ long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,unit); DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task); delayQueue.put(delayCancelOrderTask); } /** * @Author xiehao * @Date 2018/8/11 18:15 * @Param * @Description 传入的时间为超时时间点 */ public void putTaskInTimeoutTime(Runnable task,LocalDateTime timeoutTime){ Duration duration = Duration.between(LocalDateTime.now(),timeoutTime); long timeout = TimeUnit.NANOSECONDS.convert(duration.toNanos(),TimeUnit.NANOSECONDS); DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task); delayQueue.put(delayCancelOrderTask); } /** * @Author xiehao * @Date 2018/8/13 15:43 * @Param * @Description 从队列中删除某个task。一般在用户自己取消订单的时候执行 */ public boolean removeTask(DelayCancelOrderTask<? extends Runnable> task){ return delayQueue.remove(task); } /** * @Author xiehao * @Date 2018/8/13 15:44 * @Param * @Description 判断队列中是否含有某个task */ public boolean contains(DelayCancelOrderTask<? extends Runnable> task){ return delayQueue.contains(task); } //获取队列个数。这个方法专门给打印日志核对数据用的。一般用不着它 public Integer getDelayQueueSize(){ System.out.println("队列中的个数:"+delayQueue.size()); return delayQueue.size(); } }
在系统初始化的时候,需要将数据库中已经处于等待超时状态的订单打入延迟队列:
package com.xxxx.xxxx.config.runner; import com.xxxx.xxxx.config.taskThreadPool.DelayCancelOrderTaskManager; import com.xxxx.xxxx.config.taskThreadPool.PayTimeoutCancelOrderProcessor; import com.xxxx.xxxx.domain.pojo.MonitorOrder; import com.xxxx.xxxx.service.MonitorOrderService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.util.List; /** * @Author: xiehao * @Date: 2018-08-11 * @Description 系统初始化时将处于待支付状态的单据丢到延时队列中,等待执行超时任务 */ @Component public class OrderPayTimeoutQueueRunner implements CommandLineRunner { @Autowired private MonitorOrderService monitorOrderService; @Override public void run(String... strings) throws Exception { DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance(); List<MonitorOrder> monitorOrderList = monitorOrderService.getPayNotTimeoutOrderList(); for(MonitorOrder monitorOrder : monitorOrderList){ PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(monitorOrder.getB2bTaskNo()); delayCancelOrderTaskManager.putTaskInTimeoutTime(processor,monitorOrder.getOrderPayTimeout()); } } }
如果用户取消订单/中间执行支付操作后,则从延迟队列中去掉任务:
//取消预约单,则删除延时队列中的超时等待信号 xiehao start MonitorOrder targetOrder = monitorOrderMapper.getMonitorOrderByTaskNo(taskNum); DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance(); PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(targetOrder.getB2bTaskNo()); Duration duration = Duration.between(LocalDateTime.now(), targetOrder.getOrderPayTimeout()); long timeout = TimeUnit.NANOSECONDS.convert(duration.toNanos(), TimeUnit.NANOSECONDS); DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout, processor); delayCancelOrderTaskManager.removeTask(delayCancelOrderTask); //取消预约单,则删除延时队列中的超时等待信号 xiehao end
如果用户中间下单,则加入延迟队列:
//加入延时队列,等待超时取消 xiehao DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance(); PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(order.getB2bTaskNo()); delayCancelOrderTaskManager.putTaskInTimeoutTime(processor, order.getOrderPayTimeout());
工具类:
package com.xxxx.xxxx.utils; import org.apache.commons.lang.StringUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.stereotype.Component; /** * @Author: xiehao * @Date: 2018-08-04 */ @Component public class BeanFactoryGetter implements BeanFactoryAware { //Bean工厂必须是static类型,否则系统启动的时候将无法写入factory private static BeanFactory factory; @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { factory = beanFactory; } public static Object getBean(String beanName){ if(StringUtils.isEmpty(beanName)){ return null; } Object t= factory.getBean(beanName); return t; } }