订单的超时取消很多系统采用定时任务,实际上达不到要求。我用的是延迟队列,但缺点是只实现了基于jvm的,分布式采用的是修改之前去查询订单状态,以及分布式锁获取的方式来控制,这样获得锁的,先去查订单是否已经取消掉,如果没有,就改成去掉。但这种方式虽然效果不错,我对这个半吊子的思路还是不满意的,后面有时间再优化一些,做成分布式的。
整个功能实际就是利用延迟队列的特性。延迟队列有个时间属性,一旦到达这个时间节点,会立即从队列中弹出任务,这时候,轮询守护线程就可以执行这个任务了。如果没有到达这个时间节点,无论轮询守护线程怎么轮询,都查不到对应的任务弹出。从jdk1.5开始,java就提供了Delayed接口,用于延迟队列。该接口是继承于Comparable<T>的。至于延迟队列里的任务是什么样子,则由自己定义,我为了便于多线程跑,使用了继承了Runnable的任务,所以定义成这个样子。
执行流程:
- 系统启动,初始化延迟队列。随即会启动守护线程,用于轮询延迟队列里的任务到期弹出。
- 将现有数据库中处于待支付状态的订单插入延迟队列。
- 一旦有新的订单加入,则放入延迟队列中。
- 一旦有任务到期,则守护线程会接到弹出的任务,并执行该任务。
- 该任务会从更新数据库中的订单状态。
- 一旦有用户取消订单/支付订单,则从延迟队列中查询到该订单的延迟任务,手动删除该任务。
这个是使用的spring boot的线程池的,所以需要配置一个线程池。
延迟队列代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
package com.xxxx.xxxx.config.taskThreadPool; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * @Author: xiehao * @Date: 2018-08-11 * @Description 延迟队列中放入的task,里面包含到时间之后要处理的任务,该任务类实现了Runnable,用于交给线程池管理 */ public class DelayCancelOrderTask<T extends Runnable> implements Delayed { /** * 到期时间 */ private final long time; /** * 任务对象 */ private final T processor; /** * 原子类 */ private static final AtomicLong atomic = new AtomicLong(0); private final long sequence; public DelayCancelOrderTask(long timeout, T processor){ this.time = System.nanoTime() + timeout; this.processor = processor; this.sequence = atomic.getAndIncrement(); } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.time - System.nanoTime(),TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { if (o == this){ return 0; } if (o instanceof DelayCancelOrderTask){ DelayCancelOrderTask<?> other = (DelayCancelOrderTask<?>)o; long diff = this.time - other.time; if (diff > 0) { return 1; } else if (diff < 0) { return -1; }else if(this.sequence < other.sequence){ return -1; } else { return 0; } } long diffrent = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return (diffrent == 0)?0:((diffrent < 0) ? -1 : 1); } public T getProcessor(){ return this.processor; } @Override public int hashCode(){ return processor.hashCode(); } @Override public boolean equals(Object object) { if (object != null) { return object.hashCode() == hashCode() ? true : false; } return false; } } |
延迟队列中放置的任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
package com.xxxx.xxxx.config.taskThreadPool; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import com.xxxx.xxxx.domain.vo.MonitorOrderVO; import com.xxxx.xxxx.enums.EnumWorkOrderSource; import com.xxxx.xxxx.utils.MessageProducer; import org.springframework.scheduling.annotation.Async; import com.xxxx.xxxx.domain.pojo.MonitorOrder; import com.xxxx.xxxx.enums.EnumB2BOrderStatus; import com.xxxx.xxxx.service.MonitorOrderService; import com.xxxx.xxxx.utils.BeanFactoryGetter; /** * @Author: xiehao * @Date: 2018-08-11 * @Description 超时取消的时间到了,这里要执行对订单的具体取消操作 */ public class PayTimeoutCancelOrderProcessor implements Runnable { private String b2bTaskNo; public PayTimeoutCancelOrderProcessor(String b2bTaskNo){ this.b2bTaskNo = b2bTaskNo; } @Override @Async public void run(){ //因service无法注入,只能从bean工厂中拉取 MonitorOrderService monitorOrderService = (MonitorOrderService)BeanFactoryGetter.getBean("monitorOrderService"); MonitorOrder monitorOrder = new MonitorOrder(); monitorOrder.setB2bTaskNo(b2bTaskNo); monitorOrder.setCurrentStatus(EnumB2BOrderStatus.OVERTIME_PAYMENT.getKey()); monitorOrder.setCurrentStatusTime(LocalDateTime.now()); monitorOrderService.uptCurrentStatusByTaskNoForOverTimeCancel(monitorOrder); //推送直接用户订单消息。现在还没有推送系统正式上线,所以先暂时注释掉 } } |
延迟队列的管理类,有轮询守护线程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
package com.xxxx.xxxx.config.taskThreadPool; import java.time.Duration; import java.time.LocalDateTime; import java.util.Map; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; /** * @Author: xiehao * @Date: 2018-08-11 * @Description 超时取消的管理类,管理延迟队列,放出一个线程轮询监控延迟队列;有一些方法用来往延迟队列中压入task,有方法用来从队列中remove task。还有初始化方法,开始执行轮询监控线程 */ @Component public class DelayCancelOrderTaskManager { private static final Logger logger = LoggerFactory.getLogger(DelayCancelOrderTaskManager.class); //延迟队列,一切都是围绕它来运转的 private DelayQueue<DelayCancelOrderTask<?>> delayQueue; //超时取消的管理类是个单例,因为系统启动要有方法往队列中插入延迟task,所以搞成饿汉模式 private static DelayCancelOrderTaskManager instance = new DelayCancelOrderTaskManager(); // 守护线程,用于轮询延时队列 private Thread daemonThread; //该方法初始管理类时调用,初始化延迟队列,同时初始化轮询线程 private DelayCancelOrderTaskManager(){ delayQueue = new DelayQueue<DelayCancelOrderTask<?>>(); this.init(); } public static DelayCancelOrderTaskManager getInstance(){ return instance; } //初始化轮询监控守护线程 public void init(){ //lambda表达式,->的意思其实是静态方法,初始化随即执行的。 daemonThread = new Thread(()-> { try{ System.out.println("daemonThread start"); execute(); }catch (Exception e){ logger.error("轮询线程出错",e); } }); daemonThread.setName("DelayQueueMonitorThread"); daemonThread.start(); } //初始化的时候开始执行 private void execute(){ //不断轮询 while(true){ //此处仅为打印日志方便 Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces(); logger.info("线程数--------------" + map.size()); logger.info(System.currentTimeMillis()+" 队列中的个数:"+delayQueue.size()); try{ //从队列中取出可以取出的task。延时队列有个特点,不到时间取不出来,所以能取出来的,都是到时间即将执行的。 DelayCancelOrderTask<?> delayCancelOrderTask = delayQueue.take(); //task不为空,则开始执行 if(delayCancelOrderTask != null){ //获取task里面的处理线程,该线程会丢到线程池中处理 Runnable payTimeoutCancelOrderProcessor = delayCancelOrderTask.getProcessor(); if(payTimeoutCancelOrderProcessor == null){ continue; } //线程执行 payTimeoutCancelOrderProcessor.run(); //执行完毕,从队列中删除task this.removeTask(delayCancelOrderTask); } }catch (Exception e){ logger.error("线程执行错误:",e); } } } /** * @Author xiehao * @Date 2018/8/11 18:15 * @Param * @Description 传入的超时时间为以秒为单位 */ public void putTaskInSeconds(Runnable task,long timeoutPeriod){ long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.SECONDS); DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task); delayQueue.put(delayCancelOrderTask); } /** * @Author xiehao * @Date 2018/8/11 18:15 * @Param * @Description 传入的超时时间以为分钟为单位 */ public void putTaskInMinites(Runnable task,long timeoutPeriod){ long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.MINUTES); DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task); delayQueue.put(delayCancelOrderTask); } /** * @Author xiehao * @Date 2018/8/11 18:15 * @Param * @Description 传入的超时时间以小时为单位 */ public void putTaskInHours(Runnable task,long timeoutPeriod){ long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.HOURS); DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task); delayQueue.put(delayCancelOrderTask); } /** * @Author xiehao * @Date 2018/8/11 18:15 * @Param * @Description 传入的超时时间为自定义的单位 */ public void putTaskInOwnDefine(Runnable task,long timeoutPeriod,TimeUnit unit){ long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,unit); DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task); delayQueue.put(delayCancelOrderTask); } /** * @Author xiehao * @Date 2018/8/11 18:15 * @Param * @Description 传入的时间为超时时间点 */ public void putTaskInTimeoutTime(Runnable task,LocalDateTime timeoutTime){ Duration duration = Duration.between(LocalDateTime.now(),timeoutTime); long timeout = TimeUnit.NANOSECONDS.convert(duration.toNanos(),TimeUnit.NANOSECONDS); DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task); delayQueue.put(delayCancelOrderTask); } /** * @Author xiehao * @Date 2018/8/13 15:43 * @Param * @Description 从队列中删除某个task。一般在用户自己取消订单的时候执行 */ public boolean removeTask(DelayCancelOrderTask<? extends Runnable> task){ return delayQueue.remove(task); } /** * @Author xiehao * @Date 2018/8/13 15:44 * @Param * @Description 判断队列中是否含有某个task */ public boolean contains(DelayCancelOrderTask<? extends Runnable> task){ return delayQueue.contains(task); } //获取队列个数。这个方法专门给打印日志核对数据用的。一般用不着它 public Integer getDelayQueueSize(){ System.out.println("队列中的个数:"+delayQueue.size()); return delayQueue.size(); } } |
在系统初始化的时候,需要将数据库中已经处于等待超时状态的订单打入延迟队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
package com.xxxx.xxxx.config.runner; import com.xxxx.xxxx.config.taskThreadPool.DelayCancelOrderTaskManager; import com.xxxx.xxxx.config.taskThreadPool.PayTimeoutCancelOrderProcessor; import com.xxxx.xxxx.domain.pojo.MonitorOrder; import com.xxxx.xxxx.service.MonitorOrderService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.util.List; /** * @Author: xiehao * @Date: 2018-08-11 * @Description 系统初始化时将处于待支付状态的单据丢到延时队列中,等待执行超时任务 */ @Component public class OrderPayTimeoutQueueRunner implements CommandLineRunner { @Autowired private MonitorOrderService monitorOrderService; @Override public void run(String... strings) throws Exception { DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance(); List<MonitorOrder> monitorOrderList = monitorOrderService.getPayNotTimeoutOrderList(); for(MonitorOrder monitorOrder : monitorOrderList){ PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(monitorOrder.getB2bTaskNo()); delayCancelOrderTaskManager.putTaskInTimeoutTime(processor,monitorOrder.getOrderPayTimeout()); } } } |
如果用户取消订单/中间执行支付操作后,则从延迟队列中去掉任务:
1 2 3 4 5 6 7 8 9 |
//取消预约单,则删除延时队列中的超时等待信号 xiehao start MonitorOrder targetOrder = monitorOrderMapper.getMonitorOrderByTaskNo(taskNum); DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance(); PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(targetOrder.getB2bTaskNo()); Duration duration = Duration.between(LocalDateTime.now(), targetOrder.getOrderPayTimeout()); long timeout = TimeUnit.NANOSECONDS.convert(duration.toNanos(), TimeUnit.NANOSECONDS); DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout, processor); delayCancelOrderTaskManager.removeTask(delayCancelOrderTask); //取消预约单,则删除延时队列中的超时等待信号 xiehao end |
如果用户中间下单,则加入延迟队列:
1 2 3 4 |
//加入延时队列,等待超时取消 xiehao DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance(); PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(order.getB2bTaskNo()); delayCancelOrderTaskManager.putTaskInTimeoutTime(processor, order.getOrderPayTimeout()); |
工具类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
package com.xxxx.xxxx.utils; import org.apache.commons.lang.StringUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.stereotype.Component; /** * @Author: xiehao * @Date: 2018-08-04 */ @Component public class BeanFactoryGetter implements BeanFactoryAware { //Bean工厂必须是static类型,否则系统启动的时候将无法写入factory private static BeanFactory factory; @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { factory = beanFactory; } public static Object getBean(String beanName){ if(StringUtils.isEmpty(beanName)){ return null; } Object t= factory.getBean(beanName); return t; } } |
转载请注明:思码老徐 » 基于延迟队列打造精准的订单超时关闭